15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
class BooleanGP:
    """
    Boolean Genetic Programming algorithm for evolving rule-based classifiers.

    This class implements a genetic programming algorithm that evolves a population of
    boolean rules to optimize a fitness function. Each generation applies crossover and
    mutation operations, evaluates the population, and selects the best individuals.

    The algorithm tracks the current best rule (best in the current run). When enabled,
    regeneration resets the population if no improvement is observed for a specified
    number of epochs.

    Training data and labels are provided via `BooleanGPConfig`. The number of features
    (`num_features`) is derived from the data shape and passed to the configured
    `population_factory` and `mutation_factory` for runtime construction of the
    `PopulationGenerator` and `MutationExecutor`.

    Args:
        config (BooleanGPConfig): Configuration containing `train_data`,
            `train_labels`, `score_fn`, `population_factory`,
            `mutation_factory`, and optional components.

    Examples:
        >>> import numpy as np
        >>> from hgp_lib.configs import BooleanGPConfig
        >>> from hgp_lib.algorithms import BooleanGP
        >>>
        >>> def accuracy(predictions, labels):
        ...     return np.mean(predictions == labels)
        >>>
        >>> train_data = np.array([[True, False, True, False], [False, True, False, True]])
        >>> train_labels = np.array([1, 0])
        >>> config = BooleanGPConfig(
        ...     score_fn=accuracy,
        ...     train_data=train_data,
        ...     train_labels=train_labels,
        ...     optimize_scorer=False,
        ... )
        >>> gp = BooleanGP(config)
        >>> gen_metrics = gp.step()
    """

    def __init__(self, config: BooleanGPConfig, current_depth: int = 0) -> None:
        validate_gp_config(config)
        train_data = config.train_data
        train_labels = config.train_labels
        # NOTE: score_fn follows the (predictions, labels) argument order, which is
        # the REVERSE of the sklearn convention (labels, predictions). Supporting a
        # `sample_weight` keyword is recommended so the scorer can be optimized.
        score_fn = config.score_fn
        self._original_score_fn = score_fn
        if config.optimize_scorer:
            score_fn, train_cm, train_data, train_labels = optimize_scorers_for_data(
                config.score_fn,
                confusion_matrix,
                data=config.train_data,
                labels=config.train_labels,
            )
        else:
            train_cm = confusion_matrix
        self.score_fn = score_fn
        self.train_cm = train_cm
        self.complexity_penalty = config.complexity_penalty
        self.train_data = train_data
        self.train_labels = train_labels
        self.current_depth = current_depth
        # Number of features is derived from the (possibly optimizer-transformed) data.
        num_features = train_data.shape[1]
        self.population_generator = config.population_factory.create(
            num_features, score_fn, train_data, train_labels
        )
        self.mutation_executor = config.mutation_factory.create(
            num_features, config.check_valid
        )
        self.crossover_executor = config.crossover_factory.create(config.check_valid)
        selection = config.selection
        if selection is None:
            selection = TournamentSelection()
        self.selection = selection
        self.regeneration = config.regeneration
        self.regeneration_patience = config.regeneration_patience
        self.population = self.population_generator.generate()
        self.population_size = len(self.population)
        self._top_k = config.top_k_transfer
        # Best-in-current-run tracking (reset on regeneration) vs. global best
        # (never reset; used by `evaluate_best`).
        self.best_score = -float("inf")
        self.global_best_score = -float("inf")
        self.best_rule: Rule | None = None
        self.global_best_rule: Rule | None = None
        self.best_not_improved_epochs = 0
        self._epoch = -1
        self.config = config
        self.child_populations: List["BooleanGP"] = []
        # Maps child-local feature indices to this population's feature indices
        # (set by the parent when this instance is a child population).
        self.feature_mapping: dict[int, int] | None = None
        self._transfer_size: int = 0
        self.parent_rule_indices: List[int] = []
        if config.max_depth > current_depth:
            self._create_child_populations()

    def _create_child_populations(self) -> None:
        """Create child populations using the sampling strategy.

        Calls sample() once for all children to ensure correct overlap/partitioning
        behavior controlled by the replace parameter. Each SamplingResult in the
        returned list is used to configure one child population.
        """
        results = self.config.sampling_strategy.sample(
            self.train_data,
            self.train_labels,
            self.config.num_child_populations,
        )
        for result in results:
            child_config = replace(
                self.config,
                train_data=result.data,
                train_labels=result.labels,
            )
            child = BooleanGP(child_config, current_depth=self.current_depth + 1)
            child.feature_mapping = result.feature_mapping
            self.child_populations.append(child)

    def step(self) -> GenerationMetrics:
        """
        Perform one training step (generation) of the genetic programming algorithm.

        Each step applies crossover to create offspring, mutates the population, evaluates
        all rules, updates the best rule, and selects individuals for the next generation.
        If regeneration is enabled and no improvement has been observed for
        ``regeneration_patience`` epochs, the population is regenerated.

        Returns:
            GenerationMetrics: Metrics for this generation including rules, scores, etc.

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.algorithms import BooleanGP
            >>> def accuracy(predictions, labels):
            ...     return np.mean(predictions == labels)
            >>> data = np.array([[True, False], [False, True], [True, True], [False, False]])
            >>> labels = np.array([1, 0, 1, 0])
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy, train_data=data, train_labels=labels,
            ...     optimize_scorer=False,
            ... )
            >>> gp = BooleanGP(config)
            >>> metrics = gp.step()
            >>> isinstance(metrics.best_train_score, float)
            True
            >>> len(metrics.train_scores) > 0
            True
        """
        self._forward()
        return self._backward()

    def _forward(self) -> None:
        """
        Perform the forward pass: collect child rules, apply crossover, then mutate.

        Recursively calls ``_forward()`` on all child populations, collects their top-K
        rules into a crossover pool alongside the current population, produces offspring
        via crossover, and applies mutations to the expanded population.

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.algorithms import BooleanGP
            >>> def accuracy(p, l): return np.mean(p == l)
            >>> data = np.array([[True, False], [False, True], [True, True], [False, False]])
            >>> labels = np.array([1, 0, 1, 0])
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy, train_data=data, train_labels=labels,
            ...     optimize_scorer=False,
            ... )
            >>> gp = BooleanGP(config)
            >>> pop_before = len(gp.population)
            >>> gp._forward()
            >>> len(gp.population) >= pop_before
            True
        """
        for child in self.child_populations:
            child._forward()
        # Pool layout: all transferred child rules first (with their feature
        # mappings), then this population's own rules (mapping None). The
        # feedback pass in `_generate_child_feedback` relies on this ordering.
        crossover_pool = []
        feature_mappings = []
        self._transfer_size = 0
        for child in self.child_populations:
            crossover_pool.extend(child._get_top_k_rules())
            self._transfer_size += child._top_k
            feature_mappings.extend([child.feature_mapping] * child._top_k)
        crossover_pool.extend(self.population)
        feature_mappings.extend([None] * len(self.population))
        offspring, self.parent_rule_indices = self.crossover_executor.apply(
            crossover_pool, feature_mappings
        )
        self.population += offspring
        # Mutation is applied to the whole expanded population (originals + offspring).
        self.mutation_executor.apply(self.population)

    def _backward(self, parent_scores: ndarray | None = None) -> GenerationMetrics:
        """
        Perform the backward pass: evaluate, propagate feedback, and select the next generation.

        Evaluates all rules against training data, optionally incorporates feedback from a
        parent population, propagates signals to child populations, and creates the next
        generation via selection.

        Args:
            parent_scores (ndarray | None):
                Optional scores propagated from a parent population. Used in hierarchical GP
                to reward child rules that contributed to successful offspring. Default: `None`.

        Returns:
            GenerationMetrics: Metrics for this generation.

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.algorithms import BooleanGP
            >>> def accuracy(p, l): return np.mean(p == l)
            >>> data = np.array([[True, False], [False, True], [True, True], [False, False]])
            >>> labels = np.array([1, 0, 1, 0])
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy, train_data=data, train_labels=labels,
            ...     optimize_scorer=False,
            ... )
            >>> gp = BooleanGP(config)
            >>> gp._forward()
            >>> metrics = gp._backward()
            >>> isinstance(metrics.best_train_score, float)
            True
        """
        scores = self.evaluate_population(
            self.train_data, self.train_labels, self.score_fn
        )
        if parent_scores is not None:
            self._apply_feedback(scores, parent_scores)
        if self.child_populations:
            child_feedbacks = self._generate_child_feedback(scores)
            children_metrics = [
                child._backward(feedback)
                for child, feedback in zip(self.child_populations, child_feedbacks)
            ]
        else:
            children_metrics = []
        return self._new_generation(scores, children_metrics)

    def _get_top_k_rules(self) -> List[Rule]:
        """Retrieve the top-K rules for transfer to the parent population.

        Returns the first `_top_k` rules from the population, which are expected
        to be the highest-scoring rules. After each generation (except the first),
        non-root populations sort their rules by score in descending order during
        `_new_generation()`, ensuring that indices 0 through `_top_k - 1` contain
        the best-performing rules.

        Note:
            During the first epoch (before any `backward()` call completes), the
            population has not yet been sorted by score. In this case, the returned
            rules are simply the first `_top_k` rules from the initial population,
            which are effectively random. This is acceptable since all populations
            start with randomly generated rules.

        Returns:
            List[Rule]: The top-K rules from this population, to be used in the
                parent's crossover pool during hierarchical GP.

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.algorithms import BooleanGP
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.populations import FeatureSamplingStrategy
            >>> from sklearn.metrics import accuracy_score
            >>> data = np.random.rand(50, 10) > 0.5
            >>> labels = np.random.randint(0, 2, 50)
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy_score,
            ...     train_data=data,
            ...     train_labels=labels,
            ...     max_depth=1,
            ...     num_child_populations=2,
            ...     sampling_strategy=FeatureSamplingStrategy(),
            ...     top_k_transfer=5,
            ... )
            >>> gp = BooleanGP(config)
            >>> child = gp.child_populations[0]
            >>> top_rules = child._get_top_k_rules()
            >>> len(top_rules)
            5
        """
        return self.population[: self._top_k]

    def _apply_feedback(self, scores: ndarray, parent_scores: ndarray) -> ndarray:
        """Apply incoming feedback from parent to the first self._top_k scores.

        Modifies ``scores`` in place (and also returns it). The incoming
        ``parent_scores`` array is NOT mutated; scaling by ``feedback_strength``
        is done on a copy so the caller's feedback buffer stays intact.
        """
        # Non-mutating scale: the previous in-place `*=` clobbered the parent's array.
        scaled = parent_scores * self.config.feedback_strength
        if self.config.feedback_type == "multiplicative":
            scores[: self._top_k] *= 1 + scaled
        else:
            scores[: self._top_k] += scaled
        return scores

    def _generate_child_feedback(self, scores: ndarray) -> ndarray:
        """Generate feedback signals for each child from offspring performance.

        Each parent that came from a child population receives a signal based on
        how well the offspring it helped create performed. The signal is normalized
        relative to the population's score distribution.

        Args:
            scores (ndarray): Fitness scores for all rules in the current population,
                including both the original population and offspring from crossover.

        Returns:
            ndarray: A 2D array of shape (num_children, top_k) containing feedback
                signals for each child population's transferred rules.
        """
        num_children = len(self.child_populations)
        mean_s = float(np.mean(scores))
        min_s = float(np.min(scores))
        max_s = float(np.max(scores))
        if max_s == min_s:
            # All scores equal: no signal to distribute (also avoids a zero divisor).
            return np.zeros((num_children, self._top_k))
        # Offspring are situated after the original parent population in the scores array
        offspring_scores = scores[self.population_size :]
        above = offspring_scores >= mean_s
        # Normalize to [-1, 1]: above-mean offspring scaled by the upper spread,
        # below-mean by the lower spread.
        signals = np.where(
            above,
            (offspring_scores - mean_s) / (max_s - mean_s),
            (offspring_scores - mean_s) / (mean_s - min_s),
        )
        child_feedbacks = np.zeros((num_children, self._top_k))
        child_counts = np.zeros((num_children, self._top_k))
        # parent_rule_indices stores TWO indices per offspring (one for each parent).
        # For offspring i, parent_rule_indices[2*i] and parent_rule_indices[2*i + 1]
        # are the indices of its two parents in the crossover pool.
        # Therefore, j // 2 maps from the parent_rule_indices position to the
        # corresponding offspring index in offspring_scores.
        for j, parent_idx in enumerate(self.parent_rule_indices):
            if parent_idx < self._transfer_size:
                # parent_idx < _transfer_size means this parent came from a child population
                # Determine which child and which local rule index within that child
                child_idx, local_idx = divmod(parent_idx, self._top_k)
                offspring_idx = j // 2  # Two parent indices per offspring
                child_feedbacks[child_idx, local_idx] += signals[offspring_idx]
                child_counts[child_idx, local_idx] += 1
        # Average the accumulated signals; rules that parented nothing keep zero.
        mask = child_counts > 0
        child_feedbacks[mask] /= child_counts[mask]
        return child_feedbacks

    def _compute_regularized_scores(
        self, scores: ndarray, complexities: List[int]
    ) -> ndarray:
        """
        Compute regularized scores: ``score - complexity_penalty * ln(complexity)``.

        When ``complexity_penalty`` is zero, returns ``scores`` unchanged.

        Args:
            scores (ndarray):
                Raw fitness scores for the population.
            complexities (List[int]):
                Number of nodes in each rule.

        Returns:
            ndarray: Regularized scores for selection.

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.algorithms import BooleanGP
            >>> def accuracy(p, l): return np.mean(p == l)
            >>> data = np.array([[True, False], [False, True]])
            >>> labels = np.array([1, 0])
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy, train_data=data, train_labels=labels,
            ...     optimize_scorer=False,
            ... )
            >>> gp = BooleanGP(config)
            >>> gp.complexity_penalty = 0.0
            >>> scores = np.array([0.9, 0.8])
            >>> np.allclose(gp._compute_regularized_scores(scores, [3, 5]), scores)
            True
            >>> gp.complexity_penalty = 0.1
            >>> reg = gp._compute_regularized_scores(np.array([1.0, 1.0]), [3, 5])
            >>> bool(reg[0] > reg[1])
            True
        """
        if self.complexity_penalty == 0:
            return scores
        return scores - self.complexity_penalty * np.log(complexities)

    def _new_generation(
        self, scores: ndarray, children_metrics: List[GenerationMetrics]
    ) -> GenerationMetrics:
        """
        Creates a new generation by selecting individuals and optionally regenerating the population.

        Updates the best rule tracking, checks if regeneration is needed, and selects the next
        generation using the configured selection strategy. If regeneration is triggered,
        the population is completely regenerated and best tracking is reset.

        Args:
            scores (ndarray):
                Fitness scores for all rules in the current population. Must have the same
                length as `self.population`.

        Returns:
            GenerationMetrics: Metrics about the generation step.
        """
        best_idx = int(np.argmax(scores))
        current_best = float(scores[best_idx])
        current_best_rule = self.population[best_idx].copy()
        self._update_best(current_best, current_best_rule)
        regenerated = False
        if (
            self.regeneration
            and self.best_not_improved_epochs >= self.regeneration_patience
        ):
            regenerated = True
        self._epoch += 1
        # Create GenerationMetrics
        complexities = [len(rule) for rule in self.population]
        metrics = GenerationMetrics.from_population(
            best_idx=best_idx,
            best_rule=current_best_rule,
            train_scores=scores.tolist(),
            complexities=complexities,
            child_population_generation_metrics=children_metrics,
        )
        if regenerated:
            # Stagnated: start over with a fresh population and reset run-local best
            # (the global best is deliberately preserved).
            self.population = self.population_generator.generate()
            self.best_score = -float("inf")
            self.best_not_improved_epochs = 0
        else:
            regularized_scores = self._compute_regularized_scores(scores, complexities)
            self.population, selected_scores = self.selection.select(
                self.population, regularized_scores, self.population_size
            )
            # Non-root populations need reordering so top-K rules are at the front
            # for transfer to parent population during the next forward pass.
            if self.current_depth > 0 and self._top_k < self.population_size:
                # top_k must be positive if current_depth > 0; argpartition on the
                # negated scores puts the top_k best (unordered) at the front.
                sorted_indices = np.argpartition(-selected_scores, self._top_k)
                self.population = [self.population[i] for i in sorted_indices]
        return metrics

    def evaluate_population(
        self,
        data: ndarray,
        labels: ndarray,
        score_fn: Callable[[ndarray, ndarray], float],
    ) -> ndarray:
        """
        Evaluate all rules in the population against the given data.

        Args:
            data (ndarray):
                Data to evaluate rules on (2D boolean array).
            labels (ndarray):
                True labels (1D integer array).
            score_fn (Callable[[ndarray, ndarray], float]):
                Function to compute fitness scores.

        Returns:
            ndarray: Array of fitness scores, one for each rule in the population.

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.algorithms import BooleanGP
            >>> def accuracy(predictions, labels):
            ...     return np.mean(predictions == labels)
            >>> data = np.array([[True, False], [False, True], [True, True], [False, False]])
            >>> labels = np.array([1, 0, 1, 0])
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy, train_data=data, train_labels=labels,
            ...     optimize_scorer=False,
            ... )
            >>> gp = BooleanGP(config)
            >>> scores = gp.evaluate_population(data, labels, accuracy)
            >>> len(scores) == len(gp.population)
            True
            >>> all(0.0 <= s <= 1.0 for s in scores)
            True
        """
        return np.array(
            [score_fn(rule.evaluate(data), labels) for rule in self.population]
        )

    def _update_best(self, current_best: float, current_best_rule: Rule) -> None:
        """
        Update the best rule tracking based on the current generation's best.

        If ``current_best`` is greater than or equal to the stored best score, resets the
        stagnation counter and stores the new best. Otherwise, increments the counter.

        Args:
            current_best (float):
                The best fitness score from the current generation.
            current_best_rule (Rule):
                The rule that achieved the best score.

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.algorithms import BooleanGP
            >>> from hgp_lib.rules import Literal
            >>> def accuracy(p, l): return np.mean(p == l)
            >>> data = np.array([[True, False], [False, True]])
            >>> labels = np.array([1, 0])
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy, train_data=data, train_labels=labels,
            ...     optimize_scorer=False,
            ... )
            >>> gp = BooleanGP(config)
            >>> gp._update_best(0.8, Literal(value=0))
            >>> gp.best_score
            0.8
            >>> gp._update_best(0.5, Literal(value=1))
            >>> gp.best_score
            0.8
            >>> gp.best_not_improved_epochs
            1
        """
        # >= (not >) so ties still reset the stagnation counter.
        if current_best >= self.best_score:
            self.best_not_improved_epochs = 0
            self.best_score = current_best
            self.best_rule = current_best_rule
            if current_best >= self.global_best_score:
                self.global_best_score = current_best
                self.global_best_rule = current_best_rule
        else:
            self.best_not_improved_epochs += 1

    def evaluate_best(
        self,
        data: ndarray,
        labels: ndarray,
        score_fn: Callable[[ndarray, ndarray], float] | None = None,
    ) -> float:
        """
        Evaluate the global best rule on validation or test data.

        Args:
            data (ndarray):
                Validation/test data (2D boolean array).
            labels (ndarray):
                Validation/test labels (1D integer array).
            score_fn (Callable[[ndarray, ndarray], float] | None):
                Optional scoring function. Uses the original (non-optimized) scorer when
                ``None``, since the optimized scorer has ``sample_weight`` bound to training
                data. Default: `None`.

        Returns:
            float: Score of the global best rule on the provided data.

        Raises:
            RuntimeError: If no best rule is available (run at least one step first).

        Examples:
            >>> import numpy as np
            >>> from hgp_lib.configs import BooleanGPConfig
            >>> from hgp_lib.algorithms import BooleanGP
            >>> def accuracy(predictions, labels):
            ...     return np.mean(predictions == labels)
            >>> data = np.array([[True, False], [False, True], [True, True], [False, False]])
            >>> labels = np.array([1, 0, 1, 0])
            >>> config = BooleanGPConfig(
            ...     score_fn=accuracy, train_data=data, train_labels=labels,
            ...     optimize_scorer=False,
            ... )
            >>> gp = BooleanGP(config)
            >>> _ = gp.step()
            >>> score = gp.evaluate_best(data, labels)
            >>> isinstance(score, float)
            True
        """
        if self.global_best_rule is None:
            raise RuntimeError("No best rule available. Run at least one step first.")
        fn = self._original_score_fn if score_fn is None else score_fn
        return float(fn(self.global_best_rule.evaluate(data), labels))

    @property
    def original_score_fn(self):
        # The scorer as supplied by the user, before any optimizer wrapping.
        return self._original_score_fn
|