Source code for gadgit.gadgit

import random
import sys

import numpy as np
import pandas as pd

from deap import algorithms
from deap import base
from deap import creator
from deap import tools

from deap.algorithms import varAnd
from deap.algorithms import varOr


def single_eval(gene_info, individual):
    """Score an individual on the first configured objective.

    The fitness is the sum, over every gene index in ``individual``, of the
    value found in ``gene_info.data_frame`` at that row and at the column
    named by ``gene_info.obj_list[0]``.  ``obj_list`` is a list so that the
    same structure can serve the multi-objective code; only its head is
    used here.

    Note: this evaluation does not maintain the frontier counts.

    Parameters
    ----------
    gene_info
        Object exposing ``com_size``, ``fixed_list_ids``, ``obj_list`` and
        ``data_frame``.
    individual
        Collection of gene indices (row labels of ``data_frame``).

    Returns
    -------
    tuple
        One-element tuple holding the summed objective value.
    """
    assert len(individual) == gene_info.com_size, \
        'Indiv does not match community size in eval'
    assert set(gene_info.fixed_list_ids).issubset(individual), \
        'Indiv does not possess all fixed genes'

    objective = gene_info.obj_list[0]
    total = 0.0
    # Accumulate explicitly, in the individual's own iteration order.
    for gene in individual:
        total = total + gene_info.data_frame.loc[gene, objective]
    return (total,)
def cx_SDB(gene_info, ind1, ind2):
    """SDB crossover: keep the shared genes and deal out the rest.

    The genes common to both parents are retained by both children.  The
    remaining genes (those present in exactly one parent) are shuffled
    into a "dealer" list, which is asserted to have even length, and each
    child receives one half of it.

    Both individuals are emptied and refilled in place rather than being
    replaced, so they remain the same objects (and keep their own class
    and attributes).  Fixed genes are expected to survive the operation,
    which is checked by the closing assertions.

    Parameters
    ----------
    gene_info
        Object exposing ``com_size`` and ``fixed_list_ids``.
    ind1, ind2
        Set-like individuals; both are modified in place.

    Returns
    -------
    tuple
        ``(ind1, ind2)`` after recombination.
    """
    # Build the dealer from the genes that are not shared.
    shared = ind1.intersection(ind2)
    dealer = list(ind1.union(ind2) - shared)
    random.shuffle(dealer)
    assert len(dealer) % 2 == 0, 'Dealer assumption on indiv crossover failure'

    half = len(dealer) // 2
    hands = ((ind1, dealer[:half]), (ind2, dealer[half:]))

    # Rebuild each individual: its half of the dealer plus the shared genes.
    for child, hand in hands:
        child.clear()
        child.update(hand)
        child.update(shared)

    sizes_ok = (len(ind1) == gene_info.com_size
                and len(ind2) == gene_info.com_size)
    assert sizes_ok, 'SDB created invalid individual'

    fixed = set(gene_info.fixed_list_ids)
    assert fixed.issubset(ind1), \
        'Ind1 does not possess all fixed genes after crossover'
    assert fixed.issubset(ind2), \
        'Ind2 does not possess all fixed genes after crossover'
    return ind1, ind2
def valid_add(gene_info, individual):
    """Return a random gene index that is not currently in ``individual``.

    Candidates are the indices ``0 .. gene_info.gene_count - 1`` minus the
    members of ``individual``.  The individual itself is not modified.

    Parameters
    ----------
    gene_info
        Object exposing ``gene_count``.
    individual
        Set of gene indices already chosen.

    Returns
    -------
    int
        An index that may be added to ``individual``.
    """
    every_gene = set(range(gene_info.gene_count))
    candidates = list(every_gene - individual)
    return random.choice(candidates)
def valid_remove(gene_info, individual):
    """Return a random member of ``individual`` that is not a fixed gene.

    The candidates are sorted before the random choice is made.  The
    individual itself is not modified; the caller performs the removal.

    Parameters
    ----------
    gene_info
        Object exposing ``fixed_list_ids``.
    individual
        Set of gene indices.

    Returns
    -------
    int
        An index that may be removed from ``individual``.
    """
    removable = individual - set(gene_info.fixed_list_ids)
    return random.choice(sorted(removable))
