Skip to content

Benchmarkers

hgp_lib.benchmarkers.gp_benchmarker.GPBenchmarker

Benchmarker for Boolean GP: runs multiple independent experiments with stratified train/test split and k-fold CV per run, then aggregates results.

Data flow per run:

  1. Stratified train/test split (using test_size).
  2. For each of the n_folds folds:

   a. A fresh copy of the binarizer is fitted on the training fold.
   b. The validation fold is transformed using the same fitted binarizer.
   c. Binarized data is converted to boolean numpy arrays and used for GP training.

  3. The best fold (highest validation score) is selected. Its binarizer is used to transform the held-out test set for final evaluation.

Raw (non-binarized) data should be passed as a pandas.DataFrame in BenchmarkerConfig.data. Binarization happens internally per fold to prevent data leakage.

Parameters:

Name Type Description Default
config BenchmarkerConfig

Configuration with data, labels, binarizer, trainer_config template, and benchmarker-specific options. See BenchmarkerConfig for more details.

required

Examples:

>>> import numpy as np
>>> import pandas as pd
>>> from hgp_lib.configs import BooleanGPConfig, TrainerConfig, BenchmarkerConfig
>>> from hgp_lib.benchmarkers import GPBenchmarker
>>> from hgp_lib.rules import Rule
>>> data = pd.DataFrame({
...     "f1": [True, False, True, False, True, False, True, False],
...     "f2": [False, True, True, False, False, True, True, False],
... })
>>> labels = np.array([1, 0, 1, 0, 1, 0, 1, 0])
>>> def acc(p, l): return float((p == l).mean())
>>> gp_config = BooleanGPConfig(score_fn=acc, optimize_scorer=False)
>>> trainer_config = TrainerConfig(gp_config=gp_config, num_epochs=3, progress_bar=False)
>>> config = BenchmarkerConfig(
...     data=data, labels=labels, trainer_config=trainer_config,
...     num_runs=2, n_folds=2, n_jobs=1,
... )
>>> benchmarker = GPBenchmarker(config)
>>> result = benchmarker.fit()
>>> len(result.runs)
2
>>> isinstance(result.best_rule, Rule)
True
Source code in hgp_lib\benchmarkers\gp_benchmarker.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class GPBenchmarker:
    """
    Benchmarker for Boolean GP: runs multiple independent experiments with
    stratified train/test split and k-fold CV per run, then aggregates results.

    Data flow per run:

    1. Stratified train/test split (using `test_size`).
    2. For each of the `n_folds` folds:

       a. A fresh copy of the `binarizer` is fitted on the training fold.
       b. The validation fold is transformed using the same fitted binarizer.
       c. Binarized data is converted to boolean numpy arrays and used for GP
          training.

    3. The best fold (highest validation score) is selected. Its binarizer is
       used to transform the held-out test set for final evaluation.

    Raw (non-binarized) data should be passed as a
    `pandas.DataFrame` in `BenchmarkerConfig.data`. Binarization happens
    internally per fold to prevent data leakage.

    Args:
        config (BenchmarkerConfig): Configuration with data,
            labels, binarizer, trainer_config template, and benchmarker-specific
            options. See `BenchmarkerConfig` for more details.

    Examples:
        >>> import numpy as np
        >>> import pandas as pd
        >>> from hgp_lib.configs import BooleanGPConfig, TrainerConfig, BenchmarkerConfig
        >>> from hgp_lib.benchmarkers import GPBenchmarker
        >>> from hgp_lib.rules import Rule
        >>> data = pd.DataFrame({
        ...     "f1": [True, False, True, False, True, False, True, False],
        ...     "f2": [False, True, True, False, False, True, True, False],
        ... })
        >>> labels = np.array([1, 0, 1, 0, 1, 0, 1, 0])
        >>> def acc(p, l): return float((p == l).mean())
        >>> gp_config = BooleanGPConfig(score_fn=acc, optimize_scorer=False)
        >>> trainer_config = TrainerConfig(gp_config=gp_config, num_epochs=3, progress_bar=False)
        >>> config = BenchmarkerConfig(
        ...     data=data, labels=labels, trainer_config=trainer_config,
        ...     num_runs=2, n_folds=2, n_jobs=1,
        ... )
        >>> benchmarker = GPBenchmarker(config)
        >>> result = benchmarker.fit()
        >>> len(result.runs)
        2
        >>> isinstance(result.best_rule, Rule)
        True
    """

    def __init__(self, config: BenchmarkerConfig):
        """
        Validate the configuration and install defaults.

        Args:
            config (BenchmarkerConfig): Benchmark configuration; validated
                eagerly so a bad setup fails before any training starts.
        """
        validate_benchmarker_config(config)
        self.config = config
        # NOTE(review): this mutates the caller's config object in place when
        # no binarizer was supplied — confirm callers do not share/reuse
        # config objects across benchmarkers.
        if self.config.binarizer is None:
            self.config.binarizer = StandardBinarizer()
        # Populated by fit(); None until the first call completes.
        self._run_results: ExperimentResult | None = None

    def _effective_n_jobs(self) -> int:
        """
        Compute the effective number of parallel jobs to use.

        Returns:
            int: The number of parallel workers to use (always >= 1).

        Examples:
            >>> import numpy as np
            >>> import pandas as pd
            >>> from hgp_lib.configs import BooleanGPConfig, TrainerConfig, BenchmarkerConfig
            >>> from hgp_lib.benchmarkers import GPBenchmarker
            >>> data = pd.DataFrame({"f": [True, False, True, False, True, False, True, False]})
            >>> labels = np.array([1, 0, 1, 0, 1, 0, 1, 0])
            >>> def acc(p, l): return float((p == l).mean())
            >>> gp = BooleanGPConfig(score_fn=acc, optimize_scorer=False)
            >>> tc = TrainerConfig(gp_config=gp, num_epochs=1, progress_bar=False)
            >>> cfg = BenchmarkerConfig(data=data, labels=labels, trainer_config=tc,
            ...     num_runs=3, n_folds=2, n_jobs=1)
            >>> GPBenchmarker(cfg)._effective_n_jobs()
            1
        """
        n_jobs = self.config.n_jobs
        if n_jobs < 0:
            # Any negative value means "use every available CPU core".
            n_jobs = os.cpu_count() or 1
        # Cap at num_runs (extra workers would sit idle) and floor at 1.
        return max(1, min(n_jobs, self.config.num_runs))

    def _run_sequential(self) -> ExperimentResult:
        """Run all benchmark runs sequentially with nested progress bars."""
        run_results: List[RunResult] = []

        # Keep the inner (trainer) bars transient so the outer run bar
        # stays readable. NOTE(review): mutates the shared trainer_config
        # template in place — confirm it is not reused elsewhere.
        if self.config.show_run_progress:
            self.config.trainer_config.leave_progress_bar = False

        for run_id in tqdm(
            range(self.config.num_runs),
            desc="Benchmark Runs",
            disable=not self.config.show_run_progress,
        ):
            # Each run gets a distinct, reproducible seed derived from base_seed.
            result = execute_single_run(
                run_id, self.config.base_seed + run_id, self.config
            )
            run_results.append(result)

        return ExperimentResult(runs=run_results)

    def _run_parallel(self, n_jobs: int) -> ExperimentResult:
        """Run all benchmark runs in parallel with centralized progress bars."""
        # Master switch: if the trainer's bar is disabled, hide every bar.
        show_progress = self.config.trainer_config.progress_bar

        # Totals for the aggregated bars drawn by the listener thread.
        # NOTE(review): total_epochs assumes every fold trains the full
        # num_epochs; early stopping (if any) would leave the bar short.
        total_runs = self.config.num_runs
        total_folds = total_runs * self.config.n_folds
        total_epochs = total_folds * self.config.trainer_config.num_epochs

        progress_config = ProgressConfig(
            total_runs=total_runs,
            total_folds=total_folds,
            total_epochs=total_epochs,
            show_run_progress=self.config.show_run_progress and show_progress,
            show_fold_progress=self.config.show_fold_progress and show_progress,
            show_epoch_progress=self.config.show_epoch_progress and show_progress,
        )

        # Manager-backed queue is picklable, so worker processes can send
        # progress updates back to the listener thread in this process.
        manager = multiprocessing.Manager()
        queue = manager.Queue()

        listener = ProgressListener(queue, progress_config)
        listener.start()

        # One argument tuple per run; each run gets a distinct derived seed.
        run_args = [
            (run_id, self.config.base_seed + run_id, self.config, queue)
            for run_id in range(self.config.num_runs)
        ]

        try:
            with multiprocessing.Pool(processes=n_jobs) as pool:
                run_results = pool.map(single_run_wrapper, run_args)
            # Normal completion - wait for listener to finish processing
            listener.join()
        except Exception:
            # Error occurred - force stop the listener to prevent hang
            listener.stop()
            raise

        return ExperimentResult(runs=run_results)

    def fit(self) -> ExperimentResult:
        """
        Run all benchmark runs (parallel or sequential) and aggregate results.

        Each run performs a stratified train/test split, k-fold CV with per-fold
        binarization, and test-set evaluation. Results across runs are aggregated
        into an ExperimentResult.

        Returns:
            ExperimentResult: Contains all run results with methods to get
                best_run, best_rule, test scores statistics, etc.
        """
        effective_n_jobs = self._effective_n_jobs()

        # A single worker keeps everything in-process (nested tqdm bars);
        # more than one switches to the multiprocessing path.
        if effective_n_jobs == 1:
            run_results = self._run_sequential()
        else:
            run_results = self._run_parallel(effective_n_jobs)

        self._run_results = run_results
        return run_results

