"""
Main Differential Evolution algorithm implementation.
This module provides the core DifferentialEvolution class with
fully vectorized operations for high performance.
"""
import numpy as np
import time
import logging
from typing import Callable, Optional, Dict, Any, Union, Tuple
# tqdm is an optional dependency: it is only needed for the progress bar.
# When it is missing, TQDM_AVAILABLE is False and ``tqdm`` is bound to None
# so later code can test the flag instead of re-attempting the import.
try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False
    tqdm = None
from pyrade.core.population import Population
from pyrade.operators.mutation import DErand1
from pyrade.operators.crossover import BinomialCrossover
from pyrade.operators.selection import GreedySelection
# Configure module logger
logger = logging.getLogger(__name__)
class DifferentialEvolution:
    """
    Main Differential Evolution optimizer with vectorized operations.

    This implementation uses aggressive vectorization to process entire
    populations at once, achieving significant performance improvements
    over monolithic implementations.

    Features:
        - Fully vectorized (processes entire population at once)
        - Strategy pattern for operators (easy to extend)
        - Professional API (fit/predict style)
        - Progress tracking and callbacks
        - Progress bar support (tqdm integration)
        - Comprehensive logging support

    Parameters
    ----------
    objective_func : callable
        Function to minimize: f(x) -> float
    bounds : sequence of pairs or ndarray of shape (n_dimensions, 2)
        Per-dimension bounds ``[(lb1, ub1), (lb2, ub2), ...]``. The number
        of pairs defines the problem dimensionality, so a bare
        ``(lower_bound, upper_bound)`` pair is rejected with ValueError.
    mutation : MutationStrategy, optional
        Mutation strategy (default: DE/rand/1 with F=0.8)
    crossover : CrossoverStrategy, optional
        Crossover strategy (default: Binomial with CR=0.9)
    selection : SelectionStrategy, optional
        Selection strategy (default: Greedy)
    pop_size : int, default=50
        Population size (must be >= 4)
    max_iter : int, default=1000
        Maximum iterations (must be >= 1)
    seed : int, optional
        Random seed for reproducibility (forwarded to ``Population``)
    verbose : bool, default=False
        Print progress information
    show_progress : bool, default=False
        Show progress bar (requires tqdm)
    callback : callable, optional
        Called after each iteration:
        callback(iteration, best_fitness, best_solution)

    Attributes
    ----------
    dim : int
        Problem dimensionality inferred from ``bounds``
    population : Population
        Population container the optimizer evolves
    best_solution_ : ndarray
        Best solution found
    best_fitness_ : float
        Best fitness value
    history_ : dict
        Optimization history of the most recent ``optimize()`` call
        (keys: 'fitness', 'time', 'iteration')

    Methods
    -------
    optimize() : dict
        Run optimization and return results

    Examples
    --------
    >>> def sphere(x):
    ...     return sum(x**2)
    >>>
    >>> optimizer = DifferentialEvolution(
    ...     objective_func=sphere,
    ...     bounds=[(-100, 100)] * 10,
    ...     pop_size=50,
    ...     max_iter=1000
    ... )
    >>> result = optimizer.optimize()
    >>> print(f"Best fitness: {result['best_fitness']}")
    """

    def __init__(
        self,
        objective_func: Callable[[np.ndarray], float],
        bounds: Union[Tuple[float, float], np.ndarray],
        mutation: Optional[Any] = None,
        crossover: Optional[Any] = None,
        selection: Optional[Any] = None,
        pop_size: int = 50,
        max_iter: int = 1000,
        seed: Optional[int] = None,
        verbose: bool = False,
        show_progress: bool = False,
        callback: Optional[Callable] = None
    ):
        """
        Initialize Differential Evolution optimizer.

        Raises
        ------
        TypeError
            If ``objective_func`` is not callable.
        ValueError
            If ``pop_size`` / ``max_iter`` are out of range, or ``bounds``
            cannot be interpreted as an (n_dimensions, 2) array.
        RuntimeError
            If the ``Population`` container cannot be constructed.
        """
        logger.info("Initializing DifferentialEvolution optimizer")

        # Validate inputs with detailed error messages
        if not callable(objective_func):
            error_msg = (
                "objective_func must be callable. "
                f"Received: {type(objective_func).__name__}"
            )
            logger.error(error_msg)
            raise TypeError(error_msg)

        if not isinstance(pop_size, int) or pop_size < 4:
            error_msg = (
                f"pop_size must be an integer >= 4 (got: {pop_size}). "
                "DE requires at least 4 individuals for mutation."
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        if not isinstance(max_iter, int) or max_iter < 1:
            error_msg = f"max_iter must be an integer >= 1 (got: {max_iter})"
            logger.error(error_msg)
            raise ValueError(error_msg)

        self.objective_func = objective_func
        self.bounds = bounds
        self.pop_size = pop_size
        self.max_iter = max_iter
        self.seed = seed
        self.verbose = verbose
        # The bar is only enabled when tqdm could actually be imported
        self.show_progress = show_progress and TQDM_AVAILABLE
        self.callback = callback

        # Log warning if progress bar requested but tqdm not available
        if show_progress and not TQDM_AVAILABLE:
            warning_msg = (
                "Progress bar requested but tqdm is not installed. "
                "Install with: pip install tqdm"
            )
            logger.warning(warning_msg)
            if verbose:
                print(f"Warning: {warning_msg}")

        # Initialize operators with defaults if not provided
        self.mutation = mutation if mutation is not None else DErand1(F=0.8)
        self.crossover = (
            crossover if crossover is not None else BinomialCrossover(CR=0.9)
        )
        self.selection = (
            selection if selection is not None else GreedySelection()
        )
        logger.debug(f"Operators initialized: Mutation={self.mutation.__class__.__name__}, "
                     f"Crossover={self.crossover.__class__.__name__}, "
                     f"Selection={self.selection.__class__.__name__}")

        # Infer dimensionality from bounds with enhanced validation
        try:
            bounds_array = np.array(bounds)
        except Exception as e:
            error_msg = f"Failed to convert bounds to numpy array: {e}"
            logger.error(error_msg)
            raise ValueError(error_msg) from e

        # 0-d (a bare number) and 1-D (a single (lb, ub) pair) inputs carry
        # no information about the number of dimensions.
        if bounds_array.ndim < 2:
            error_msg = (
                "Cannot infer dimensionality from scalar bounds. "
                "Please provide bounds as [(lb1, ub1), (lb2, ub2), ...] or "
                "as a 2D array with shape (n_dimensions, 2). "
                f"Received bounds shape: {bounds_array.shape}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Anything that is not exactly one (lower, upper) pair per dimension
        # would yield a wrong ``dim`` below, so reject it up front.
        if (
            bounds_array.ndim != 2
            or bounds_array.shape[0] < 1
            or bounds_array.shape[1] != 2
        ):
            error_msg = (
                "bounds must contain one (lower, upper) pair per dimension, "
                "i.e. have shape (n_dimensions, 2). "
                f"Received bounds shape: {bounds_array.shape}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        self.dim = bounds_array.shape[0]
        logger.info(f"Problem dimensionality: {self.dim}D")

        # Initialize population (the caller's original ``bounds`` object is
        # forwarded, not the converted array)
        try:
            self.population = Population(pop_size, self.dim, bounds, seed)
            logger.info(f"Population initialized: size={pop_size}, dimensions={self.dim}")
        except Exception as e:
            error_msg = f"Failed to initialize population: {e}"
            logger.error(error_msg)
            raise RuntimeError(error_msg) from e

        # Results storage
        self.best_solution_ = None
        self.best_fitness_ = np.inf
        self.history_ = {
            'fitness': [],
            'time': [],
            'iteration': []
        }
        logger.debug("Initialization complete")

    def _initialize_population(self):
        """Initialize random population and evaluate fitness."""
        if self.verbose:
            print("Initializing population...")

        self.population.initialize_random()
        self.population.evaluate(self.objective_func)

        # Store initial best (copied so later in-place population updates
        # cannot alter the recorded solution)
        self.best_solution_ = self.population.best_vector.copy()
        self.best_fitness_ = self.population.best_fitness

        if self.verbose:
            print(f"Initial best fitness: {self.best_fitness_:.6e}")

    def _evolve_generation(self):
        """
        Evolve one generation using vectorized operations.

        Steps:
            1. Vectorized mutation (all individuals at once)
            2. Boundary repair (vectorized clip)
            3. Vectorized crossover (all at once)
            4. Boundary repair (vectorized clip)
            5. Evaluate all trials
            6. Vectorized selection (all at once)
            7. Update best solution

        Returns
        -------
        improved_count : int
            Number of individuals that improved
        """
        pop_vectors = self.population.vectors
        pop_fitness = self.population.fitness
        best_idx = self.population.best_idx
        target_indices = self.population.get_indices()

        # Step 1: Vectorized mutation
        mutants = self.mutation.apply(
            pop_vectors, pop_fitness, best_idx, target_indices
        )

        # Step 2: Boundary repair (mutation)
        mutants = self.population.clip_to_bounds(mutants)

        # Replace any remaining non-finite entries with a fresh uniform
        # sample between the population's lower and upper bounds.
        # NOTE(review): this draws from NumPy's global RNG; it honours
        # ``seed`` only if the global state is seeded elsewhere -- confirm
        # against Population.
        mutants = np.where(
            np.isfinite(mutants),
            mutants,
            self.population.lb + np.random.rand(*mutants.shape) *
            (self.population.ub - self.population.lb)
        )

        # Step 3: Vectorized crossover
        trials = self.crossover.apply(pop_vectors, mutants)

        # Step 4: Boundary repair (crossover)
        trials = self.population.clip_to_bounds(trials)

        # Non-finite trial entries fall back to the parent's value
        trials = np.where(np.isfinite(trials), trials, pop_vectors)

        # Step 5: Evaluate all trials
        trial_fitness = self.population.evaluate_vectors(
            trials, self.objective_func
        )

        # Step 6: Vectorized selection
        new_vectors, new_fitness = self.selection.apply(
            pop_vectors, pop_fitness, trials, trial_fitness
        )

        # Count improvements before the population arrays are overwritten;
        # converted to a plain int to match the documented return type.
        improved_count = int(np.sum(new_fitness < pop_fitness))

        # Step 7: Update population in place
        self.population.vectors[:] = new_vectors
        self.population.fitness[:] = new_fitness
        self.population._update_best()

        # Update global best
        if self.population.best_fitness < self.best_fitness_:
            self.best_solution_ = self.population.best_vector.copy()
            self.best_fitness_ = self.population.best_fitness

        return improved_count

    def optimize(self) -> Dict[str, Any]:
        """
        Run the optimization.

        Each call re-initializes the population and starts a fresh
        ``history_``, so results of repeated calls are not mixed.
        Exceptions raised by ``callback`` are logged as warnings and do
        not stop the run.

        Returns
        -------
        dict with keys:
            'best_solution': Best solution found
            'best_fitness': Best fitness value
            'n_iterations': Number of iterations run
            'history': Optimization history
            'success': Whether optimization succeeded
            'time': Total optimization time
        """
        if self.verbose:
            print("="*70)
            print("Starting Differential Evolution Optimization")
            print("="*70)
            print(f"Population size: {self.pop_size}")
            print(f"Dimensions: {self.dim}")
            print(f"Max iterations: {self.max_iter}")
            print(f"Mutation: {self.mutation.__class__.__name__}")
            print(f"Crossover: {self.crossover.__class__.__name__}")
            print(f"Selection: {self.selection.__class__.__name__}")
            print("="*70)

        # Drop records of any previous run: the population is re-initialized
        # below, so old entries would describe a different trajectory.
        self.history_ = {
            'fitness': [],
            'time': [],
            'iteration': []
        }

        start_time = time.time()

        # Initialize population
        self._initialize_population()

        # Store initial history
        self.history_['fitness'].append(float(self.best_fitness_))
        self.history_['time'].append(time.time() - start_time)
        self.history_['iteration'].append(0)

        # Main optimization loop with optional progress bar
        iterator = range(1, self.max_iter + 1)
        progress_bar = None
        if self.show_progress:
            progress_bar = tqdm(
                iterator, desc="Optimizing", unit="iter",
                bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]'
            )
            iterator = progress_bar

        try:
            for iteration in iterator:
                iter_start = time.time()

                # Evolve one generation
                improved_count = self._evolve_generation()

                # Store history as plain Python scalars
                iter_time = time.time() - iter_start
                self.history_['fitness'].append(float(self.best_fitness_))
                self.history_['time'].append(float(time.time() - start_time))
                self.history_['iteration'].append(int(iteration))

                # Update progress bar postfix if available
                if progress_bar is not None:
                    progress_bar.set_postfix({
                        'best': f'{self.best_fitness_:.6e}',
                        'improved': f'{improved_count}/{self.pop_size}'
                    })

                # Print progress (only if not using progress bar)
                if (
                    self.verbose
                    and not self.show_progress
                    and (iteration % 10 == 0 or iteration == 1)
                ):
                    print(
                        f"Iter {iteration:4d} | "
                        f"Best: {self.best_fitness_:.6e} | "
                        f"Improved: {improved_count:2d}/{self.pop_size} | "
                        f"Time: {iter_time:.3f}s"
                    )

                # Log detailed progress
                if iteration % 100 == 0 or iteration == 1:
                    logger.debug(f"Iteration {iteration}/{self.max_iter}: "
                                 f"best_fitness={self.best_fitness_:.6e}, "
                                 f"improved={improved_count}/{self.pop_size}")

                # Call callback; a failing callback is deliberately
                # non-fatal and only logged.
                if self.callback is not None:
                    try:
                        self.callback(
                            iteration, self.best_fitness_, self.best_solution_
                        )
                    except Exception as e:
                        logger.warning(f"Callback error at iteration {iteration}: {e}")
        finally:
            # Always release the progress bar, even if a generation raised
            if progress_bar is not None:
                progress_bar.close()

        total_time = time.time() - start_time
        logger.info(f"Optimization complete: best_fitness={self.best_fitness_:.6e}, "
                    f"total_time={total_time:.3f}s, iterations={self.max_iter}")

        if self.verbose:
            print("="*70)
            print("Optimization Complete")
            print(f"Final best fitness: {self.best_fitness_:.6e}")
            print(f"Total time: {total_time:.3f}s")
            print(f"Average time per iteration: {total_time/self.max_iter:.3f}s")
            print("="*70)

        # Return copies so callers cannot mutate the optimizer's own state
        return {
            'best_solution': self.best_solution_.copy(),
            'best_fitness': float(self.best_fitness_),
            'n_iterations': int(self.max_iter),
            'history': {
                'fitness': list(self.history_['fitness']),
                'time': list(self.history_['time']),
                'iteration': list(self.history_['iteration'])
            },
            'success': True,
            'time': float(total_time)
        }