Source code for choicemodels.tools.simulation

"""
Utilities for Monte Carlo simulation of choices.

"""
import numpy as np
import pandas as pd
from multiprocessing import Process, Manager, Array, cpu_count
from tqdm import tqdm
import warnings


def monte_carlo_choices(probabilities):
    """
    Monte Carlo simulation of choices for a set of K scenarios, each having different
    probability distributions (and potentially different alternatives).

    Choices are independent and unconstrained, meaning that the same alternative can be
    chosen in multiple scenarios.

    This function is equivalent to applying np.random.choice() to each of the K
    scenarios, but it's implemented as a single-pass matrix calculation. When the number
    of scenarios is large, this is about 50x faster than using df.apply() or a loop.

    If all the choice scenarios have the same probability distribution among
    alternatives, you don't need this function. You can use np.random.choice() with
    size=K, which will be more efficient. (For example, that would work for a choice
    model whose expression includes only attributes of the alternatives.)

    NOTE ABOUT THE INPUT FORMATS: It's important for the probabilities to be structured
    correctly. This is computationally expensive to verify, so you will not get a
    warning if it's wrong! (TO DO: we should provide an option to perform these checks,
    though)

    1. Probabilities (pd.Series) must include a two-level MultiIndex, the first level
       representing the scenario (observation) id and the second the alternative id.

    2. Probabilities must be sorted so that each scenario's alternatives are
       consecutive.

    3. Each scenario must have the same number of alternatives. You can pad a scenario
       with zero-probability alternatives if needed.

    4. Each scenario's alternative probabilities must sum to 1.

    Parameters
    ----------
    probabilities : pd.Series
        List of probabilities for each observation (choice scenario) and alternative.
        Please verify that the formatting matches the four requirements described above.

    Returns
    -------
    pd.Series
        List of chosen alternative id's, indexed with the observation id.

    """
    # TO DO - if input is a single-column dataframe, silently convert it to series

    obs_name, alts_name = probabilities.index.names

    obs = probabilities.index.get_level_values(0)
    alts = probabilities.index.get_level_values(1)

    num_obs = obs.unique().size
    num_alts = probabilities.size // num_obs

    # This Monte Carlo approach is adapted from urbansim.urbanchoice.mnl_simulate()
    probs = np.array(probabilities)
    cumprobs = probs.reshape((num_obs, num_alts)).cumsum(axis=1)

    # Simulate choice by subtracting a random float
    scaledprobs = np.subtract(cumprobs, np.random.rand(num_obs, 1))

    # Replace negative values with 0 and positive values with 1, then use argmax to
    # return the position of the first positive value
    choice_ix = np.argmax((scaledprobs + 1.0).astype('i4'), axis=1)
    choice_ix_1d = choice_ix + (np.arange(num_obs) * num_alts)

    choices = pd.DataFrame({obs_name: obs.values.take(choice_ix_1d),
                            alts_name: alts.values.take(choice_ix_1d)})

    return choices.set_index(obs_name)[alts_name]
def iterative_lottery_choices(choosers, alternatives, mct_callable, probs_callable,
        alt_capacity=None, chooser_size=None, max_iter=None, chooser_batch_size=None):
    """
    Monte Carlo simulation of choices for a set of choice scenarios where (a) the
    alternatives have limited capacity and (b) the choosers have varying probability
    distributions over the alternatives.

    Effectively, we simulate the choices sequentially, each time removing the chosen
    alternative or reducing its available capacity. (It's actually done in batches for
    better performance, but the outcome is equivalent.) This requires sampling
    alternatives and calculating choice probabilities multiple times, which is why
    callables for those actions are required inputs.

    Chooser priority is randomized. Capacities can be specified as counts (number of
    choosers that can be accommodated) or as amounts (e.g. square footage) with
    corresponding chooser sizes. If total capacity is insufficient to accommodate all
    the choosers, as many choices will be simulated as possible.

    Note that if all the choosers are the same size and have the same probability
    distribution over alternatives, you don't need this function. You can use
    np.random.choice() with size=K to draw chosen alternatives, which will be more
    efficient. (This function also works, though.)

    Parameters
    ----------
    choosers : pd.DataFrame
        Table with one row for each chooser or choice scenario, with unique ID's in the
        index field. Additional columns can contain fixed attributes of the choosers.
        (Reserved column names: '_size'.)

    alternatives : pd.DataFrame
        Table with one row for each alternative, with unique ID's in the index field.
        Additional columns can contain fixed attributes of the alternatives. (Reserved
        column names: '_capacity'.)

    mct_callable : callable
        Callable that samples alternatives to generate a table of choice scenarios. It
        should accept subsets of the choosers and alternatives tables and return a
        choicemodels.tools.MergedChoiceTable.

    probs_callable : callable
        Callable that generates predicted probabilities for a table of choice scenarios.
        It should accept a choicemodels.tools.MergedChoiceTable and return a pd.Series
        with indexes matching the input.

    alt_capacity : str, optional
        Name of a column in the alternatives table that expresses the capacity of
        alternatives. If not provided, each alternative is interpreted as accommodating
        a single chooser.

    chooser_size : str, optional
        Name of a column in the choosers table that expresses the size of choosers.
        Choosers might have varying sizes if the alternative capacities are amounts
        rather than counts -- e.g. square footage or employment capacity. Chooser sizes
        must be in the same units as alternative capacities. If not provided, each
        chooser has a size of 1.

    max_iter : int or None, optional
        Maximum number of iterations. If None (default), the algorithm will iterate
        until all choosers are matched or no alternatives remain.

    chooser_batch_size : int or None, optional
        Size of the batches for processing smaller groups of choosers one at a time.
        Useful when the anticipated size of the merged choice tables (choosers X
        alternatives X covariates) will be too large for python/pandas to handle.

    Returns
    -------
    pd.Series
        List of chosen alternative id's, indexed with the chooser (observation) id.

    """
    # TO DO - how does MCT handle sample size greater than the number of available alts?

    # Copy the input dataframes to avoid editing the originals
    choosers = choosers.copy()
    alts = alternatives.copy()

    if alt_capacity is None:
        alt_capacity = '_capacity'
        alts.loc[:, alt_capacity] = 1

    if chooser_size is None:
        chooser_size = '_size'
        choosers.loc[:, chooser_size] = 1

    capacity, size = (alt_capacity, chooser_size)

    len_choosers = len(choosers)
    valid_choices = pd.Series(dtype='float64')
    iter = 0

    while (len(valid_choices) < len_choosers):
        iter += 1
        if max_iter is not None:
            if (iter > max_iter):
                break

        if alts[capacity].max() < choosers[size].min():
            print("{} choosers cannot be allocated.".format(len(choosers)))
            print("\nRemaining capacity on alternatives but not enough to "
                  "accommodate choosers' sizes")
            break

        if chooser_batch_size is None or chooser_batch_size > len(choosers):
            mct = mct_callable(choosers.sample(frac=1), alts)
        else:
            mct = mct_callable(choosers.sample(chooser_batch_size), alts)

        if len(mct.to_frame()) == 0:
            print("No valid alternatives for the remaining choosers")
            break

        probs = probs_callable(mct)
        choices = pd.DataFrame(monte_carlo_choices(probs))

        # join capacities and sizes
        oid, aid = (mct.observation_id_col, mct.alternative_id_col)
        c = choices.join(alts[capacity], on=aid).join(choosers[size], on=oid)
        c.loc[:, '_cumsize'] = c.groupby(aid)[size].cumsum()

        # save valid choices
        c_valid = (c._cumsize <= c[capacity])
        valid_choices = pd.concat([valid_choices, c[aid].loc[c_valid]])

        print("Iteration {}: {} of {} valid choices".format(iter, len(valid_choices),
                                                            len_choosers))

        # update choosers and alternatives
        choosers = choosers.drop(c.loc[c_valid].index.values)
        # print("{} remaining choosers".format(len(choosers)))

        placed_capacity = c.loc[c_valid].groupby(aid)._cumsize.max()
        alts.loc[:, capacity] = alts[capacity].subtract(placed_capacity, fill_value=0)

        full = alts.loc[alts[capacity] == 0]
        alts = alts.drop(full.index)
        # print("{} remaining alternatives".format(len(alts)))

    # retain original index names
    valid_choices.index.name = choosers.index.name
    valid_choices.name = alts.index.name

    return valid_choices
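The per-iteration capacity check in the loop above can be sketched standalone. This is a toy example with made-up ids, sizes, and capacities, reproducing only the groupby-cumsum masking step rather than the full simulation:

```python
import pandas as pd

# Simulated choices from one iteration: choosers 0, 1, and 3 picked alternative
# 'x', chooser 2 picked 'y'. Alternative 'x' can hold 2 units, 'y' can hold 1.
c = pd.DataFrame({'alt_id': ['x', 'x', 'y', 'x'],
                  '_size': [1, 1, 1, 1]},
                 index=pd.Index([0, 1, 2, 3], name='chooser_id'))
capacity = pd.Series({'x': 2, 'y': 1}, name='_capacity')

# Join capacities, then take the running total of chooser sizes per alternative.
# Because chooser order was randomized upstream, this acts as a lottery.
c = c.join(capacity, on='alt_id')
c['_cumsize'] = c.groupby('alt_id')['_size'].cumsum()

# A choice is valid while the running total still fits within capacity; the
# third chooser of 'x' overflows and is deferred to the next iteration.
c_valid = c['_cumsize'] <= c['_capacity']
print(c.loc[c_valid, 'alt_id'].to_dict())  # {0: 'x', 1: 'x', 2: 'y'}
```

Deferred choosers stay in the choosers table, so they get a fresh draw against the reduced capacities on the next pass through the loop.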
def _parallel_lottery_choices_worker(
        choosers, alternatives, choices_dict, chosen_alts, mct_callable,
        probs_callable, alt_capacity=None, chooser_size=None, proc_num=0,
        batch_size=0):  # pragma: no cover
    """
    Worker process called only by the parallel_lottery_choices() method.

    Parameters
    ----------
    choosers : pd.DataFrame
        Table with one row for each chooser or choice scenario, with unique ID's in the
        index field. Additional columns can contain fixed attributes of the choosers.
        (Reserved column names: '_size'.)

    alternatives : pd.DataFrame
        Table with one row for each alternative, with unique ID's in the index field.
        Additional columns can contain fixed attributes of the alternatives. (Reserved
        column names: '_capacity'.)

    choices_dict : multiprocessing.managers.SyncManager.dict
        A dictionary proxy allocated from shared memory

    chosen_alts : multiprocessing.Array
        A ctypes array allocated from shared memory

    mct_callable : callable
        Callable that samples alternatives to generate a table of choice scenarios. It
        should accept subsets of the choosers and alternatives tables and return a
        choicemodels.tools.MergedChoiceTable.

    probs_callable : callable
        Callable that generates predicted probabilities for a table of choice scenarios.
        It should accept a choicemodels.tools.MergedChoiceTable and return a pd.Series
        with indexes matching the input.

    alt_capacity : str, optional
        Name of a column in the alternatives table that expresses the capacity of
        alternatives. If not provided, each alternative is interpreted as accommodating
        a single chooser.

    chooser_size : str, optional
        Name of a column in the choosers table that expresses the size of choosers.
        Choosers might have varying sizes if the alternative capacities are amounts
        rather than counts -- e.g. square footage or employment capacity. Chooser sizes
        must be in the same units as alternative capacities. If not provided, each
        chooser has a size of 1.

    proc_num : int
        Integer representing the sequential order in which the worker was spawned

    batch_size : int
        Integer representing the chooser batch size

    """
    st_choice_idx = proc_num * batch_size

    if alt_capacity is None:
        alt_capacity = '_capacity'

    if chooser_size is None:
        chooser_size = '_size'
        choosers.loc[:, chooser_size] = 1

    capacity, size = (alt_capacity, chooser_size)
    len_choosers = len(choosers)
    alts_name = alternatives.index.name
    valid_choices = pd.Series(dtype='float64')
    max_mct_size = 0
    iter = 0

    while (len(valid_choices) < len_choosers):
        chosen_alts_list = list(chosen_alts.get_obj())
        alternatives = alternatives[~alternatives.index.isin(chosen_alts_list)]
        iter += 1

        if alternatives['_capacity'].max() < choosers[size].min():
            print("{} choosers cannot be allocated.".format(len(choosers)))
            print("\nRemaining capacity on alternatives but "
                  "not enough to accommodate choosers' sizes")
            break

        mct = mct_callable(choosers.sample(frac=1), alternatives)
        mct_size = len(mct.to_frame())
        max_mct_size = max(max_mct_size, mct_size)

        if mct_size == 0:
            # print("No valid alternatives for the remaining choosers")
            break

        probs = probs_callable(mct)
        choices = pd.DataFrame(monte_carlo_choices(probs))

        # join capacities and sizes
        oid, aid = (mct.observation_id_col, mct.alternative_id_col)
        c = choices.join(
            alternatives[capacity], on=aid).join(
            choosers[size], on=oid)

        # when size==1, _cumsize counts the cumulative number of times
        # each alternative appears. thus, below, c_valid creates a
        # mask that retains just the choice of the chooser who chose
        # their alternative first, provided that choice hasn't been
        # made elsewhere as documented by the shared chosen_alts list
        c.loc[:, '_cumsize'] = c.groupby(aid)[size].cumsum()

        # the shared array of chosen alts must stay locked by the
        # current worker between the time the worker converts it
        # to a list to make sure the worker's choices haven't been
        # chosen already and the time the worker updates the array
        with chosen_alts.get_lock():
            chosen_alts_list = list(chosen_alts.get_obj())
            c_valid = (c._cumsize <= c[capacity]) & (
                ~c[alts_name].isin(chosen_alts_list))
            iter_valid_choices = c[aid].loc[c_valid]
            if len(iter_valid_choices) == 0:
                continue
            num_valid_choices = len(iter_valid_choices.values)
            chosen_alts[st_choice_idx:st_choice_idx + num_valid_choices] = \
                iter_valid_choices.values
            st_choice_idx += num_valid_choices

        alternatives.drop(iter_valid_choices.values, inplace=True)
        iter_valid_choices.index.name = choosers.index.name
        iter_valid_choices.name = alternatives.index.name
        choices_dict.update(iter_valid_choices.to_dict())
        valid_choices = pd.concat([valid_choices, iter_valid_choices])
        choosers = choosers.drop(c.loc[c_valid].index.values)

    return
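The shared-array bookkeeping the worker relies on can be exercised without spawning any processes. This is a toy sketch with made-up alternative ids: real workers run in separate processes and each writes into its own disjoint slice, but the Array, get_lock(), and slice-assignment mechanics are the same.

```python
from multiprocessing import Array

# One slot per chooser; each worker publishes the alternative ids it claims
# into a slice of the shared array, starting at proc_num * batch_size.
chosen_alts = Array('i', 4)  # a shared int array, initialized to zeros

st_choice_idx = 0     # this worker's write offset (proc_num * batch_size)
new_choices = [7, 9]  # hypothetical alternative ids claimed this iteration

with chosen_alts.get_lock():
    # Re-read under the lock to confirm no other worker claimed these ids in
    # the meantime, then publish this worker's claims via slice assignment.
    already_taken = list(chosen_alts.get_obj())
    if not any(a in already_taken for a in new_choices):
        chosen_alts[st_choice_idx:st_choice_idx + len(new_choices)] = new_choices
        st_choice_idx += len(new_choices)

print(list(chosen_alts))  # [7, 9, 0, 0]
```

Holding the lock across both the read and the write is what makes the check-then-claim sequence atomic; releasing it between the two steps would reintroduce the race the comment in the worker warns about.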
def parallel_lottery_choices(
        choosers, alternatives, mct_callable, probs_callable,
        alt_capacity=None, chooser_size=None, chooser_batch_size=None):
    """
    A parallelized version of the iterative_lottery_choices method. Chooser batches are
    processed in parallel rather than sequentially.

    NOTE: In its current form, this method is only supported for simulating choices
    where every alternative has a capacity of 1.

    Parameters
    ----------
    choosers : pd.DataFrame
        Table with one row for each chooser or choice scenario, with unique ID's in the
        index field. Additional columns can contain fixed attributes of the choosers.
        (Reserved column names: '_size'.)

    alternatives : pd.DataFrame
        Table with one row for each alternative, with unique ID's in the index field.
        Additional columns can contain fixed attributes of the alternatives. (Reserved
        column names: '_capacity'.)

    mct_callable : callable
        Callable that samples alternatives to generate a table of choice scenarios. It
        should accept subsets of the choosers and alternatives tables and return a
        choicemodels.tools.MergedChoiceTable.

    probs_callable : callable
        Callable that generates predicted probabilities for a table of choice scenarios.
        It should accept a choicemodels.tools.MergedChoiceTable and return a pd.Series
        with indexes matching the input.

    alt_capacity : str, optional
        Name of a column in the alternatives table that expresses the capacity of
        alternatives. If not provided, each alternative is interpreted as accommodating
        a single chooser.

    chooser_size : str, optional
        Name of a column in the choosers table that expresses the size of choosers.
        Choosers might have varying sizes if the alternative capacities are amounts
        rather than counts -- e.g. square footage or employment capacity. Chooser sizes
        must be in the same units as alternative capacities. If not provided, each
        chooser has a size of 1.

    chooser_batch_size : int or None, optional
        Size of the batches for processing smaller groups of choosers one at a time.
        Useful when the anticipated size of the merged choice tables (choosers X
        alternatives X covariates) will be too large for python/pandas to handle.

    Returns
    -------
    pd.Series
        List of chosen alternative id's, indexed with the chooser (observation) id.

    """
    choosers = choosers.copy()
    alternatives = alternatives.copy()

    if alt_capacity is None:
        alt_capacity = '_capacity'
        alternatives.loc[:, alt_capacity] = 1

    if chooser_size is None:
        chooser_size = '_size'
        choosers.loc[:, chooser_size] = 1

    if chooser_batch_size is None or chooser_batch_size > len(choosers):
        obs_batches = [choosers.index.values]
        # ensure the workers compute valid write offsets (proc_num * batch_size)
        chooser_batch_size = len(choosers)
    else:
        obs_batches = [
            choosers.index.values[x:x + chooser_batch_size]
            for x in range(0, len(choosers), chooser_batch_size)]

    num_cpus = cpu_count()
    num_batches = len(obs_batches)
    if num_batches > num_cpus:
        warnings.warn(
            "The specified batch size yields more batches than there "
            "are vCPU's for parallel processing on this computer "
            "({0} vs. {1}). To optimize this code, choose a larger "
            "batch size or consider using the iterative_lottery_choices() "
            "method instead.".format(num_batches, num_cpus))

    manager = Manager()
    shared_choices_dict = manager.dict()
    alternatives[alt_capacity] = 1  # parallel version requires unit capacities
    shared_chosen_alts = Array('i', len(choosers))

    jobs = []
    for b, batch in enumerate(obs_batches):
        obs = choosers.loc[batch]
        proc = Process(
            target=_parallel_lottery_choices_worker,
            args=(
                obs, alternatives, shared_choices_dict, shared_chosen_alts,
                mct_callable, probs_callable, alt_capacity, chooser_size, b,
                chooser_batch_size)
        )
        proc.start()
        jobs.append(proc)

    for j, job in tqdm(enumerate(jobs), total=len(jobs)):
        job.join()

    choices_dict = shared_choices_dict._getvalue()

    # convert choices dict to series with original index names
    out_choices = pd.Series(choices_dict)
    out_choices.index.name = choosers.index.name
    out_choices.name = alternatives.index.name

    return out_choices
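The batch-partitioning step that feeds the worker processes can be checked on its own. A toy sketch with an arbitrary index and batch size, mirroring the slicing expression used above:

```python
import numpy as np

chooser_ids = np.arange(10)  # stand-in for choosers.index.values
chooser_batch_size = 4

# Same list comprehension the dispatcher uses: consecutive slices of the
# chooser index, one per worker process.
obs_batches = [chooser_ids[x:x + chooser_batch_size]
               for x in range(0, len(chooser_ids), chooser_batch_size)]

# Every chooser lands in exactly one batch; only the last batch may be short.
print([len(b) for b in obs_batches])  # [4, 4, 2]
```

Because each batch maps to one Process, the batch size directly controls the degree of parallelism, which is why the function warns when the number of batches exceeds the number of available vCPUs.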