fit()

Run all benchmark runs (parallel or sequential) and aggregate results.

Each run performs a stratified train/test split, k-fold CV with per-fold binarization, and test-set evaluation. Results across runs are aggregated into an ExperimentResult.

Returns:

Name Type Description
ExperimentResult ExperimentResult

Contains all run results with methods to get best_run, best_rule, test scores statistics, etc.

Source code in hgp_lib\benchmarkers\gp_benchmarker.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def fit(self) -> ExperimentResult:
    """
    Execute every benchmark run and collect the aggregated results.

    Depending on the effective worker count, runs execute either
    sequentially (nested progress bars) or across worker processes.
    Each run performs a stratified train/test split, k-fold CV with
    per-fold binarization, and a final test-set evaluation.

    Returns:
        ExperimentResult: Aggregate of all run results, exposing helpers
            such as best_run, best_rule, and test-score statistics.
    """
    worker_count = self._effective_n_jobs()

    # One worker -> stay in-process; otherwise fan out to a pool.
    results = (
        self._run_sequential()
        if worker_count == 1
        else self._run_parallel(worker_count)
    )

    self._run_results = results
    return results

hgp_lib.benchmarkers.runner.execute_single_run(run_id, seed, config, progress_queue=None)

Execute one benchmark run: stratified train/test split, per-fold binarization, k-fold CV training, best-fold selection, and test-set evaluation.

