bw_timex.timeline_builder

Module Contents

Classes

TimelineBuilder

Class for building a timeline of processes based on the temporal distributions of their exchanges.

class bw_timex.timeline_builder.TimelineBuilder(base_lca: bw2calc.LCA, starting_datetime: datetime.datetime, edge_filter_function: Callable, database_dates: dict, database_dates_static: dict, activity_time_mapping: dict, node_collections: dict, nodes: dict, temporal_grouping: str = 'year', interpolation_type: str = 'linear', cutoff: float = 1e-09, max_calc: int = 2000, graph_traversal: str = 'priority', *args, **kwargs)

Class for building a timeline of processes based on the temporal distributions of their exchanges.

First, the EdgeExtractor does a priority-first graph traversal and extracts a timeline of exchanges (edge_timeline) with temporal information. Identical edges that fall within the same temporal grouping (e.g. year, month, day, hour) are then grouped, and their amounts are summed.

Parameters:
  • base_lca (LCA) – A static LCA object.

  • starting_datetime (datetime | str) – Point in time when the demand occurs.

  • edge_filter_function (Callable) – A callable that filters edges. If not provided, a function that always returns False is used.

  • database_dates (dict) – A dictionary mapping databases to dates.

  • database_dates_static (dict) – Same as database_dates, but excluding the “dynamic” foreground databases.

  • activity_time_mapping (dict) – A dictionary to map processes to specific times.

  • node_collections (dict) – A dictionary collecting useful subsets of node ids.

  • nodes (dict) – A dictionary {node_id: ‘bw2data.backends.proxies.Activity’} for all nodes.

  • temporal_grouping (str, optional) – The temporal grouping to be used. Default is “year”.

  • interpolation_type (str, optional) – The type of interpolation to be used to select the background databases. Default is “linear”.

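  • cutoff (float, optional) – The cutoff value for the graph traversal. Default is 1e-9.

  • max_calc (int, optional) – The maximum number of calculations to be performed by the graph traversal. Default is 2000.

  • graph_traversal (str, optional) – The graph traversal strategy to be used. Default is “priority”.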

  • args (Variable length argument list) – Positional arguments passed to the EdgeExtractor, which inherits from TemporalisLCA. Further settings for the graph traversal can be set here. For details, see bw_temporalis.TemporalisLCA.

  • kwargs (Arbitrary keyword arguments) – Keyword arguments passed to the EdgeExtractor, which inherits from TemporalisLCA.

build_timeline() -> pandas.DataFrame

Create a DataFrame with grouped, time-explicit edges and, for each grouped edge, interpolate to the database with the closest time of representativeness.

It uses the edge_timeline, an output of the graph traversal in EdgeExtractor. Edges from the same producer to the same consumer that occur at different times within the same time window (temporal_grouping) are grouped together. Possible temporal groupings are “year”, “month”, “day” and “hour”.

For edges between the foreground and background systems, the column “temporal_market_shares” assigns the share (between 0 and 1) of the edge’s amount to be taken from the database(s) with the closest time of representativeness. If a process is in the foreground system only, the interpolation weight is set to None.

Available interpolation types are:

  • “linear”: linear interpolation between the two closest databases, based on temporal distance.

  • “closest”: the closest database is assigned a weight of 1.

Parameters:

None – (all are already passed during instantiation)

Returns:

A timeline with grouped, time-explicit edges and temporal_market_shares to background databases.

Return type:

pd.DataFrame
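The grouping step described above can be illustrated with a minimal, standard-library sketch. It is not the bw_timex implementation: the function group_edges, the tuple layout of the edges and the strftime patterns in GROUPING_FORMATS are assumptions made purely for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical strftime patterns per temporal grouping; the real
# implementation may derive its time windows differently.
GROUPING_FORMATS = {
    "year": "%Y",
    "month": "%Y%m",
    "day": "%Y%m%d",
    "hour": "%Y%m%d%H",
}


def group_edges(edges, temporal_grouping="year"):
    """Sum the amounts of edges sharing producer, consumer and time window."""
    fmt = GROUPING_FORMATS[temporal_grouping]
    grouped = defaultdict(float)
    for producer, consumer, date, amount in edges:
        # Edges in the same window collapse onto the same key.
        key = (producer, consumer, date.strftime(fmt))
        grouped[key] += amount
    return dict(grouped)


# Illustrative edges: (producer, consumer, date, amount).
edges = [
    ("steel", "car", datetime(2024, 3, 1), 2.0),
    ("steel", "car", datetime(2024, 9, 15), 3.0),
    ("steel", "car", datetime(2025, 1, 10), 1.0),
]
yearly = group_edges(edges, "year")
monthly = group_edges(edges, "month")
```

With yearly grouping, the two 2024 edges merge into a single entry. With monthly grouping, all three edges stay separate.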

check_database_names() -> None

Check that the strings of the databases exist in the databases of the Brightway project.

extract_edge_data(edge: bw_timex.edge_extractor.Edge) -> dict

Stores the attributes of an Edge instance in a dictionary.

Parameters:

edge (Edge) – Edge instance

Returns:

Dictionary with the attributes of the edge instance.

Return type:

dict

adjust_sign_of_amount_based_on_edge_type(edge_type)

Checks whether an exchange is a substitution or a technosphere exchange, based on the bw2data labelling convention, and adjusts the amount accordingly. The sign of the amount in the timeline is flipped for substitution (positive technosphere) exchanges.

Parameters:

edge_type (str) – Type of the edge, as defined in the exchange data.

Returns:

Multiplier for the amount value, 1 for technosphere exchanges, -1 for substitution exchanges.

Return type:

int
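The sign convention described above can be sketched as follows. The function name sign_multiplier and the single-entry label set are assumptions for illustration. The set of labels that bw2data treats as substitution may be larger.

```python
def sign_multiplier(edge_type: str) -> int:
    """Return -1 for substitution-type edges and 1 for all other edges."""
    # Assumed label set for illustration only; bw2data may define
    # additional labels that count as substitution.
    substitution_labels = {"substitution"}
    return -1 if edge_type in substitution_labels else 1


# The multiplier is applied to the amount stored in the timeline.
technosphere_amount = 2.0 * sign_multiplier("technosphere")
substitution_amount = 2.0 * sign_multiplier("substitution")
```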

get_time_mapping_key(node_id: int, node_hash: int) -> int

Returns the time_mapping_id (key) from the activity_time_mapping for a given node.

Parameters:
  • node_id (int) – database id of the node.

  • node_hash (int) – datetime_as_integer of the node.

Returns:

time_mapping_id (key) of the corresponding time-mapped activity.

Return type:

int

add_column_temporal_market_shares_to_timeline(tl_df: pandas.DataFrame, interpolation_type: str = 'linear') -> pandas.DataFrame

Add a column to a timeline with the weights for an interpolation between the two nearest dates, from the list of dates of the available databases.

Parameters:
  • tl_df (pd.DataFrame) – Timeline as a DataFrame.

  • interpolation_type (str, optional) – Type of interpolation between the nearest lower and higher dates. Available options: “linear” and “nearest”, defaulting to “linear”.

Returns:

Timeline as a DataFrame with a column ‘temporal_market_shares’ added. Each entry of this column has the form {database_name: weight, database_name: weight}.

Return type:

pd.DataFrame

find_closest_date(target: datetime.datetime, dates: tuple[datetime.datetime, ...]) -> dict

Find the closest date to the target in the dates list.

Parameters:
  • target (datetime.datetime) – Target datetime object.

  • dates (KeysView[datetime]) – Collection of datetime.datetime objects.

Returns:

Dictionary with the key as the closest datetime.datetime object from the list and a value of 1.

Return type:

dict
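A minimal sketch of the closest-date lookup described above, using only the standard library. The function name closest_date is hypothetical. How ties between two equally distant dates are resolved in bw_timex is not stated here, so the tie behaviour of this sketch is an assumption.

```python
from datetime import datetime


def closest_date(target, dates):
    """Return {closest_date: 1} for the date with the smallest distance to target."""
    # abs() works on timedelta objects, so this compares absolute distances.
    # On a tie, min() returns the first candidate it encounters.
    closest = min(dates, key=lambda d: abs(d - target))
    return {closest: 1}


database_dates = [datetime(2020, 1, 1), datetime(2030, 1, 1)]
result = closest_date(datetime(2024, 1, 1), database_dates)
```

Here the target lies four years after the first date and six years before the second, so the 2020 date receives the full weight.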

get_weights_for_interpolation_between_nearest_years(reference_date: datetime.datetime, dates_list: tuple[datetime.datetime, ...], interpolation_type: str | None = None) -> dict

Find the nearest dates (lower and higher) for a given date from a list of dates and calculate the interpolation weights based on temporal proximity.

Parameters:
  • reference_date (datetime) – Target date.

  • dates_list (KeysView[datetime]) – List of datetime objects representing the temporal representativeness of the available databases.

  • interpolation_type (str, optional) – Type of interpolation between the nearest lower and higher dates. For now, only “linear” is available.

Returns:

Dictionary with datetimes of the available closest databases as keys and the weights for interpolation as values.

Return type:

dict
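Linear interpolation weights based on temporal proximity can be sketched as below. This is an illustration under stated assumptions, not the library's code: the function name linear_weights is hypothetical, and assigning a weight of 1 to the first or last date when the reference date lies outside the covered range is an assumed behaviour.

```python
from datetime import datetime


def linear_weights(reference_date, dates):
    """Weight the two dates enclosing reference_date by temporal proximity."""
    dates = sorted(dates)
    # Assumed behaviour: outside the covered range, use the boundary date only.
    if reference_date <= dates[0]:
        return {dates[0]: 1}
    if reference_date >= dates[-1]:
        return {dates[-1]: 1}
    lower = max(d for d in dates if d <= reference_date)
    upper = min(d for d in dates if d > reference_date)
    span = (upper - lower).total_seconds()
    # The closer the reference date is to a database date, the larger that weight.
    weight_upper = (reference_date - lower).total_seconds() / span
    return {lower: 1 - weight_upper, upper: weight_upper}


weights = linear_weights(
    datetime(2025, 1, 1),
    [datetime(2020, 1, 1), datetime(2030, 1, 1)],
)
```

The two weights always sum to 1, so the full amount of an edge is distributed across the two enclosing databases.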

add_interpolation_weights_at_intersection_to_background(row) -> dict | None

Returns the interpolation weights to background databases only for those exchanges where the producing process actually comes from a background database (temporal markets).

Only these processes receive inputs from the background databases. All other processes in the timeline are not directly linked to the background, so the interpolation weight information is not needed and is set to None.

Parameters:

row (pd.Series) – Row of the timeline DataFrame

Returns:

Dictionary with the names of the databases and their interpolation weights, or None if the producing process does not come from a background database.

Return type:

dict | None
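The row-wise logic described above might look roughly like this. Everything in the sketch is hypothetical: the row keys "producer_database" and "date_producer", the helper names and the database names are chosen for illustration and do not reflect the actual timeline columns.

```python
from datetime import datetime


def weights_at_background_intersection(row, background_databases, compute_weights):
    """Return interpolation weights only if the producer is a background process."""
    # "producer_database" and "date_producer" are assumed keys, not real columns.
    if row["producer_database"] not in background_databases:
        return None
    return compute_weights(row["date_producer"])


def example_weights(date):
    """Stand-in for a real weight calculation; always returns equal shares."""
    return {"db_2020": 0.5, "db_2030": 0.5}


background = {"db_2020", "db_2030"}
foreground_row = {"producer_database": "foreground", "date_producer": datetime(2025, 1, 1)}
background_row = {"producer_database": "db_2020", "date_producer": datetime(2025, 1, 1)}

foreground_result = weights_at_background_intersection(foreground_row, background, example_weights)
background_result = weights_at_background_intersection(background_row, background, example_weights)
```

Applied to every row of a timeline, a function of this shape yields None for foreground-only rows and a weight dictionary for rows at the intersection with the background.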

get_consumer_name(idx: int) -> str

Returns the name of the consumer node. If the consuming node is the functional unit, returns -1.

Parameters:

idx (int) – Id of the node.

Returns:

Name of the node, or -1 if the node is the functional unit.

Return type:

str