Source code for urbansim_templates.utils

from __future__ import print_function

import re
from datetime import datetime as dt

import pandas as pd

import orca
from urbansim.models.util import (apply_filter_query, columns_in_filters, 
        columns_in_formula)


################################
## SPEC AND FORMAT MANAGEMENT ##
################################

def validate_template(cls):
    """
    Checks whether a template class meets the basic expectations for working with
    ModelManager, to aid in development and testing.
    
    Looks for 'to_dict', 'from_dict', and 'run' methods, and 'name', 'tags',
    'template', and 'template_version' attributes. Checks that an object can be
    instantiated without arguments, plus some additional behaviors. See documentation
    for a full description of ModelManager specs and guidelines.
    
    There are many behaviors this does NOT check, because we don't know what
    particular parameters are expected and valid for a given template. For example,
    saving a configured model step and reloading it should produce an equivalent
    object, but this needs to be checked in template-specific unit tests.
    
    Parameters
    ----------
    cls : class
        Template class.
    
    Returns
    -------
    bool
    
    """
    try:
        m = cls()
    except:
        print("Error instantiating object without arguments")
        raise
    
    methods = ['to_dict', 'from_dict', 'run']
    for item in methods:
        if item not in dir(cls):
            print("Expecting a '{}' method".format(item))
            return False
    
    try:
        d = m.to_dict()
    except:
        print("Error running 'to_dict()'")
        raise
    
    params = ['name', 'tags', 'template', 'template_version']
    for item in params:
        if item not in d:
            print("Expecting a '{}' key in dict representation".format(item))
            return False
    
    if (d['template'] != m.__class__.__name__):
        print("Expecting 'template' value in dict to match the class name")
        return False
    
    try:
        cls.from_dict(m.to_dict())
    except:
        print("Error instantiating object with 'from_dict()' method")
        raise
    
    # TO DO - check supplemental objects? (but nothing there with unconfigured steps)
    
    return True
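# Usage sketch (hypothetical example): validating one of the built-in template
# classes; any class following the ModelManager spec can be checked the same way.
#
#     from urbansim_templates.models import OLSRegressionStep
#     validate_template(OLSRegressionStep)  # True if the class meets the spec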
#####################################
## REPLACEMENT FOR ORCA BROADCASTS ##
#####################################

"""
These utilities provide functionality for merging tables using implicit join keys
instead of Orca broadcasts. See GitHub issue #78 for discussion of the rationale.

"""
def validate_table(table, reciprocal=True):
    """
    Check some basic expectations about an Orca table:
    
    - Confirm that it includes a unique, named index column (a.k.a. primary key) or
      set of columns (multi-index, a.k.a. composite key). If not, raise a ValueError.
    
    - Confirm that none of the other columns in the table share names with the
      index(es). If they do, raise a ValueError.
    
    - If the table contains columns whose names match the index columns of other
      tables registered with Orca, check whether they make sense as join keys. This
      prints a status message with the number of presumptive foreign-key values that
      are found in the primary/composite key, for evaluation by the user.
    
    - Perform the same check for columns in _other_ tables whose names match the
      index column(s) of _this_ table.
    
    - It doesn't currently compare indexes to indexes. (Maybe it should?)
    
    Running this will trigger loading all registered Orca tables, which may take a
    while. Stand-alone columns will not be loaded unless their names match an index
    column. Doesn't currently incorporate ``orca_test`` validation, but it might be
    added.
    
    Parameters
    ----------
    table : str
        Name of Orca table to validate.
    
    reciprocal : bool, default True
        Whether to also check how columns of other tables align with this one's
        index. If False, only check this table's columns against other tables'
        indexes.
    
    Returns
    -------
    bool
    
    """
    # There are a couple of reasons we're not using the orca_test library here:
    # (a) orca_test doesn't currently support MultiIndexes, and (b) the primary-key/
    # foreign-key comparisons aren't asserting anything, just printing status
    # messages. We should update orca_test to support both, probably.
    
    if not orca.is_table(table):
        raise ValueError("Table not registered with Orca: '{}'".format(table))
    
    idx = orca.get_table(table).index
    
    # Check index has a name
    if list(idx.names) == [None]:
        raise ValueError("Index column has no name")
    
    # Check for unique column names
    for name in list(idx.names):
        if name in list(orca.get_table(table).columns):
            raise ValueError("Index names and column names overlap: '{}'".format(name))
    
    # Check for unique index values
    if len(idx.unique()) < len(idx):
        raise ValueError("Index not unique")
    
    # Compare columns to indexes of other tables, and vice versa
    combinations = [(table, t) for t in orca.list_tables() if table != t]
    
    if reciprocal:
        combinations += [(t, table) for t in orca.list_tables() if table != t]
    
    for t1, t2 in combinations:
        col_names = orca.get_table(t1).columns
        idx = orca.get_table(t2).index
        
        if set(idx.names).issubset(col_names):
            vals = orca.get_table(t1).to_frame(idx.names).drop_duplicates()
            
            # Easier to compare multi-column values to multi-column index if we
            # turn the values into an index as well
            vals = vals.reset_index().set_index(idx.names).index
            vals_in_idx = sum(vals.isin(idx))
            
            if len(idx.names) == 1:
                idx_str = idx.names[0]
            else:
                idx_str = '[{}]'.format(','.join(idx.names))
            
            print("'{}.{}': {} of {} unique values are found in '{}.{}' ({}%)"
                  .format(t1, idx_str, vals_in_idx, len(vals), t2, idx_str,
                          round(100 * vals_in_idx / len(vals))))
    
    return True
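# Usage sketch, with hypothetical tables: register a well-formed table plus a
# table that refers to it, then validate. A status line reports how well the
# presumptive foreign key lines up with the primary key.
#
#     import orca
#     import pandas as pd
#
#     orca.add_table('buildings', pd.DataFrame(
#             {'stories': [2, 3]},
#             index=pd.Index([1, 2], name='building_id')))
#
#     orca.add_table('households', pd.DataFrame(
#             {'building_id': [1, 1, 2, 9]},
#             index=pd.Index([10, 11, 12, 13], name='household_id')))
#
#     validate_table('buildings')
#     # 'households.building_id': 2 of 3 unique values are found in
#     # 'buildings.building_id' (67%)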
def validate_all_tables():
    """
    Validate all tables registered with Orca. See ``validate_table()`` above.
    
    Returns
    -------
    bool
    
    """
    for t in orca.list_tables():
        validate_table(t, reciprocal=False)
def merge_tables(tables, columns=None):
    """
    Merge two or more tables into a single DataFrame.
    
    All the data will eventually be merged onto the first table in the list. In each
    merge stage, we'll refer to the right-hand table as the "source" and the
    left-hand one as the "target".
    
    Tables are merged using ModelManager schema rules: the source table must have a
    unique index, and the target table must have a column with a matching name, which
    will be used as the join key. Multi-indexes are fine, but all of the index columns
    need to be present in the target table.
    
    The last table in the list is the initial source. The algorithm searches backward
    through the list for a table that qualifies as a target. The source table is
    left-joined onto the target, and then the algorithm continues with the
    second-to-last table as the new source.
    
    Example 1: Tables A and B share join keys. Tables B and C share join keys.
    Merging [A, B, C] will left-join C onto B, and then left-join the result onto A.
    
    Example 2: Tables A and B share join keys. Tables A and C also share join keys,
    but tables B and C don't. Merging [A, B, C] will left-join C onto A, and then
    left-join B onto the result of the first join.
    
    If you provide a list of ``columns``, the output table will be limited to columns
    in this list. The index(es) of the left-most table will always be retained, but
    it's a good practice to list them anyway. Column names not found will be ignored.
    
    If two tables contain columns with identical names (other than join keys), they
    can't be automatically merged. If the columns are just incidental and not needed
    in the final output, you can perform the merge by providing a ``columns`` list
    that excludes them.
    
    A note about data types: they will be retained, but if NaN values need to be added
    (e.g. if some identifiers from the target table aren't found in the source table),
    data may need to be cast to a type that allows missing values. For better control
    over this, see ``urbansim_templates.data.ColumnFromBroadcast()``.
    
    Parameters
    ----------
    tables : list of str, orca.DataFrameWrapper, orca.TableFuncWrapper, or
            pd.DataFrame
        Two or more tables to merge. Types can be mixed and matched.
    
    columns : list of str, optional
        Names of columns to retain in the final output.
    
    Returns
    -------
    pd.DataFrame
    
    """
    while len(tables) > 1:
        # last table becomes the source
        source = get_df(tables[-1], columns)
        keys = list(source.index.names)
        
        # search for target table
        target_position = None
        for i in range(len(tables) - 2, -1, -1):
            if set(keys).issubset(set(all_cols(tables[i]))):
                target_position = i
                target_columns = columns + keys if columns is not None else None
                target = get_df(tables[i], target_columns)
                break
        
        if target_position is None:
            msg = "Could not find a target to merge table {} onto".format(len(tables))
            raise ValueError(msg)
        
        # merge source onto target
        merged = target.join(source, on=keys, how='left')  # pandas 0.23+ for on=keys
        
        tables = tables[:-1]
        tables[target_position] = merged
    
    # drop final merge keys if not needed
    merged = trim_cols(merged, columns)
    return merged
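# Usage sketch, continuing the hypothetical 'households' and 'buildings' tables
# above: 'buildings' is the source (its index appears as a column of 'households'),
# so it gets left-joined onto 'households'.
#
#     merged = merge_tables(['households', 'buildings'])
#     # -> DataFrame indexed by 'household_id', with columns 'building_id' and
#     #    'stories' ('stories' is NaN where no matching building exists)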
###############################
## TEMPLATE HELPER FUNCTIONS ##
###############################
def get_df(table, columns=None):
    """
    Returns a table as a ``pd.DataFrame``. Input can be an Orca table name,
    ``orca.DataFrameWrapper``, ``orca.TableFuncWrapper``, or ``pd.DataFrame``.
    
    Optionally, columns can be limited to those that appear in a list of names. The
    list may contain duplicates or columns not in the table. Index(es) will always be
    retained, but it's a good practice to list them anyway.
    
    Parameters
    ----------
    table : str, orca.DataFrameWrapper, orca.TableFuncWrapper, or pd.DataFrame
    columns : list of str, optional
    
    Returns
    -------
    pd.DataFrame
    
    """
    if type(table) not in [str, orca.DataFrameWrapper, orca.TableFuncWrapper,
                           pd.DataFrame]:
        raise ValueError("Table has unsupported type: {}".format(type(table)))
    
    if type(table) == pd.DataFrame:
        return trim_cols(table, columns)
    
    elif type(table) == str:
        table = orca.get_table(table)
    
    if columns is not None:
        # Orca requires column list to be unique and existing, or None
        columns = list(set(columns) & set(table.columns))
    
    return table.to_frame(columns=columns)
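# Usage sketch, with the hypothetical 'households' table above. Index columns are
# always kept, and unknown names are ignored:
#
#     get_df('households', columns=['building_id', 'not_a_column'])
#     # -> DataFrame indexed by 'household_id', with the column 'building_id'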
def all_cols(table):
    """
    Returns a list of all column names in a table, including index(es). Input can be
    an Orca table name, ``orca.DataFrameWrapper``, ``orca.TableFuncWrapper``, or
    ``pd.DataFrame``.
    
    Parameters
    ----------
    table : str, orca.DataFrameWrapper, orca.TableFuncWrapper, or pd.DataFrame
    
    Returns
    -------
    list of str
    
    """
    if type(table) not in [str, orca.DataFrameWrapper, orca.TableFuncWrapper,
                           pd.DataFrame]:
        raise ValueError("Table has unsupported type: {}".format(type(table)))
    
    if type(table) == str:
        table = orca.get_table(table)
    
    return list(table.index.names) + list(table.columns)
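# Usage sketch, with the hypothetical 'households' table above:
#
#     all_cols('households')  # -> ['household_id', 'building_id']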
def cols_in_expression(expression):
    """
    Extract all possible column names from a ``df.eval()``-style expression.
    
    This is achieved using regex to identify tokens in the expression that begin with
    a letter and contain any number of alphanumerics or underscores, but are not
    immediately followed by an opening parenthesis. This excludes function names, but
    would not exclude constants (e.g. "pi"), which are semantically indistinguishable
    from column names.
    
    Parameters
    ----------
    expression : str
    
    Returns
    -------
    cols : list of str
    
    """
    # Raw string avoids an invalid-escape warning; the word boundary keeps the regex
    # from falling back to a partial match (e.g. 'lo' from 'log(') when the lookahead
    # rules out a function name
    return re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*\b(?!\()', expression)
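# Usage sketch: function names are skipped, but other bare tokens are treated as
# potential column names.
#
#     cols_in_expression('sqrt(sqft) * price_per_sqft > budget')
#     # -> ['sqft', 'price_per_sqft', 'budget']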
def trim_cols(df, columns=None):
    """
    Limit a DataFrame to columns that appear in a list of names.
    
    The list may contain duplicates or names not in the DataFrame. Index(es) of the
    DataFrame will always be retained, but it's a good practice to list them anyway.
    If ``columns`` is None, the original DataFrame is returned unchanged (not a copy).
    
    Parameters
    ----------
    df : pd.DataFrame
    columns : list of str, optional
    
    Returns
    -------
    pd.DataFrame
    
    """
    if columns is None:
        return df
    
    cols = set(columns) & set(df.columns)  # unique, existing columns
    return df[list(cols)]
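# Usage sketch, with a hypothetical DataFrame. Duplicates and unknown names in the
# list are harmless:
#
#     df = pd.DataFrame({'a': [1], 'b': [2]})
#     trim_cols(df, columns=['a', 'a', 'z'])  # -> DataFrame with column 'a' only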
def to_list(items):
    """
    In many places we accept either a single string or a list of strings. This
    function normalizes None -> [None], str -> [str], and leaves lists unchanged.
    
    Parameters
    ----------
    items : str, list, or None
    
    Returns
    -------
    list
    
    """
    if not isinstance(items, list):
        items = [items]
    
    return items
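# Usage sketch:
#
#     to_list('building_id')  # -> ['building_id']
#     to_list(['a', 'b'])     # -> ['a', 'b']
#     to_list(None)           # -> [None]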
def update_name(template, name=None):
    """
    Generate a name for a configured model step, based on its template class and the
    current timestamp. But if a custom name has already been provided, return that
    instead. (A name is judged to be custom if it does not contain the class type.)
    
    Parameters
    ----------
    template : str
        Template class name.
    
    name : str, optional
        Existing name for the configured model step.
    
    Returns
    -------
    str
    
    """
    if (name is None) or (template in name):
        return template + '-' + dt.now().strftime('%Y%m%d-%H%M%S')
    else:
        return name
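# Usage sketch: auto-generated names embed the template class and a timestamp,
# while a custom name is passed through unchanged.
#
#     update_name('OLSRegressionStep')
#     # -> e.g. 'OLSRegressionStep-20190101-120000'
#     update_name('OLSRegressionStep', 'rent-hedonic')
#     # -> 'rent-hedonic'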
def get_data(tables, fallback_tables=None, filters=None, model_expression=None,
             extra_columns=None):
    """
    Generate a ``pd.DataFrame`` for model estimation or simulation. Automatically
    loads tables from Orca, merges them, and removes columns not referenced in a
    model expression or data filter. Additional columns can be requested.
    
    If filters are provided, the output will include only rows that match the filter
    criteria.
    
    See ``urbansim_templates.utils.merge_tables()`` for a detailed description of how
    the merges are performed.
    
    Parameters
    ----------
    tables : str or list of str
        Orca table(s) to draw data from.
    
    fallback_tables : str or list of str, optional
        Table(s) to use if the first parameter evaluates to `None`. (This option will
        be removed shortly when estimation and simulation settings are separated.)
    
    filters : str or list of str, optional
        Filter(s) to apply to the merged data, using `pd.DataFrame.query()`.
    
    model_expression : str, optional
        Model expression that will be evaluated using the output data. Only used to
        drop non-relevant columns. PyLogit format is not yet supported.
    
    extra_columns : str or list of str, optional
        Columns to include, in addition to any in the model expression and filters.
        (If this and the model_expression are both None, all columns will be
        included.)
    
    Returns
    -------
    pd.DataFrame
    
    """
    if tables is None:
        tables = fallback_tables
    
    colnames = None  # this will get all columns
    if (model_expression is not None) or (extra_columns is not None):
        colnames = list(set(columns_in_formula(model_expression) +
                            columns_in_filters(filters) + to_list(extra_columns)))
    
    if not isinstance(tables, list):
        df = get_df(tables, colnames)
    else:
        df = merge_tables(tables, colnames)
    
    df = apply_filter_query(df, filters)
    return df
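# Usage sketch, with hypothetical table and column names. Only columns referenced
# in the model expression or filters (plus indexes) are retained, and rows are
# limited by the filter:
#
#     df = get_data(tables=['households', 'buildings'],
#                   model_expression='rent ~ income + stories',
#                   filters=['income > 0'])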
def update_column(table, column, data, fallback_table=None, fallback_column=None):
    """
    Update an Orca column. If it doesn't exist yet, add it to the wrapped DataFrame.
    Values will be aligned using the indexes if possible.
    
    Data types: if the column already exists, new values will be cast to match the
    existing data type. If the column is new, it will retain the data type of the
    pd.Series that's passed to this function -- unless it doesn't fully align with
    the table's index, in which case it may be cast to allow missing values (e.g.
    from int to float).
    
    Parameters
    ----------
    table : str or list of str
        Name of Orca table to update. If a list, the first element will be used.
    
    column : str
        Name of existing column to update, or new column to create. Cannot be an
        index.
    
    data : pd.Series
        Column of data to update or add.
    
    fallback_table : str or list of str
        Name of Orca table to use if ``table`` evaluates to None.
    
    fallback_column : str
        Name of Orca column to use if ``column`` evaluates to None.
    
    Returns
    -------
    None
    
    """
    if table is None:
        table = fallback_table
    
    if isinstance(table, list):
        table = table[0]
    
    if column is None:
        column = fallback_column
    
    dfw = orca.get_table(table)
    
    if column not in dfw.columns:
        dfw.update_col(column, data)  # adds column
    else:
        dfw.update_col_from_series(column, data, cast=True)  # updates existing column
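# Usage sketch, with hypothetical data aligned to the table's index. If the column
# doesn't exist yet it's added; otherwise it's updated in place:
#
#     series = pd.Series(['own', 'rent'],
#                        index=pd.Index([10, 11], name='household_id'))
#     update_column('households', 'tenure', series)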
########################
## VERSION MANAGEMENT ##
########################
def parse_version(v):
    """
    Parses a version string into its component parts. The string is expected to
    follow the pattern "0.1.1.dev0", which would be parsed into (0, 1, 1, 0). The
    first two components are required. The third is set to 0 if missing, and the
    fourth to None.
    
    Parameters
    ----------
    v : str
        Version string using the syntax described above.
    
    Returns
    -------
    tuple with format (int, int, int, int or None)
    
    """
    v4 = None
    if 'dev' in v:
        v4 = int(v.split('dev')[1])
        v = v.split('dev')[0]  # 0.1.dev0 -> 0.1.
        
        if (v[-1] == '.'):
            v = v[:-1]  # 0.1. -> 0.1
    
    v = v.split('.')
    
    v3 = 0
    if (len(v) == 3):
        v3 = int(v[2])
    
    v2 = int(v[1])
    v1 = int(v[0])
    
    return (v1, v2, v3, v4)
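# Usage sketch:
#
#     parse_version('0.2')         # -> (0, 2, 0, None)
#     parse_version('0.2.1.dev5')  # -> (0, 2, 1, 5)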
def version_greater_or_equal(a, b):
    """
    Tests whether version string 'a' is greater than or equal to version string 'b'.
    Version syntax should follow the pattern described for `parse_version()`.
    
    Note that 'dev' versions are pre-releases, so '0.2' < '0.2.1.dev5' < '0.2.1'.
    
    Parameters
    ----------
    a : str
        First version string, formatted as described in `parse_version()`.
    
    b : str
        Second version string, formatted as described in `parse_version()`.
    
    Returns
    -------
    bool
    
    """
    a = parse_version(a)
    b = parse_version(b)
    
    if (a[0] > b[0]):
        return True
    
    elif (a[0] == b[0]):
        if (a[1] > b[1]):
            return True
        
        elif (a[1] == b[1]):
            if (a[2] > b[2]):
                return True
            
            elif (a[2] == b[2]):
                if (a[3] is None):
                    return True
                
                elif (b[3] is None):
                    return False
                
                elif (a[3] >= b[3]):
                    return True
    
    return False
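# Usage sketch: dev releases sort before the corresponding final release.
#
#     version_greater_or_equal('0.2.1', '0.2.1.dev5')  # -> True
#     version_greater_or_equal('0.2', '0.2.1.dev5')    # -> False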