This is a module-level function so it can be pickled for multiprocessing.

Per-fold binarization: For each fold a fresh clone of the configured binarizer is fitted on the training fold (with labels, enabling supervised binning for numerical features) and used to transform the validation fold. After selecting the best fold, its binarizer transforms the held-out test data. This prevents data leakage across folds and between train/test sets.

Parameters:

Name Type Description Default
run_id int

Index of the run (0-based).

required
seed int

Random seed for stratified split and k-fold.

required
config BenchmarkerConfig

Configuration for the benchmark run.

required
progress_queue Queue | None

Optional queue for sending progress updates.

None

Returns:

Name Type Description
RunResult RunResult

Contains run_id, seed, folds, test_score, best_rule, feature_names.

Raises:

Type Description
RuntimeError

If no best rule is available after training a fold.

Source code in hgp_lib\benchmarkers\runner.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def execute_single_run(
    run_id: int,
    seed: int,
    config: BenchmarkerConfig,
    progress_queue: Optional[Queue] = None,
) -> RunResult:
    """
    Execute one benchmark run: stratified train/test split, per-fold binarization,
    k-fold CV training, best-fold selection, and test-set evaluation.

    This is a module-level function so it can be pickled for `multiprocessing`.

    **Per-fold binarization:** For each fold a fresh `clone` of the configured
    binarizer is fitted on the training fold (with labels, enabling supervised
    binning for numerical features) and used to transform the validation fold.
    After selecting the best fold, its binarizer transforms the held-out test
    data. This prevents data leakage across folds and between train/test sets.

    Args:
        run_id (int): Index of the run (0-based).
        seed (int): Random seed for stratified split and k-fold.
        config (BenchmarkerConfig): Configuration for the benchmark run.
        progress_queue (Queue | None): Optional queue for sending progress updates.

    Returns:
        RunResult: Contains run_id, seed, folds, test_score, best_rule, feature_names.

    Raises:
        RuntimeError: If no best rule is available after training a fold.
    """
    # Templates; per-fold variants are derived from these via replace() below.
    trainer_template = config.trainer_config
    gp_template = trainer_template.gp_config

    # Local fold/epoch bars are only drawn when there is no central listener;
    # in parallel mode progress is routed through the queue instead.
    use_queue = progress_queue is not None
    show_folds = (
        config.show_fold_progress and trainer_template.progress_bar and not use_queue
    )
    show_epochs = (
        config.show_epoch_progress and trainer_template.progress_bar and not use_queue
    )

    # Seed the global RNGs so the split, fold shuffle, and any stochastic GP
    # operations are reproducible for this run.
    np.random.seed(seed)
    random.seed(seed)

    train_data, test_data, train_labels, test_labels = train_test_split(
        config.data,
        config.labels,
        test_size=config.test_size,
        stratify=config.labels,
        random_state=seed,
    )

    skf = StratifiedKFold(n_splits=config.n_folds, shuffle=True, random_state=seed)

    folds: List[PopulationHistory] = []
    # Fitted binarizer per fold, kept so the best fold's one can transform
    # the held-out test set later.
    binarizers = []
    # Per-fold {column index -> binarized feature name} maps.
    feature_names_per_binarizer = []

    fold_splits = skf.split(train_data, train_labels)
    if show_folds:
        fold_splits = tqdm(fold_splits, total=config.n_folds, desc="Folds", leave=False)

    # In parallel mode, epoch progress is reported through this callback.
    epoch_callback = ProgressSender(progress_queue, "epoch") if use_queue else None

    best_fold_idx = 0
    best_fold_score = -float("inf")
    for fold_idx, (train_idx, val_idx) in enumerate(fold_splits):
        fold_train = train_data.iloc[train_idx]
        fold_train_labels = train_labels[train_idx]

        # Fresh binarizer per fold, fitted only on this fold's training rows
        # (labels included) — prevents leakage into the validation fold.
        binarizer = deepcopy(config.binarizer)
        fold_train = binarizer.fit_transform(fold_train, fold_train_labels)
        binarizers.append(binarizer)
        feature_names_per_binarizer.append(
            {i: col for i, col in enumerate(fold_train.columns)}
        )
        fold_train = fold_train.to_numpy(dtype=bool)

        # Validation fold goes through the same fitted binarizer, never refit.
        fold_val = binarizer.transform(train_data.iloc[val_idx]).to_numpy(dtype=bool)
        fold_val_labels = train_labels[val_idx]

        # Clone the config templates with this fold's data wired in.
        fold_gp_config = replace(
            gp_template,
            train_data=fold_train,
            train_labels=fold_train_labels,
        )
        fold_trainer_config = replace(
            trainer_template,
            gp_config=fold_gp_config,
            val_data=fold_val,
            val_labels=fold_val_labels,
            progress_bar=show_epochs,
            progress_callback=epoch_callback,
        )

        trainer = GPTrainer(fold_trainer_config)
        history = trainer.fit()

        # Prefer validation score; fall back to training score
        fold_score = (
            history.best_val_score
            if history.best_val_score is not None
            else history.best_train_score
        )
        if fold_score is not None and fold_score > best_fold_score:
            best_fold_score = fold_score
            best_fold_idx = fold_idx

        folds.append(history)

        send_progress(progress_queue, "fold", 1)

    # Free the (potentially large) training frame before test-set work.
    del train_data, train_labels

    best_rule = folds[best_fold_idx].global_best_rule

    # Transform test data using best fold's binarizer
    binarizer_here = binarizers[best_fold_idx]
    feature_names = feature_names_per_binarizer[best_fold_idx]
    test_data = binarizer_here.transform(test_data).to_numpy(dtype=bool)

    if gp_template.optimize_scorer:
        # NOTE(review): this appears to rebind test_data/test_labels into a
        # layout the optimized scorers expect — confirm against the
        # optimize_scorers_for_data contract.
        test_score_fn, test_cm, test_data, test_labels = optimize_scorers_for_data(
            gp_template.score_fn, confusion_matrix, data=test_data, labels=test_labels
        )
    else:
        test_score_fn = gp_template.score_fn
        test_cm = confusion_matrix

    test_pred = best_rule.evaluate(test_data)

    test_score = float(test_score_fn(test_pred, test_labels))

    # Unpacking order (tp, fp, fn, tn) follows the confusion-matrix helper.
    tp, fp, fn, tn = test_cm(test_pred, test_labels)

    send_progress(progress_queue, "run", 1)

    return RunResult(
        run_id=run_id,
        seed=seed,
        best_fold_idx=best_fold_idx,
        folds=folds,
        test_score=test_score,
        test_tp=tp,
        test_fp=fp,
        test_fn=fn,
        test_tn=tn,
        feature_names=feature_names,
    )