def self_correction(gene_info, individual):
    """Repair a possibly invalid individual in place and return it.

    Procedure:
        1. Add every fixed gene.
        2. While the size differs from ``gene_info.com_size``, add a gene
           chosen by ``valid_add`` (too small) or remove one chosen by
           ``valid_remove`` (too large).

    Parameters
    ----------
    gene_info
        Object exposing ``com_size``, ``fixed_list_ids`` and whatever
        ``valid_add`` / ``valid_remove`` read.
    individual
        Set-like individual; modified in place.

    Returns
    -------
    The same ``individual`` object, now of the required size and
    containing all fixed genes.
    """
    individual.update(gene_info.fixed_list_ids)

    target = gene_info.com_size
    while len(individual) != target:
        if len(individual) < target:
            individual.add(valid_add(gene_info, individual))
        else:
            individual.remove(valid_remove(gene_info, individual))

    assert len(individual) == gene_info.com_size, \
        'Self correction failed to create indiv with proper size'
    assert set(gene_info.fixed_list_ids).issubset(individual), \
        'Individual not possess all fixed genes after self correction'
    return individual
def cx_OPS(gene_info, ind1, ind2):
    """One-point crossover for set individuals.

    A pivot is drawn uniformly from ``0 .. gene_info.gene_count``
    (inclusive).  Each child takes the genes below the pivot from one
    parent and the genes at or above the pivot from the other, so every
    parental gene is inherited by one of the two children.  The resulting
    sets may have the wrong size or lack fixed genes; ``self_correction``
    repairs both children, which is why this function makes no assertions
    of its own.

    Parameters
    ----------
    gene_info
        Object exposing ``gene_count`` and whatever ``self_correction``
        reads.
    ind1, ind2
        Set-like individuals; both are modified in place.

    Returns
    -------
    tuple
        ``(ind1, ind2)`` after crossover and self correction.
    """
    pivot = random.randint(0, gene_info.gene_count)

    # Compute both children before mutating either parent.  The upper part
    # uses ``>=`` so that a gene equal to the pivot is not silently lost
    # from both offspring.
    ind1_new = [i for i in ind1 if i < pivot]       # Read from same
    ind1_new.extend(i for i in ind2 if i >= pivot)  # Read from other
    ind2_new = [i for i in ind2 if i < pivot]
    ind2_new.extend(i for i in ind1 if i >= pivot)

    # Refill the existing objects so they keep the proper individual class.
    ind1.clear()
    ind1.update(ind1_new)
    ind2.clear()
    ind2.update(ind2_new)

    return self_correction(gene_info, ind1), self_correction(gene_info, ind2)
def mut_flipper(gene_info, individual):
    """Flip mutation: switch one gene off and one gene on.

    The gene to switch off is chosen by ``valid_remove`` (which never
    selects a fixed gene); the gene to switch on is then chosen by
    ``valid_add`` from the genes absent after the removal.

    Parameters
    ----------
    gene_info
        Object exposing ``com_size`` and ``fixed_list_ids``.
    individual
        Set-like individual; modified in place.

    Returns
    -------
    tuple
        One-element tuple containing the mutated individual.
    """
    expected_size = gene_info.com_size
    assert len(individual) == expected_size, \
        'Mutation received invalid indiv'

    switched_off = valid_remove(gene_info, individual)
    individual.remove(switched_off)
    switched_on = valid_add(gene_info, individual)
    individual.add(switched_on)

    assert len(individual) == expected_size, \
        'Mutation created an invalid indiv'
    assert set(gene_info.fixed_list_ids).issubset(individual), \
        ('Individual does not possess all fixed genes after mutation')
    return (individual,)
def indiv_builder(gene_info):
    """Build the gene list for a new individual, forcing the fixed genes.

    Randomly samples the non-fixed part from the gene indices that are not
    in ``gene_info.fixed_list_ids`` and then appends the fixed ids.

    Parameters
    ----------
    gene_info
        Object exposing ``com_size``, ``gene_count``, ``fixed_list`` and
        ``fixed_list_ids``.

    Returns
    -------
    list
        Sampled gene indices followed by the fixed gene ids.
    """
    # NOTE(review): the slot count is derived from ``fixed_list`` while the
    # genes themselves come from ``fixed_list_ids``; presumably the two
    # always have the same length -- confirm against GeneInfo.
    free_slots = gene_info.com_size - len(gene_info.fixed_list)

    all_genes = set(range(gene_info.gene_count))
    free_genes = list(all_genes - set(gene_info.fixed_list_ids))

    genes = random.sample(free_genes, free_slots)
    genes.extend(gene_info.fixed_list_ids)
    return genes
