:py:mod:`bw_timex.timeline_builder` =================================== .. py:module:: bw_timex.timeline_builder Module Contents --------------- Classes ~~~~~~~ .. autoapisummary:: bw_timex.timeline_builder.TimelineBuilder .. py:class:: TimelineBuilder(base_lca: bw2calc.LCA, starting_datetime: datetime.datetime, edge_filter_function: Callable, database_dates: dict, database_dates_static: dict, activity_time_mapping: dict, node_collections: dict, nodes: dict, temporal_grouping: str = 'year', interpolation_type: str = 'linear', cutoff: float = 1e-09, max_calc: int = 2000, graph_traversal: str = 'priority', *args, **kwargs) Class for building a process timeline based on the temporal distributions of their exchanges. First, the `EdgeExtractor` does a priority-first graph traversal and extracts a timeline of exchanges (edge_timeline) with temporal information. Identical edges within temporal grouping (e.g. year, month, day, hour) are then grouped and the amount of exchanges is summed up. :param base_lca: A static LCA object. :type base_lca: LCA :param starting_datetime: Point in time when the demand occurs. :type starting_datetime: datetime | str, optional :param edge_filter_function: A callable that filters edges. If not provided, a function that always returns False is used. :type edge_filter_function: Callable :param database_dates: A dictionary mapping databases to dates. :type database_dates: dict :param database_dates_static: same as database_dates, but excluding the "dynamic" foreground databases. :type database_dates_static: dict :param activity_time_mapping: A dictionary to map processes to specific times. :type activity_time_mapping: dict :param node_collections: A dictionary collecting useful subsets of node ids. :type node_collections: dict :param nodes: A dictionary {node_id: 'bw2data.backends.proxies.Activity'} for all nodes. :type nodes: dict :param temporal_grouping: The temporal grouping to be used. Default is "year". :type temporal_grouping: str, optional :param interpolation_type: The type of interpolation to be used to select the background databases. Default is "linear". :type interpolation_type: str, optional :param cutoff: The cutoff value for the graph traversal. Default is 1e-9. :param max_calc: The maximum number of calculations to be performed by the graph traversal. Default is 2000. :param args: Keyword arguments passed to the EdgeExtractor which inherits from TemporalisLCA. Here, things like the further settings for graph traversal can be set. For details, see bw_temporalis.TemporalisLCA. :type args: Variable length argument list :param kwargs: Keyword arguments passed to the EdgeExtractor which inherits from TemporalisLCA. :type kwargs: Arbitrary keyword arguments .. py:method:: build_timeline() -> pandas.DataFrame Create a DataFrame with grouped, time-explicit edges and, for each grouped edge, interpolate to the database with the closest time of representativeness. It uses the edge_timeline, an output from the graph traversal in EdgeExtractor. Edges from same producer to same consumer that occur at different times within the same time window (temporal_grouping) are grouped together. Possible temporal groupings are "year", "month", "day" and "hour". For edges between foreground and background system, the column "temporal_market_shares" assigns the ratio [0-1] of the edge's amount to be taken from the database with the closest time of representativeness. If a process is in the foreground system only, the interpolation weight is set to None. Available interpolation types are: - "linear": linear interpolation between the two closest databases, based on temporal distance. - "closest": closest database is assigned 1 :param None: (all are already passed during instantiation) :returns: A timeline with grouped, time-explicit edges and temporal_market_shares to background databases. :rtype: pd.DataFrame .. py:method:: check_database_names() -> None Check that the strings of the databases exist in the databases of the Brightway project. .. py:method:: extract_edge_data(edge: bw_timex.edge_extractor.Edge) -> dict Stores the attributes of an Edge instance in a dictionary. :param edge: Edge instance :type edge: Edge :returns: Dictionary with the attributes of the edge instance. :rtype: dict .. py:method:: adjust_sign_of_amount_based_on_edge_type(edge_type) It checks if the an exchange is of type substitution or a technosphere exchange, based on bw2data labelling convention, and adjusts the amount accordingly. Flips the sign of the amount value in the timeline for substitution (positive technosphere) exchanges. :param edge_type: Type of the edge, as defined in the exchange data. :type edge_type: str :returns: Multiplier for the amount value, 1 for technosphere exchanges, -1 for substitution exchanges. :rtype: int .. py:method:: get_time_mapping_key(node_id: int, node_hash: int) -> int Returns the time_mapping_id (key) from the activity_time_mapping for a given node. :param node_id: database id of the node. :type node_id: int :param node_hash: datetime_as_integer of the node. :type node_hash: int :returns: time_mapping_id (key) of the corresponding time-mapped activity. :rtype: int .. py:method:: add_column_temporal_market_shares_to_timeline(tl_df: pandas.DataFrame, interpolation_type: str = 'linear') -> pandas.DataFrame Add a column to a timeline with the weights for an interpolation between the two nearest dates, from the list of dates of the available databases. :param tl_df: Timeline as a DataFrame. :type tl_df: pd.DataFrame :param interpolation_type: Type of interpolation between the nearest lower and higher dates. Available options: "linear" and "nearest", defaulting to "linear". :type interpolation_type: str, optional :returns: Timeline as a DataFrame with a column 'temporal_market_shares' added, this column looks like {database_name: weight, database_name: weight}. :rtype: pd.DataFrame .. py:method:: find_closest_date(target: datetime.datetime, dates: tuple[datetime.datetime, Ellipsis]) -> dict Find the closest date to the target in the dates list. :param target: Target datetime object. :type target: datetime.datetime :param dates: List of datetime.datetime objects. :type dates: KeysView[datetime] :returns: Dictionary with the key as the closest datetime.datetime object from the list and a value of 1. :rtype: dict .. py:method:: get_weights_for_interpolation_between_nearest_years(reference_date: datetime.datetime, dates_list: tuple[datetime.datetime, Ellipsis], interpolation_type: str | None = None) -> dict Find the nearest dates (lower and higher) for a given date from a list of dates and calculate the interpolation weights based on temporal proximity. :param reference_date: Target date. :type reference_date: datetime :param dates_list: List of datetime objects representing the temporal representativeness of the available databases. :type dates_list: KeysView[datetime] :param interpolation_type: Type of interpolation between the nearest lower and higher dates. For now, only "linear" is available. :type interpolation_type: str, optional :returns: Dictionary with datetimes of the available closest databases as keys and the weights for interpolation as values. :rtype: dict .. py:method:: add_interpolation_weights_at_intersection_to_background(row) -> Union[dict, None] returns the interpolation weights to background databases only for those exchanges, where the producing process actually comes from a background database (temporal markets). Only these processes are receiving inputs from the background databases. All other process in the timeline are not directly linked to the background, so the interpolation weight info is not needed and set to None :param row: Row of the timeline DataFrame :type row: pd.Series :returns: Dictionary with the name of databases and interpolation weights. :rtype: dict .. py:method:: get_consumer_name(idx: int) -> str Returns the name of consumer node. If consuming node is the functional unit, returns -1. :param id: Id of node. :type id: int :returns: Name of the node or -1 :rtype: str