hgp_lib.benchmarkers.progress.ProgressListener

Listener thread for aggregating progress updates from worker processes.

Runs in the main process and listens to a multiprocessing queue for progress updates from worker processes. Updates three tqdm progress bars: - Experiments (runs) - Folds - Epochs

The listener uses timeouts on queue.get() to periodically check for stop signals, preventing indefinite hangs if workers crash.

Parameters:

Name Type Description Default
progress_queue Queue

Multiprocessing queue for receiving progress updates.

required
config ProgressConfig

Progress configuration with totals and display settings.

required

Examples:

>>> from multiprocessing import Queue
>>> from hgp_lib.benchmarkers.progress import ProgressConfig, ProgressListener
>>> q = Queue()
>>> cfg = ProgressConfig(
...     total_runs=1, total_folds=1, total_epochs=1,
...     show_run_progress=False, show_fold_progress=False,
...     show_epoch_progress=False,
... )
>>> listener = ProgressListener(q, cfg)
>>> listener.start()
>>> q.put(("epoch", 1))
>>> q.put(("fold", 1))
>>> q.put(("run", 1))
>>> listener.join()
Source code in hgp_lib\benchmarkers\progress.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class ProgressListener:
    """
    Background thread that mirrors worker progress onto tqdm bars.

    A daemon thread in the main process drains a multiprocessing queue of
    ``(msg_type, count)`` tuples sent by worker processes and updates three
    bars: runs, folds, and epochs. Every ``queue.get`` uses a timeout so the
    loop can notice a stop request even if workers crash without reporting.

    Args:
        progress_queue: Multiprocessing queue carrying progress tuples.
        config: Totals and per-bar visibility settings.

    Examples:
        >>> from multiprocessing import Queue
        >>> from hgp_lib.benchmarkers.progress import ProgressConfig, ProgressListener
        >>> q = Queue()
        >>> cfg = ProgressConfig(
        ...     total_runs=1, total_folds=1, total_epochs=1,
        ...     show_run_progress=False, show_fold_progress=False,
        ...     show_epoch_progress=False,
        ... )
        >>> listener = ProgressListener(q, cfg)
        >>> listener.start()
        >>> q.put(("epoch", 1))
        >>> q.put(("fold", 1))
        >>> q.put(("run", 1))
        >>> listener.join()
    """

    def __init__(self, progress_queue: Queue, config: ProgressConfig):
        self.queue = progress_queue
        self.config = config
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._pbar_exp: tqdm | None = None
        self._pbar_fold: tqdm | None = None
        self._pbar_epoch: tqdm | None = None

    def start(self) -> None:
        """Spawn the daemon listener thread."""
        self._stop_event.clear()
        worker = threading.Thread(target=self._listen, daemon=True)
        self._thread = worker
        worker.start()

    def stop(self) -> None:
        """Request shutdown and wait (bounded) for the thread to exit."""
        self._stop_event.set()
        try:
            # Wake a blocked queue.get() so the thread can exit promptly.
            self.queue.put(_SHUTDOWN_SENTINEL)
        except (BrokenPipeError, EOFError):
            pass  # Queue is already torn down; nothing left to signal.
        if self._thread is not None:
            self._thread.join(timeout=10.0)

    def join(self) -> None:
        """Block until the listener exits on its own (all runs reported)."""
        if self._thread is not None:
            self._thread.join()

    def _listen(self) -> None:
        """
        Drain the queue and update bars until every run is accounted for.

        Messages are ``(msg_type, count)`` tuples where ``msg_type`` is one
        of ``"epoch"``, ``"fold"``, or ``"run"``; ``count`` is the increment.
        """
        # Three stacked bars: runs on top position 0, epochs at position 2.
        self._pbar_exp = tqdm(
            total=self.config.total_runs,
            position=0,
            desc="Runs",
            leave=True,
            disable=not self.config.show_run_progress,
        )
        self._pbar_fold = tqdm(
            total=self.config.total_folds,
            position=1,
            desc="Folds",
            leave=True,
            disable=not self.config.show_fold_progress,
        )
        self._pbar_epoch = tqdm(
            total=self.config.total_epochs,
            position=2,
            desc="Epochs",
            leave=True,
            disable=not self.config.show_epoch_progress,
        )

        done = 0
        try:
            # Exit once every run has reported, or a stop was requested.
            while done < self.config.total_runs and not self._stop_event.is_set():
                try:
                    msg, count = self.queue.get(timeout=_QUEUE_TIMEOUT_SECONDS)
                except queue.Empty:
                    # Timed out: re-check the stop flag, then keep waiting.
                    continue

                if msg == _SHUTDOWN_SENTINEL[0]:
                    # stop() sent its sentinel — bail out immediately.
                    break

                if msg == "epoch":
                    self._pbar_epoch.update(count)
                elif msg == "fold":
                    self._pbar_fold.update(count)
                elif msg == "run":
                    self._pbar_exp.update(count)
                    done += count
        finally:
            # Close innermost-first so the display unwinds cleanly.
            for bar in (self._pbar_epoch, self._pbar_fold, self._pbar_exp):
                if bar is not None:
                    bar.close()