def ga_single(gene_info, ga_info):
    """Set up the DEAP objects and run a single objective EA.

    Parameters
    ----------
    gene_info : GeneInfo
        See respective class documentation.
    ga_info : GAInfo
        See respective class documentation.  The attributes read here are
        ``seed``, ``cross_meth`` (``'ops'`` or ``'sdb'``), ``nk``, ``pop``,
        ``cxpb``, ``mutpb`` and ``gen``.

    Returns
    -------
    pop : DEAP object
    stats : DEAP object
    hof : DEAP object
        See post_run function for examples of how to interpret results.

    Raises
    ------
    AttributeError
        If ``gene_info.obj_list`` holds more than one objective, or if
        ``ga_info.cross_meth`` is not a recognised crossover name.
    """
    # Validate the configuration first so that a bad request fails before
    # the RNG is seeded or any DEAP classes are created.
    if len(gene_info.obj_list) > 1:
        raise AttributeError('Attempted to start single objective GA with '
                             'multiple objectives.')
    cross_meth = ga_info.cross_meth
    if cross_meth not in ('ops', 'sdb'):
        raise AttributeError('Invalid crossover string specified')

    random.seed(ga_info.seed)

    # Single fitness value with weight -1.0.
    creator.create("Fitness", base.Fitness, weights=(-1.0,))
    creator.create("Individual", set, fitness=creator.Fitness)

    toolbox = base.Toolbox()
    toolbox.register("indices", indiv_builder, gene_info)
    toolbox.register("individual", tools.initIterate, creator.Individual,
                     toolbox.indices)
    toolbox.register("population", tools.initRepeat, list,
                     toolbox.individual)

    toolbox.register("evaluate", single_eval, gene_info)
    if cross_meth == 'ops':
        toolbox.register("mate", cx_OPS, gene_info)
    else:
        toolbox.register("mate", cx_SDB, gene_info)
    toolbox.register("mutate", mut_flipper, gene_info)
    toolbox.register("select", tools.selTournament, tournsize=ga_info.nk)

    pop = toolbox.population(n=ga_info.pop)
    hof = tools.HallOfFame(1)

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean, axis=0)
    stats.register("max", np.max, axis=0)

    algorithms.eaSimple(pop, toolbox, ga_info.cxpb, ga_info.mutpb,
                        ga_info.gen, stats, halloffame=hof)

    return pop, stats, hof
def ga_multi(gene_info, ga_info):
    """Set up the DEAP objects and run the multi objective (SoR) EA.

    Parameters
    ----------
    gene_info : GeneInfo
        See respective class documentation.
    ga_info : GAInfo
        See respective class documentation.  The attributes read here are
        ``seed``, ``cross_meth`` (``'ops'`` or ``'sdb'``), ``nk``, ``pop``,
        ``cxpb``, ``mutpb`` and ``gen``.

    Returns
    -------
    pop : DEAP object
        The population returned by ``eaSoR`` after the final generation.
    stats : DEAP object
        An empty ``tools.Statistics`` object (SoR objects are special).
    hof : DEAP object
        See post_run function for examples of how to interpret results.

    Raises
    ------
    AttributeError
        If ``ga_info.cross_meth`` is not a recognised crossover name.
    """
    # Check the configuration before seeding or creating DEAP classes.
    # Fewer than two objectives is only warned about, not rejected.
    if len(gene_info.obj_list) < 2:
        print('Attempted to start multi objective GA with single objective.',
              file=sys.stderr)
    cross_meth = ga_info.cross_meth
    if cross_meth not in ('ops', 'sdb'):
        raise AttributeError('Invalid crossover string specified')

    random.seed(ga_info.seed)

    creator.create("Fitness", base.Fitness, weights=(1.0, ))
    creator.create("Individual", set, fitness=creator.Fitness)

    toolbox = base.Toolbox()
    toolbox.register("indices", indiv_builder, gene_info)
    toolbox.register("individual", tools.initIterate, creator.Individual,
                     toolbox.indices)
    toolbox.register("population", tools.initRepeat, list,
                     toolbox.individual)

    toolbox.register("evaluate", single_eval, gene_info)
    if cross_meth == 'ops':
        toolbox.register("mate", cx_OPS, gene_info)
    else:
        toolbox.register("mate", cx_SDB, gene_info)
    toolbox.register("mutate", mut_flipper, gene_info)
    toolbox.register("select", tools.selTournament, tournsize=ga_info.nk)

    pop = toolbox.population(n=ga_info.pop)
    hof = tools.HallOfFame(1)

    # Empty, as SoR objects are special
    stats = tools.Statistics()

    # eaSoR rebinds its local population every generation, so the evolved
    # population is only available through its return value.
    pop, _logbook = eaSoR(ga_info, gene_info, pop, toolbox, ga_info.cxpb,
                          ga_info.mutpb, ga_info.gen, stats, halloffame=hof)

    return pop, stats, hof