start()

Start the listener thread.

Source code in hgp_lib\benchmarkers\progress.py
78
79
80
81
82
def start(self) -> None:
    """Spawn the background listener as a daemon thread."""
    self._stop_event.clear()
    worker = threading.Thread(target=self._listen, daemon=True)
    self._thread = worker
    worker.start()

stop()

Signal the listener to stop and wait for it to finish.

Source code in hgp_lib\benchmarkers\progress.py
84
85
86
87
88
89
90
91
92
93
def stop(self) -> None:
    """Request shutdown and wait (bounded) for the listener to exit."""
    self._stop_event.set()
    try:
        # Wake a blocked queue.get() in the listener thread.
        self.queue.put(_SHUTDOWN_SENTINEL)
    except (BrokenPipeError, EOFError):
        # The queue is already torn down; nothing left to signal.
        pass
    listener_thread = self._thread
    if listener_thread is not None:
        listener_thread.join(timeout=10.0)

join()

Wait for the listener thread to finish naturally (all runs completed).

Source code in hgp_lib\benchmarkers\progress.py
95
96
97
98
def join(self) -> None:
    """Block until the listener thread exits on its own (all runs done)."""
    listener_thread = self._thread
    if listener_thread is None:
        return
    listener_thread.join()

hgp_lib.benchmarkers.progress.send_progress(progress_queue, msg_type, count=1)

Send a progress update to the listener queue.

This is a safe helper that no-ops if the queue is None (sequential mode).

Parameters:

Name Type Description Default
progress_queue Queue | None

Multiprocessing queue or None for sequential mode.

required
msg_type str

Type of progress ("epoch", "fold", or "run").

required
count int

Number to increment by. Default: 1.

1

Examples:

>>> from hgp_lib.benchmarkers.progress import send_progress
>>> send_progress(None, "epoch", 5)  # no-op when queue is None
>>> from multiprocessing import Queue
>>> q = Queue()
>>> send_progress(q, "fold", 1)
>>> q.get()
('fold', 1)
Source code in hgp_lib\benchmarkers\progress.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def send_progress(
    progress_queue: Optional[Queue], msg_type: str, count: int = 1
) -> None:
    """
    Forward one progress increment to the listener queue, if any.

    In sequential mode there is no listener, so ``progress_queue`` is
    ``None`` and the call silently does nothing.

    Args:
        progress_queue (Queue | None):
            Destination queue, or ``None`` for sequential mode.
        msg_type (str):
            Progress category: ``"epoch"``, ``"fold"``, or ``"run"``.
        count (int):
            Increment size. Default: `1`.

    Examples:
        >>> from hgp_lib.benchmarkers.progress import send_progress
        >>> send_progress(None, "epoch", 5)  # no-op when queue is None
        >>> from multiprocessing import Queue
        >>> q = Queue()
        >>> send_progress(q, "fold", 1)
        >>> q.get()
        ('fold', 1)
    """
    if progress_queue is None:
        return
    progress_queue.put((msg_type, count))