def multi_eval(gene_info, population):
    """Helper function to implement the SoR (sum of ranks) table operations.

    For every individual the raw value of each objective is the sum of
    that objective's column over the individual's rows in
    ``gene_info.data_frame``.  Per objective, individuals are ranked by
    raw value in ascending order (0-based), and the ranks are divided by
    the largest rank.  The normalised ranks are summed across objectives,
    and that sum is ranked again with ``method='first'``.

    Parameters
    ----------
    gene_info
        Object exposing ``data_frame`` and ``obj_list``.
    population
        Sequence of individuals, each an iterable of row labels of
        ``data_frame``.

    Returns
    -------
    tuple
        ``(fitness, obj_log_info)`` where ``fitness`` is a pandas Series
        indexed by position in ``population`` and ``obj_log_info`` maps
        ``new_gen_max_<obj>`` / ``new_gen_mean_<obj>`` to the maximum and
        mean raw value of each objective.
    """
    objectives = gene_info.obj_list

    # Build raw objective information
    all_rows = []
    for indiv in population:
        indiv_slice = gene_info.data_frame.loc[list(indiv)]
        all_rows.append([indiv_slice[obj].sum() for obj in objectives])
    raw_frame = pd.DataFrame(all_rows, columns=objectives)

    # Ranking procedure
    sor = pd.DataFrame()
    obj_log_info = {}
    for obj in raw_frame.columns:
        obj_log_info[f'new_gen_max_{obj}'] = raw_frame[obj].max()
        obj_log_info[f'new_gen_mean_{obj}'] = raw_frame[obj].mean()

        # argsort maps rank -> individual; invert it to individual -> rank.
        # ``Series.items`` is used because ``iteritems`` no longer exists
        # in pandas 2.0 and later.
        rank_series = np.argsort(raw_frame[obj])
        swap_index = pd.Series({v: k for k, v in rank_series.items()})
        append_ranks = swap_index.sort_index()
        sor[obj + '_rank_norm'] = append_ranks / append_ranks.max()

    sor['sum'] = sor[list(sor.columns)].sum(axis=1)
    return sor['sum'].rank(method='first'), obj_log_info
def eaSoR(ga_info, gene_info, population, toolbox, cxpb, mutpb, ngen,
          stats=None, halloffame=None, verbose=__debug__):
    """Run an EA using the SoR fitness methodology.

    It is essentially a fork of the eaSimple function from deap.  It is
    not meant to be exposed to users and instead is only used internally
    by the package.

    Parameters
    ----------
    ga_info
        Not used by this function; kept for the existing call signature.
    gene_info
        Object exposing ``obj_list`` and ``frontier`` and whatever
        ``multi_eval`` reads.
    population
        List of individuals forming generation 0.
    toolbox
        DEAP toolbox providing ``select`` and the operators ``varOr`` uses.
    cxpb, mutpb
        Crossover and mutation probabilities handed to ``varOr``.
    ngen
        Number of generations to run.
    stats
        When truthy, per-objective max/mean columns are added to the
        logbook header.
    halloffame
        Optional hall of fame.  When given, it is updated every generation
        and its best member is added to the selection pool (strict
        elitism).
    verbose
        When true, the logbook stream is printed after every generation.

    Returns
    -------
    tuple
        ``(population, logbook)`` for the final generation.
    """
    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals']
    if stats:
        for obj in gene_info.obj_list:
            logbook.header.append(f'new_gen_max_{obj}')
            logbook.header.append(f'new_gen_mean_{obj}')

    # Offload SoR to table
    fit_series, obj_log_info = multi_eval(gene_info, population)

    # Update ALL fitness vals
    for index, fit_val in fit_series.items():
        population[index].fitness.values = fit_val,

    if halloffame is not None:
        halloffame.update(population)

    logbook.record(gen=0, nevals='maximal-temp', **obj_log_info)
    if verbose:
        print(logbook.stream)

    # Begin the generational process
    for gen in range(1, ngen + 1):
        # Select the next generation individuals to breed
        breed_pop = toolbox.select(population, len(population))

        # Vary the pool of individuals
        offspring = varOr(breed_pop, toolbox, len(population), cxpb, mutpb)

        # Offload SoR to table
        fit_series, obj_log_info = multi_eval(gene_info, offspring)

        # Update ALL fitness vals
        for index, fit_val in fit_series.items():
            offspring[index].fitness.values = fit_val,

        # Strict elitism: the hall-of-fame leader competes with the
        # offspring.  Without a hall of fame there is no elite to add, so
        # the survivors are chosen from the offspring alone.
        if halloffame is not None:
            halloffame.update(offspring)
            candidates = offspring + [halloffame[0]]
        else:
            candidates = offspring
        population = tools.selBest(candidates, len(population))

        # Update frontier based on elite index
        for index in tools.selBest(population, 1)[0]:
            gene_info.frontier[index] += 1

        # Append the current generation statistics to the logbook
        logbook.record(gen=gen, nevals='maximal-temp', **obj_log_info)
        if verbose:
            print(logbook.stream)

    return population, logbook