from collections.abc import Mapping
from datetime import datetime
from functools import partial
from typing import Callable, Optional
import bw2data as bd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sb
from bw2calc import LCA
from bw2data import (
Database,
Method,
Normalization,
Weighting,
databases,
methods,
normalizations,
projects,
weightings,
)
from bw2data.backends.schema import ActivityDataset as AD
from bw2data.backends.schema import get_id
from bw2data.errors import Brightway2Project
from dynamic_characterization import characterize
from loguru import logger
from peewee import fn
from scipy import sparse
from ._lci_cache import BACKGROUND_UNIT_LCI_CACHE, LCI_SOLVE_CACHE
[docs]
# Minimum number of solves (the functional-unit solve plus the pending
# background unit-LCI solves) from which `TimexLCA.lci` calls
# `lci(factorize=True)` instead of a plain `lci_calculation()`.
FACTORIZE_SOLVES_THRESHOLD = 8
from .dynamic_biosphere_builder import DynamicBiosphereBuilder
from .helper_classes import InterDatabaseMapping, TimeMappingDict
from .matrix_modifier import MatrixModifier
from .timeline_builder import TimelineBuilder
from .utils import (
convert_date_string_to_datetime,
extract_date_as_integer,
round_datetime,
round_datetime_series_to_year,
)
from .validation import (
BuildTimelineInputs,
DynamicLCIAInputs,
LCIInputs,
PlotDynamicInventoryInputs,
TimexLCAInputs,
)
[docs]
class TimexLCA:
"""
Class to perform time-explicit LCA calculations.
A TimexLCA contains the LCI of processes occurring at explicit points in time. It tracks the timing of processes,
relinks their technosphere and biosphere exchanges to match the technology landscape at that point in time,
and also keeps track of the timing of the resulting emissions. As such, it combines prospective and dynamic LCA
approaches.
TimexLCA first calculates a static LCA, which informs a priority-first graph traversal. From the
graph traversal, temporal relationships between exchanges and processes are derived. Based on
the timing of the processes, bw_timex matches the processes at the intersection between
foreground and background to the best available background databases. This temporal relinking is
achieved by using datapackages to add new time-specific processes. The new processes and their
exchanges to other technosphere processes or biosphere flows extend the technosphere and
biosphere matrices.
Temporal information of both processes and biosphere flows is retained, allowing for dynamic
LCIA.
TimexLCA calculates:
1) a static "base" LCA score (`TimexLCA.base_score`, same as `bw2calc.lca.score`),
2) a static time-explicit LCA score (`TimexLCA.static_score`), which links LCIs to the
respective background databases, but without dynamic characterization of the time-explicit inventory
3) a dynamic time-explicit LCA score (`TimexLCA.dynamic_score`), with dynamic inventory and
dynamic characterization. These are provided for radiative forcing and GWP but can also be
user-defined.
Example
-------
>>> demand = {('my_foreground_database', 'my_process'): 1}
>>> method = ("some_method_family", "some_category", "some_method")
>>> database_dates = {
'my_background_database_one': datetime.strptime("2020", "%Y"),
'my_background_database_two': datetime.strptime("2030", "%Y"),
'my_background_database_three': datetime.strptime("2040", "%Y"),
'my_foreground_database':'dynamic'
}
>>> tlca = TimexLCA(demand, method, database_dates)
>>> tlca.build_timeline() # has many optional arguments
>>> tlca.lci()
>>> tlca.static_lcia()
>>> print(tlca.static_score)
>>> tlca.dynamic_lcia(metric="radiative_forcing") # also available: "GWP"
>>> print(tlca.dynamic_score)
""" """"""
def __init__(
    self,
    demand: dict,
    method: tuple,
    database_dates: dict = None,
    use_global_lci_cache: bool = True,
) -> None:
    """
    Set up a `TimexLCA`: run the static base LCA, collect node information and
    initialize the caches used by the later calculation steps.

    Parameters
    ----------
    demand : dict[object: float]
        The demand for which the LCA will be calculated. The keys can be Brightway `Node`
        instances, `(database, code)` tuples, or integer ids.
    method : tuple
        Tuple defining the LCIA method, such as `('foo', 'bar')` or default methods, such as
        `("EF v3.1", "climate change", "global warming potential (GWP100)")`
    database_dates : dict, optional
        Dictionary mapping database names to dates. If empty or None, the databases of
        the demand keys are all marked as "dynamic".
    use_global_lci_cache : bool, optional
        If True (default), background unit LCI matrices are stored in the module-level
        `BACKGROUND_UNIT_LCI_CACHE`, which is shared between `TimexLCA` objects of the
        same Python session. If False, this object uses its own private, initially
        empty cache.
    """
    logger.info("Initializing TimexLCA object...")

    self.demand = demand
    self.method = method
    self.database_dates = database_dates

    if not self.database_dates:
        logger.info(
            "No database_dates provided. Treating the databases containing the functional "
            "unit as dynamic. No remapping of inventories to time explicit databases will be done."
        )
        # The first element of every demand key is taken as its database name.
        self.database_dates = dict.fromkeys((key[0] for key in demand), "dynamic")

    # Instantiated only for its input validation; the result is not kept.
    TimexLCAInputs(
        demand=self.demand,
        method=self.method,
        database_dates=self.database_dates,
    )

    logger.info("Calculating base LCA...")
    # The custom input preparation includes all background databases in the LCA,
    # because all their ids are needed for the time mapping dict.
    fu, data_objs, remapping = self.prepare_base_lca_inputs(
        demand=self.demand, method=self.method
    )
    self.base_lca = LCA(fu, data_objs=data_objs, remapping_dicts=remapping)
    self.base_lca.lci()
    self.base_lca.lcia()

    # Only databases with an actual date are "static" background databases that the
    # timeline can later be linked to; "dynamic" ones are exploded later.
    self.database_dates_static = {
        db_name: db_date
        for db_name, db_date in self.database_dates.items()
        if isinstance(db_date, datetime)
    }

    logger.info("Collecting node infos...")
    # Collections of nodes that are useful down the line, e.g. all nodes from the
    # background databases that link to foreground nodes.
    self.create_node_collections()

    self.interdatabase_activity_mapping = InterDatabaseMapping()

    # Load all nodes of the involved databases once for faster lookup later.
    activity_table = bd.backends.ActivityDataset
    node_rows = activity_table.select().where(
        activity_table.database << list(self.database_dates)
    )
    self.nodes = {row.id: bd.backends.Activity(row) for row in node_rows}

    # code -> name lookup, avoids repeated database queries in plotting and
    # labeling functions.
    self._activity_code_to_name_cache = {
        activity["code"]: activity["name"] for activity in self.nodes.values()
    }

    # State of the build_timeline() cache.
    self._last_timeline_build_key = None
    self._cached_timeline = None
    self._default_edge_filter_function = None

    self._dynamic_lcia_inventory_cache = {}

    if use_global_lci_cache:
        self._background_unit_lci_cache = BACKGROUND_UNIT_LCI_CACHE
    else:
        self._background_unit_lci_cache = {}

    # Whether the last lci() call restored its supply_array / inventory
    # from `LCI_SOLVE_CACHE` instead of running a fresh solve.
    self._lci_used_cached_solve = False
    # Whether the last lci() call LU-factorized the expanded technosphere.
    self._lci_did_factorize = False
########################################
# Main functions to be called by users #
########################################
[docs]
def build_timeline(
    self,
    starting_datetime: datetime | str = "now",
    temporal_grouping: str = "year",
    interpolation_type: str = "linear",
    edge_filter_function: Callable = None,
    cutoff: float = 1e-9,
    max_calc: int = 2000,
    graph_traversal: str = "priority",
    *args,
    **kwargs,
) -> pd.DataFrame:
    """
    Creates a `TimelineBuilder` instance that does the graph traversal (similar to
    bw_temporalis) and extracts all edges with their temporal information. Creates the
    `TimexLCA.timeline` of technosphere exchanges.

    If the method is called again with the same arguments as the previous call, and no
    extra `*args`/`**kwargs` are given, the timeline built by that previous call is
    reused. Calls with extra traversal arguments always rebuild the timeline, because
    these arguments are not part of the cache key.

    Parameters
    ----------
    starting_datetime: datetime | str, optional
        Point in time when the demand occurs. This is the initial starting point of the
        graph traversal and the timeline. Something like `"now"` or `"2023-01-01"`.
        Default is `"now"`.
    temporal_grouping : str, optional
        Time resolution for grouping exchanges over time in the timeline. Default is 'year',
        other options are 'month', 'day', 'hour'.
    interpolation_type : str, optional
        Type of interpolation when sourcing the new producers in the time-mapped background
        databases. Default is 'linear', which means linear interpolation between the closest 2
        databases, other options are 'nearest' (or 'closest'), which selects only the closest
        database.
    edge_filter_function : Callable, optional
        Function to skip edges in the graph traversal. Default is to skip all edges within
        background databases.
    cutoff: float, optional
        The cutoff value for the graph traversal. Default is 1e-9.
    max_calc: int, optional
        The maximum number of calculations to be performed by the graph traversal. Default is
        2000.
    graph_traversal : str, optional
        The graph traversal algorithm to use. Default is 'priority' (priority-first,
        using bw_temporalis TemporalisLCA). Alternative is 'bfs' (Breadth-First-Search,
        independent of TemporalisLCA, avoids per-subgraph LCA overhead).
    *args : iterable
        Positional arguments for the graph traversal, for `bw_temporalis.TemporalisLCA` passed
        to the `EdgeExtractor` class, which inherits from `TemporalisLCA`. See `bw_temporalis`
        documentation for more information.
    **kwargs : dict
        Additional keyword arguments for the graph traversal, for `bw_temporalis.TemporalisLCA`
        passed to the EdgeExtractor class, which inherits from TemporalisLCA. See bw_temporalis
        documentation for more information.

    Returns
    -------
    pandas.DataFrame:
        A DataFrame containing the timeline of technosphere exchanges

    See also
    --------
    bw_timex.timeline_builder.TimelineBuilder: Class that builds the timeline.
    """
    validated = BuildTimelineInputs(
        starting_datetime=starting_datetime,
        temporal_grouping=temporal_grouping,
        interpolation_type=interpolation_type,
        edge_filter_function=edge_filter_function,
        cutoff=cutoff,
        max_calc=max_calc,
        graph_traversal=graph_traversal,
    )
    # Use the values as returned by the validation model from here on.
    interpolation_type = validated.interpolation_type
    graph_traversal = validated.graph_traversal

    # Columns of the timeline that are handed back to the user.
    timeline_columns = [
        "date_producer",
        "producer_name",
        "date_consumer",
        "consumer_name",
        "amount",
        "temporal_market_shares",
    ]

    if args or kwargs:
        # Extra traversal arguments can change the resulting timeline but are not
        # (and cannot reliably be) part of the key, so such builds are never cached.
        timeline_cache_key = None
    else:
        timeline_cache_key = (
            str(validated.starting_datetime),
            temporal_grouping,
            interpolation_type,
            cutoff,
            max_calc,
            graph_traversal,
            "default" if edge_filter_function is None else id(edge_filter_function),
        )

    if (
        timeline_cache_key is not None
        and timeline_cache_key == self._last_timeline_build_key
    ):
        self.timeline = self._cached_timeline
        return self.timeline[timeline_columns]

    if edge_filter_function is None:
        logger.info(
            "No edge filter function provided. Skipping all edges in background databases."
        )
        if self._default_edge_filter_function is None:
            # Built once per object: membership test on the ids of all nodes in the
            # dated (background) databases.
            skippable = set()
            for db in self.database_dates_static.keys():
                skippable.update(node.id for node in bd.Database(db))
            self._default_edge_filter_function = skippable.__contains__
        self.edge_filter_function = self._default_edge_filter_function
    else:
        self.edge_filter_function = edge_filter_function

    self.starting_datetime = starting_datetime
    self.temporal_grouping = temporal_grouping
    self.interpolation_type = interpolation_type
    self.cutoff = cutoff
    self.max_calc = max_calc

    logger.info("Creating activity time mapping...")
    # Create a time mapping dict that maps each activity to a activity_time_mapping_id in the
    # format (('database', 'code'), datetime_as_integer): time_mapping_id).
    # Counting up from the highest current activity id keeps the new ids unique.
    self.activity_time_mapping = TimeMappingDict(
        start_id=bd.backends.ActivityDataset.select(fn.MAX(AD.id)).scalar() + 1
    )

    # Pre-populate the activity time mapping dict with the static activities.
    # Done here because the temporal grouping is needed for a consistent time resolution.
    self.add_static_activities_to_activity_time_mapping()

    # The timeline builder does the graph traversal (similar to bw_temporalis) and
    # extracts all edges with their temporal information.
    self.timeline_builder = TimelineBuilder(
        self.base_lca,
        self.starting_datetime,
        self.edge_filter_function,
        self.database_dates,
        self.database_dates_static,
        self.activity_time_mapping,
        self.node_collections,
        self.nodes,
        self.temporal_grouping,
        self.interpolation_type,
        self.cutoff,
        self.max_calc,
        *args,
        graph_traversal=graph_traversal,
        **kwargs,
    )

    self.timeline = self.timeline_builder.build_timeline()
    self.add_interdatabase_activity_mapping_from_timeline()

    # Remember this build; a key of None (extra args given) disables reuse.
    self._last_timeline_build_key = timeline_cache_key
    self._cached_timeline = self.timeline
    # Inventories prepared for dynamic LCIA belong to the previous timeline.
    self._dynamic_lcia_inventory_cache.clear()

    return self.timeline[timeline_columns]
[docs]
def lci(
    self,
    build_dynamic_biosphere: Optional[bool] = True,
    expand_technosphere: Optional[bool] = True,
) -> None:
    """
    Calculates the time-explicit LCI.

    There are two ways to generate time-explicit LCIs:
    If `expand_technosphere` is True, the biosphere and technosphere matrices are expanded
    by inserting time-specific processes via the `MatrixModifier` class by calling
    `TimexLCA.build_datapackage()`. Otherwise (`expand_technosphere` is False), it generates
    a dynamic inventory directly from the timeline without technosphere matrix calculations.

    Next to the choice above concerning how to retrieve the time-explicit inventory, users
    can also decide if they want to retain all temporal information at the biosphere level
    (`build_dynamic_biosphere=True`).
    Set `build_dynamic_biosphere` to False if you only want to get a new overall score of
    the time-explicit inventory and don't care about the timing of the emissions.
    This saves time and memory.

    Results of a previous `lci()` call (`dynamic_inventory`, the disaggregated inventory
    and the inventories prepared for dynamic LCIA) are discarded at the start, so that
    later steps cannot pick up data of an older run.

    Parameters
    ----------
    build_dynamic_biosphere: bool
        if True, build the dynamic biosphere matrix and calculate the dynamic LCI.
        Default is True.
    expand_technosphere: bool
        if True, creates an expanded time-explicit technosphere and biosphere matrix and
        calculates the LCI from it.
        if False, creates no new technosphere, but calculates the dynamic inventory directly
        from the timeline. Building from the timeline currently only works if
        `build_dynamic_biosphere` is also True.

    Returns
    -------
    None
        calls LCI calculations from bw2calc and calculates the dynamic inventory, if
        `build_dynamic_biosphere` is True.

    Raises
    ------
    AttributeError
        If no timeline has been built yet.

    See also
    --------
    build_datapackage:
        Method to create the datapackages that contain the modifications
        to the technosphere and biosphere matrix using the `MatrixModifier` class.
    calculate_dynamic_inventory:
        Method to calculate the dynamic inventory if `build_dynamic_biosphere` is True.
    """
    # Instantiated only for its input validation; the result is not kept.
    LCIInputs(
        build_dynamic_biosphere=build_dynamic_biosphere,
        expand_technosphere=expand_technosphere,
    )

    # Drop everything derived from a previous lci() run. Besides the dynamic
    # inventory itself this includes the disaggregated inventory, which
    # dynamic_lcia() would otherwise reuse because it only checks for its existence.
    for stale_attribute in (
        "dynamic_inventory",
        "dynamic_inventory_disaggregated",
        "dynamic_inventory_disaggregated_df",
    ):
        if hasattr(self, stale_attribute):
            delattr(self, stale_attribute)
    self._dynamic_lcia_inventory_cache.clear()

    # Per-call bookkeeping flags, decided further down.
    # Whether the expanded technosphere was LU-factorized.
    self._lci_did_factorize = False
    # Whether the fu solve was restored from `LCI_SOLVE_CACHE`.
    self._lci_used_cached_solve = False
    # Whether `redo_lci(self.fu)` was called at the end to restore the fu inventory.
    self._lci_did_reset = False
    # Number of background unit-LCI solves that still have to be performed
    # (cache misses).
    self._lci_pending_solves = 0

    if not hasattr(self, "timeline"):
        raise AttributeError(
            "Timeline not yet built. Call TimexLCA.build_timeline() first."
        )

    # mapping of the demand id to demand time
    self.demand_timing = self.create_demand_timing()

    self.fu, self.data_objs, self.remapping = self.prepare_bw_timex_inputs(
        demand=self.demand,
        method=self.method,
    )

    if expand_technosphere:
        logger.info("Expanding matrices...")
        self.datapackage = self.build_datapackage()
        lca_data_objs = self.data_objs + self.datapackage
        self.expanded_technosphere = True  # flag for later static lcia usage
    else:  # setup for timeline approach
        logger.warning(
            "Building the dynamic inventory directly from the timeline. This feature is under development."
            "Use at your own risk... and check your results! Disaggregated lci is not yet implemented."
        )
        self.collect_temporalized_processes_from_timeline()
        lca_data_objs = self.data_objs
        self.expanded_technosphere = False  # flag for later lcia usage

    self.lca = LCA(
        self.fu,
        data_objs=lca_data_objs,
        remapping_dicts=self.remapping,
    )

    logger.info("Calculating dynamic inventory...")

    if not build_dynamic_biosphere:
        self.lca.lci()
        return

    if not expand_technosphere:
        self.calculate_dynamic_inventory(expand_technosphere=False)
        return

    # Dynamic biosphere on the expanded matrices.
    # Build matrices and dicts without solving; how the fu solve is done is
    # decided next, based on the pending background solves and the
    # module-level solve cache.
    self.lca.load_lci_data()
    self.lca.build_demand_array()
    # Placeholder so the shadow builder's __init__ can read supply_array;
    # replaced with the real (cached or freshly solved) array below before
    # any consumer uses it.
    self.lca.supply_array = np.zeros(self.lca.technosphere_matrix.shape[0])
    shadow = DynamicBiosphereBuilder(
        self.lca,
        self.activity_time_mapping,
        TimeMappingDict(start_id=0),
        self.demand_timing,
        self.node_collections,
        self.temporal_grouping,
        self.database_dates,
        self.database_dates_static,
        self.timeline,
        self.interdatabase_activity_mapping,
        expand_technosphere=True,
        background_unit_lci_cache=self._background_unit_lci_cache,
    )
    pending = shadow.count_pending_background_solves()
    self._lci_pending_solves = pending

    solve_key = self._solve_cache_key(expand_technosphere=True)
    if pending == 0 and solve_key in LCI_SOLVE_CACHE:
        # Fully warm: no fu solve and no trailing reset needed, the cached
        # supply_array and inventory cover both.
        supply, inventory = LCI_SOLVE_CACHE[solve_key]
        self.lca.supply_array = supply
        self.lca.inventory = inventory
        self._lci_used_cached_solve = True
    elif pending + 1 >= FACTORIZE_SOLVES_THRESHOLD:
        # Many redo_lci's coming; pay the factorization once.
        self.lca.lci(factorize=True)
        LCI_SOLVE_CACHE[solve_key] = (
            self.lca.supply_array.copy(),
            self.lca.inventory.copy(),
        )
        self._lci_did_factorize = True
    else:
        # Few or no redo_lci's; single solve, no factorization.
        self.lca.lci_calculation()
        LCI_SOLVE_CACHE[solve_key] = (
            self.lca.supply_array.copy(),
            self.lca.inventory.copy(),
        )

    self.calculate_dynamic_inventory(expand_technosphere=True)

    # Restore the fu inventory only if the build actually mutated
    # `self.lca.inventory` via at least one `redo_lci` call (i.e. there was
    # a cache miss). With everything cached, the inventory is still the fu
    # solve from above.
    if self._lci_pending_solves > 0:
        self.lca.redo_lci(self.fu)
        self._lci_did_reset = True
[docs]
def disaggregate_background_lci(self) -> None:
    """
    Disaggregates the background LCIs of the temporal markets.

    The aggregated emissions in the temporal market columns of the dynamic inventory are
    replaced by the entries of the background inventories stored in
    `temporal_market_lcis`. This allows a contribution analysis on the original inventory
    level instead of on the aggregated temporal market emissions.

    `self.dynamic_inventory` itself is left untouched; all modifications are done on a
    copy.

    Parameters
    ----------
    None

    Returns
    -------
    None
        Stores the disaggregated background inventory in the attribute
        `dynamic_inventory_disaggregated` as a matrix and in
        `dynamic_inventory_disaggregated_df` as a DataFrame.

    Raises
    ------
    AttributeError
        If the dynamic inventory has not been calculated yet.
    NotImplementedError
        If the inventory was not built from the expanded matrices.
    """
    if not hasattr(self, "dynamic_inventory"):
        raise AttributeError(
            "Dynamic lci not yet calculated. Call TimexLCA.lci(build_dynamic_biosphere=True) first."
        )
    if not self.expanded_technosphere:
        raise NotImplementedError(
            "Currently the disaggregation of background processes is only possible\n"
            "if the expanded matrix has been built. Please call TimexLCA.lci(expand_technosphere=True) first."
        )

    # Array version of the reversed biosphere dict for fast vectorized lookup
    # (key becomes index, value becomes value of the 1D array).
    bio_reversed = self.lca.dicts.biosphere.reversed
    bio_dict_array = np.zeros(max(bio_reversed.keys()) + 1, dtype=int)
    for key, value in bio_reversed.items():
        bio_dict_array[key] = value

    # Same mapping as biosphere_time_mapping, but with the time part of the key
    # converted to int64 for fast lookup.
    biosphere_time_mapping_int = {
        (key[0], key[1].astype("int64")): value
        for key, value in self.biosphere_time_mapping.items()
    }

    # copy=True: without it, tocsc() may hand back the dynamic inventory itself
    # (or share its data), and zeroing columns below would corrupt it.
    disaggregated = self.dynamic_inventory.tocsc(copy=True)

    # 1) set all temporal market emissions to zero
    for col in self.dynamic_biosphere_builder.temporal_market_cols:
        start, stop = disaggregated.indptr[col], disaggregated.indptr[col + 1]
        disaggregated.data[start:stop] = 0
    disaggregated.eliminate_zeros()

    # 2) add all background inventory to the dynamic inventory for all temporal markets
    remaining = disaggregated.tocoo()
    row_ids = remaining.row.tolist()
    col_ids = remaining.col.tolist()
    values = remaining.data.tolist()

    for id_, lci in self.temporal_market_lcis.items():
        ((_, _), time) = self.activity_time_mapping.reversed[
            id_
        ]  # time of temporal market
        time_in_datetime = convert_date_string_to_datetime(
            self.temporal_grouping, str(time)
        )
        # time as int64 in seconds
        time_int = (
            np.datetime64(time_in_datetime).astype("datetime64[s]").astype("int64")
        )

        lci = lci.tocoo()
        # (flow id, time) tuples of all entries, translated to dynamic rows
        time_array = np.full(len(lci.row), time_int, dtype="int64")
        row_ids.extend(
            biosphere_time_mapping_int[flow_at_time]
            for flow_at_time in zip(bio_dict_array[lci.row], time_array)
        )
        col_ids.extend(lci.col)
        values.extend(lci.data)

    # Construct the new dynamic inventory including the background inventory instead
    # of the aggregated temporal market emissions. Assigned only now, so a failure
    # above does not leave a half-built attribute behind.
    self.dynamic_inventory_disaggregated = sparse.coo_matrix(
        (values, (row_ids, col_ids)),
        shape=disaggregated.shape,
    ).tocsr()
    self.dynamic_inventory_disaggregated_df = (
        self.create_dynamic_inventory_dataframe(use_disaggregated_lci=True)
    )
    # Inventories prepared for dynamic LCIA are outdated now.
    self._dynamic_lcia_inventory_cache.clear()
[docs]
def static_lcia(self) -> None:
    """
    Runs the static LCIA (`lcia()` of the underlying LCA object) on the
    time-explicit LCI.

    Parameters
    ----------
    None

    Returns
    -------
    None
        The resulting score is available via `static_score`.

    Raises
    ------
    AttributeError
        If `lci()` has not been called yet.
    ValueError
        If the LCI was not calculated from the expanded matrices.
    """
    lci_available = hasattr(self, "lca")
    if not lci_available:
        raise AttributeError("LCI not yet calculated. Call TimexLCA.lci() first.")

    if self.expanded_technosphere:
        self.lca.lcia()
        return

    raise ValueError(
        "Currently the static lcia score can only be calculated if the expanded matrix has "
        "been built. Please call TimexLCA.lci(expand_technosphere=True) first."
    )
[docs]
def dynamic_lcia(
    self,
    metric: str = "radiative_forcing",
    time_horizon: int = 100,
    fixed_time_horizon: bool = False,
    time_horizon_start: datetime = None,
    characterization_functions: dict = None,
    characterization_function_co2: dict = None,
    use_disaggregated_lci: bool = False,
) -> pd.DataFrame:
    """
    Calculates dynamic LCIA using the dynamic inventory and dynamic characterization
    functions. Dynamic characterization is handled by the separate package
    `dynamic_characterization` (https://dynamic-characterization.readthedocs.io).

    Dynamic characterization functions in the form of a dictionary
    {biosphere_flow_database_id: characterization_function} can be given by the user.
    If none are given, the choice of characterization functions is left to the
    `dynamic_characterization` package (`characterization_functions=None` is passed on).

    Two dynamic climate change metrics are supported: "GWP" and "radiative_forcing".

    The time horizon for the impact assessment can be set with the `time_horizon` parameter,
    defaulting to 100 years. The `fixed_time_horizon` parameter determines whether the emission
    time horizon for all emissions is calculated from a specific starting point
    `time_horizon_start` (`fixed_time_horizon=True`) or from the time of the emission
    (`fixed_time_horizon=False`).
    The former is the implementation of the Levasseur approach
    (see https://doi.org/10.1021/es9030003), while the latter is how conventional LCA is done.

    Before characterization, the dates of the inventory are rounded to years and the
    amounts of rows that then share all other columns are summed up. The prepared
    inventory is cached per inventory type (aggregate / disaggregated).

    Parameters
    ----------
    metric : str, optional
        the metric for which the dynamic LCIA should be calculated. Default is
        "radiative_forcing". Available: "GWP" and "radiative_forcing"
    time_horizon: int, optional
        the time horizon for the impact assessment. Unit is years. Default is 100.
    fixed_time_horizon: bool, optional
        Whether the emission time horizon for all emissions is calculated from the functional
        unit (fixed_time_horizon=True) or from the time of the emission
        (fixed_time_horizon=False). Default is False.
    time_horizon_start: pd.Timestamp, optional
        The starting timestamp of the time horizon for the dynamic characterization. Only
        needed for fixed time horizons. Default is None, in which case the first date of
        the graph traversal's `t0` is used.
    characterization_functions: dict, optional
        Dict of the form {biosphere_flow_database_id: characterization_function}. Default is
        None.
    characterization_function_co2: Callable, optional
        Characterization function for CO2 emissions. Default is None.
    use_disaggregated_lci: bool, optional
        Whether to use the disaggregated background LCI for the dynamic LCIA. Default is False.
        Use True if you want to perform a contribution analysis on the disaggregated
        background. The disaggregation is calculated on the fly if not yet available.

    Returns
    -------
    pandas.DataFrame
        A DataFrame with the characterized inventory for the chosen metric and parameters.

    Raises
    ------
    AttributeError
        If the dynamic inventory has not been calculated yet.
    NotImplementedError
        If the disaggregated inventory is requested, but the LCI was not calculated from
        the expanded matrices.

    See also
    --------
    dynamic_characterization: Package handling the dynamic characterization: https://dynamic-characterization.readthedocs.io/en/latest/
    """
    # Instantiated only for its input validation; the result is not kept.
    DynamicLCIAInputs(
        metric=metric,
        time_horizon=time_horizon,
        fixed_time_horizon=fixed_time_horizon,
        time_horizon_start=time_horizon_start,
        characterization_functions=characterization_functions,
        characterization_function_co2=characterization_function_co2,
        use_disaggregated_lci=use_disaggregated_lci,
    )

    if not hasattr(self, "dynamic_inventory"):
        raise AttributeError(
            "Dynamic lci not yet calculated. Call TimexLCA.lci(build_dynamic_biosphere=True) first."
        )

    self.current_metric = metric
    self.current_time_horizon = time_horizon

    if use_disaggregated_lci:
        if not self.expanded_technosphere:
            raise NotImplementedError(
                "Currently the disaggregation of background processes is only possible if the "
                "expanded matrix has been built. Please call TimexLCA.lci(expand_technosphere=True) first."
            )
        # Disaggregate the background LCI if that has not been done yet.
        if not hasattr(self, "dynamic_inventory_disaggregated"):
            logger.info("Disaggregating background LCI...")
            self.disaggregate_background_lci()
            logger.info("Background LCI's disaggregated.")
        dynamic_inventory_df = self.dynamic_inventory_disaggregated_df
    else:
        dynamic_inventory_df = self.dynamic_inventory_df

    cache_key = "disaggregated" if use_disaggregated_lci else "aggregate"
    if cache_key not in self._dynamic_lcia_inventory_cache:
        inventory_rounded = dynamic_inventory_df.copy()
        inventory_rounded.date = round_datetime_series_to_year(
            inventory_rounded.date
        )
        # Sum the amounts of all rows that agree in every other column. Grouping
        # by *all* columns (including "amount") would leave nothing to sum and
        # merely drop duplicated rows, i.e. lose emissions.
        # NOTE(review): relies on the inventory having an "amount" column - confirm
        # against create_dynamic_inventory_dataframe.
        original_columns = inventory_rounded.columns.tolist()
        group_columns = [col for col in original_columns if col != "amount"]
        summed = inventory_rounded.groupby(group_columns, as_index=False)[
            "amount"
        ].sum()
        # keep the original column order
        self._dynamic_lcia_inventory_cache[cache_key] = summed[original_columns]

    # By default the full (rounded and summed) inventory is characterized.
    inventory_in_time_horizon = self._dynamic_lcia_inventory_cache[cache_key]

    # Calculate the latest considered impact date
    t0_date = pd.Timestamp(self.timeline_builder.edge_extractor.t0.date[0])
    latest_considered_impact = t0_date + pd.DateOffset(years=time_horizon)

    # With a fixed time horizon, emissions after the horizon are left out. The
    # filter works on the same rounded inventory as the default case.
    if fixed_time_horizon:
        last_emission = inventory_in_time_horizon.date.max()
        if latest_considered_impact < last_emission:
            logger.warning(
                "An emission occurs outside of the specified time horizon and will not be "
                "characterized. Please make sure this is intended."
            )
            inventory_in_time_horizon = inventory_in_time_horizon[
                inventory_in_time_horizon.date <= latest_considered_impact
            ]

    if not time_horizon_start:
        time_horizon_start = t0_date

    self.characterized_inventory = characterize(
        dynamic_inventory_df=inventory_in_time_horizon,
        metric=metric,
        characterization_functions=characterization_functions,
        base_lcia_method=self.method,
        time_horizon=time_horizon,
        fixed_time_horizon=fixed_time_horizon,
        time_horizon_start=time_horizon_start,
        characterization_function_co2=characterization_function_co2,
    )

    return self.characterized_inventory
###################
# Core properties #
###################
@property
[docs]
def base_score(self) -> float:
    """
    Score of the static base LCA, read from `self.base_lca.score`, i.e. the
    result without any time-explicit information.
    """
    base_lca = self.base_lca
    return base_lca.score
@property
[docs]
def static_score(self) -> float:
    """
    Score of the static LCIA of the time-explicit inventory, read from
    `self.lca.score`.

    Raises
    ------
    AttributeError
        If `lci()` has not been called yet.
    """
    if hasattr(self, "lca"):
        return self.lca.score
    raise AttributeError("LCI not yet calculated. Call TimexLCA.lci() first.")
@property
[docs]
def dynamic_score(self) -> float:
    """
    Score of the dynamic LCIA: the sum over the "amount" column of the
    characterized inventory.

    Raises
    ------
    AttributeError
        If `dynamic_lcia()` has not been called yet.
    """
    if hasattr(self, "characterized_inventory"):
        amounts = self.characterized_inventory["amount"]
        return amounts.sum()
    raise AttributeError(
        "Characterized inventory not yet calculated. Call TimexLCA.dynamic_lcia() first."
    )
###############################################
# Other core functions for the inner workings #
###############################################
[docs]
def build_datapackage(self) -> list:
    """
    Sets up a `MatrixModifier` and lets it create the datapackages with the
    modifications to the technosphere and biosphere matrix.

    As a side effect, the modifier is stored in `self.matrix_modifier` and its temporal
    market ids and temporalized process ids are added to `self.node_collections`.

    Parameters
    ----------
    None

    Returns
    -------
    list
        List of datapackages that contain the modifications to the technosphere and biosphere
        matrix

    See also
    --------
    bw_timex.matrix_modifier.MatrixModifier: Class that handles the technosphere and biosphere matrix modifications.
    """
    modifier = MatrixModifier(
        self.timeline,
        self.database_dates_static,
        self.demand_timing,
        self.nodes,
        self.interdatabase_activity_mapping,
    )
    self.matrix_modifier = modifier

    collections = self.node_collections
    collections["temporal_markets"] = modifier.temporal_market_ids
    collections["temporalized_processes"] = modifier.temporalized_process_ids

    return modifier.create_datapackage()
[docs]
def _solve_cache_key(self, expand_technosphere: bool) -> tuple:
    """Build the key under which the fu solve of this scenario is cached.

    A cached solve may only be reused for identical technosphere and
    biosphere matrices and an identical demand array. The key therefore
    contains md5 digests of the raw matrix arrays (data, indices, indptr)
    and of the demand array, next to the current project, the
    ``expand_technosphere`` flag and the matrix shapes. Any change in the
    matrices (e.g. a different relinking) leads to a different key.
    """
    import hashlib

    def _md5_of(*arrays) -> bytes:
        # Feeding the buffers one after another gives the same digest as
        # hashing their concatenation.
        hasher = hashlib.md5()
        for array in arrays:
            hasher.update(array.tobytes())
        return hasher.digest()

    technosphere = self.lca.technosphere_matrix
    biosphere = self.lca.biosphere_matrix
    demand = self.lca.demand_array

    tech_digest = _md5_of(
        technosphere.data, technosphere.indices, technosphere.indptr
    )
    bio_digest = _md5_of(biosphere.data, biosphere.indices, biosphere.indptr)
    demand_digest = _md5_of(np.asarray(demand))

    return (
        bd.projects.current,
        bool(expand_technosphere),
        technosphere.shape,
        biosphere.shape,
        tech_digest,
        bio_digest,
        demand_digest,
    )
[docs]
def _has_matching_cached_unit_lcis(self, expand_technosphere: bool) -> bool:
    """Tell whether the cache holds an entry for any of this scenario's databases.

    Only cache keys that are tuples of at least five elements and start with
    ``"db_code"`` are inspected; they are read as
    ``("db_code", project, db, code, modified)``. A key counts as a hit when
    its project equals the current project, its database is one of
    ``self.database_dates`` and its ``modified`` token equals the
    ``"modified"`` entry of that database's metadata (``None`` for databases
    that are not registered in ``bd.databases``).

    Parameters
    ----------
    expand_technosphere: bool
        Not used by this check.

    Returns
    -------
    bool
        True as soon as one matching key is found, otherwise False.
    """
    current_project = bd.projects.current

    # Version token per database of this scenario.
    db_versions = {}
    for db in self.database_dates:
        if db in bd.databases:
            db_versions[db] = bd.databases[db].get("modified")
        else:
            db_versions[db] = None

    def _matches(key) -> bool:
        if not isinstance(key, tuple) or len(key) < 5 or key[0] != "db_code":
            return False
        key_project, db, modified = key[1], key[2], key[4]
        return (
            key_project == current_project
            and db in db_versions
            and db_versions[db] == modified
        )

    return any(_matches(key) for key in self._background_unit_lci_cache)
[docs]
def calculate_dynamic_inventory(
    self,
    expand_technosphere=True,
) -> None:
    """
    Calculate the dynamic inventory.

    A dynamic biosphere matrix is built with the `DynamicBiosphereBuilder` class and
    multiplied with the diagonalized dynamic supply array. The resulting matrix is
    stored in `dynamic_inventory`; its DataFrame form is stored in
    `dynamic_inventory_df`.

    Parameters
    ----------
    expand_technosphere: bool
        A boolean indicating if the dynamic biosphere matrix is built directly from the
        expanded matrices or from the timeline. Default is True (from expanded matrices).

    Returns
    -------
    None
        Sets `biosphere_time_mapping`, `dynamic_biosphere_builder`,
        `dynamic_biosphere_matrix`, `temporal_market_lcis` (the lci of the temporal
        markets, for use in contribution analysis of the background processes),
        `dynamic_inventory` and `dynamic_inventory_df`, and clears
        `_dynamic_lcia_inventory_cache`.

    Raises
    ------
    AttributeError
        If no `lca` attribute exists yet, i.e. `TimexLCA.lci()` has not been called.

    See also
    --------
    bw_timex.dynamic_biosphere_builder.DynamicBiosphereBuilder: Class for creating the dynamic biosphere matrix and inventory.
    """
    if not hasattr(self, "lca"):
        raise AttributeError(
            "Time-explicit LCA object does not exist. Call TimexLCA.lci() first."
        )

    self.biosphere_time_mapping = TimeMappingDict(start_id=0)

    builder = DynamicBiosphereBuilder(
        self.lca,
        self.activity_time_mapping,
        self.biosphere_time_mapping,
        self.demand_timing,
        self.node_collections,
        self.temporal_grouping,
        self.database_dates,
        self.database_dates_static,
        self.timeline,
        self.interdatabase_activity_mapping,
        expand_technosphere=expand_technosphere,
        background_unit_lci_cache=self._background_unit_lci_cache,
    )
    self.dynamic_biosphere_builder = builder

    # NOTE(review): the pending-solve count and factorize decision are expected to be
    # made upfront by `lci()` (`self._lci_pending_solves` / `_lci_did_factorize`); that
    # code is not visible here. Calling this method directly is expected to skip those
    # optimizations while still producing correct results -- confirm against `lci()`.
    matrix_and_lcis = builder.build_dynamic_biosphere_matrix(
        expand_technosphere=expand_technosphere,
    )
    self.dynamic_biosphere_matrix, self.temporal_market_lcis = matrix_and_lcis

    # Putting the supply array on a diagonal keeps the process dimension, which is
    # passed on as additional information to the dynamic inventory.
    supply_array = builder.dynamic_supply_array
    size = len(supply_array)
    diagonal_supply_array = sparse.spdiags([supply_array], [0], size, size)

    self.dynamic_inventory = self.dynamic_biosphere_matrix @ diagonal_supply_array
    self.dynamic_inventory_df = self.create_dynamic_inventory_dataframe(
        expand_technosphere
    )
    self._dynamic_lcia_inventory_cache.clear()
[docs]
def create_dynamic_inventory_dataframe(
    self,
    expand_technosphere=True,
    use_disaggregated_lci=False,
) -> pd.DataFrame:
    """
    Convert the dynamic inventory matrix into a pandas.DataFrame that has the
    structure needed to later apply dynamic characterization functions.

    Every stored entry of the matrix becomes one row:

    +------------+--------+------+----------+
    | date       | amount | flow | activity |
    +============+========+======+==========+
    |  datetime  |   33   |  1   |    2     |
    +------------+--------+------+----------+
    |  datetime  |   32   |  1   |    2     |
    +------------+--------+------+----------+

    - date: datetime, e.g. '2024-01-01 00:00:00'
    - flow: flow id
    - activity: activity id

    Parameters
    ----------
    expand_technosphere: bool
        If True (default), matrix columns are resolved to activities through
        `lca.activity_dict.reversed`; otherwise they are used as positions into the
        `time_mapped_producer` column of the timeline.
    use_disaggregated_lci: bool
        If True, `dynamic_inventory_disaggregated` is converted instead of
        `dynamic_inventory`. Default is False.

    Returns
    -------
    pandas.DataFrame
        Dynamic inventory with the columns date, amount, flow and activity, sorted by
        ascending date and, within a date, by descending amount.
    """
    if use_disaggregated_lci:
        inventory = self.dynamic_inventory_disaggregated
    else:
        inventory = self.dynamic_inventory
    entries = inventory.tocoo()

    if expand_technosphere:
        activity_of_column = self.lca.activity_dict.reversed
        activities = [activity_of_column[col] for col in entries.col]
    else:
        producers = self.timeline["time_mapped_producer"].to_numpy()
        activities = producers[entries.col].tolist()

    # Each matrix row resolves to an entry whose first item is the flow and whose
    # second item is the date.
    resolve_row = self.biosphere_time_mapping.reversed
    flows, dates = [], []
    for row in entries.row:
        resolved = resolve_row[row]
        flows.append(resolved[0])
        dates.append(resolved[1])

    frame = pd.DataFrame(
        {
            "date": dates,
            "amount": entries.data,
            "flow": flows,
            "activity": activities,
        }
    )
    frame["date"] = frame["date"].astype("datetime64[s]")
    return frame.sort_values(by=["date", "amount"], ascending=[True, False])
#############
# For setup #
#############
[docs]
def create_node_collections(self) -> None:
    """
    Create a dict of node collections that are useful down the line, e.g. to determine
    static nodes for the graph traversal or to create the dynamic biosphere matrix.

    Databases listed in `database_dates` but not in `database_dates_static` are
    treated as demand (foreground) databases. The collections are:

    - ``background``: set of ids of all nodes in those `database_dates_static` databases that are demand databases' graph dependents (as reported by `find_graph_dependents()`)
    - ``foreground``: set of ids of all nodes in the demand databases
    - ``first_level_background_static``: set of ids of the inputs of technosphere and substitution exchanges of foreground nodes that lie in one of the background databases above

    Parameters
    ----------
    None

    Returns
    -------
    None
        (Re)creates `node_collections` with the collections listed above.
    """
    self.node_collections = {}

    static_database_names = self.database_dates_static.keys()
    demand_database_names = {
        db for db in self.database_dates.keys() if db not in static_database_names
    }

    # Demand databases plus everything they depend on.
    demand_dependent_database_names = set(demand_database_names).union(
        *(bd.Database(db).find_graph_dependents() for db in demand_database_names)
    )
    demand_dependent_background_database_names = (
        demand_dependent_database_names & static_database_names
    )

    self.node_collections["background"] = {
        node.id
        for db in demand_dependent_background_database_names
        for node in bd.Database(db)
    }

    foreground = set()
    first_level_background_static = set()
    for db_name in demand_database_names:
        for node in bd.Database(db_name):
            foreground.add(node.id)
            # Technosphere exchanges first, then substitution exchanges.
            for get_exchanges in (node.technosphere, node.substitution):
                first_level_background_static.update(
                    exc.input.id
                    for exc in get_exchanges()
                    if exc.input["database"]
                    in demand_dependent_background_database_names
                )

    self.node_collections["foreground"] = foreground
    self.node_collections["first_level_background_static"] = (
        first_level_background_static
    )
[docs]
def add_interdatabase_activity_mapping_from_timeline(self) -> None:
    """
    Fill the `interdatabase_activity_mapping` with the processes that match the
    producers actually mapped in the timeline.

    Only producers of timeline rows with non-null `temporal_market_shares` are
    considered. For each of them, every node in `self.nodes` sharing the same name,
    reference product and location is recorded as ``{database_name: node_id}``.

    Parameters
    ----------
    None

    Returns
    -------
    None
        Resets the entries of the considered producers in
        `interdatabase_activity_mapping`, fills them with the ids of the matching
        nodes per database and finally calls `make_reciprocal()` on the mapping.

    Raises
    ------
    AttributeError
        If the timeline has not been built yet.
    """
    if not hasattr(self, "timeline"):
        raise AttributeError(
            "Timeline not yet built. Call TimexLCA.build_timeline() first."
        )

    has_market_shares = self.timeline.temporal_market_shares.notnull()
    unique_producers = self.timeline.loc[has_market_shares].producer.unique()

    mapping = self.interdatabase_activity_mapping
    mapping.update({producer: {} for producer in unique_producers})

    def _signature(node) -> tuple:
        # Nodes are matched across databases on name, reference product and location.
        return (node["name"], node.get("reference product"), node["location"])

    producer_by_signature = {
        _signature(self.nodes[producer]): producer for producer in unique_producers
    }

    for node in self.nodes.values():
        signature = _signature(node)
        if signature in producer_by_signature:
            matched_producer = producer_by_signature[signature]
            mapping[matched_producer][node["database"]] = node.id

    mapping.make_reciprocal()
[docs]
def collect_temporalized_processes_from_timeline(self) -> None:
    """
    Sort the time-mapped producers of the timeline into temporal markets and
    temporalized processes.

    For every unique (producer, time_mapped_producer) pair of the timeline, the
    time-mapped producer is a temporal market when the producer's database is listed
    in `database_dates_static`, and a temporalized process otherwise.

    Parameters
    ----------
    None

    Returns
    -------
    None
        Adds "temporal_markets" and "temporalized_processes" to the
        node_collections based on the timeline.
    """
    producer_pairs = (
        self.timeline.groupby(["producer", "time_mapped_producer"])
        .count()
        .index.values
    )
    static_database_names = self.database_dates_static.keys()

    markets, temporalized = set(), set()
    for producer, time_mapped_producer in producer_pairs:
        in_static_database = (
            self.nodes[producer]["database"] in static_database_names
        )
        target = markets if in_static_database else temporalized
        target.add(time_mapped_producer)

    self.node_collections["temporal_markets"] = markets
    self.node_collections["temporalized_processes"] = temporalized
[docs]
def add_static_activities_to_activity_time_mapping(self) -> None:
    """
    Pre-populate `activity_time_mapping` with all activities of the static LCA.

    Every activity id of `base_lca` is added as ``(('database', 'code'), time)`` with
    the activity id itself as unique id. The time depends on the entry of the
    activity's database in `database_dates`:

    - a str entry is used as is (e.g. "dynamic" for databases whose timing is not
      yet known),
    - a datetime entry is converted with `extract_date_as_integer` according to
      `temporal_grouping`.

    Per the original documentation, time-explicit activities from other temporalized
    background databases are added later on by the TimelineBuilder.

    Parameters
    ----------
    None

    Returns
    -------
    None
        adds the static activities to the `activity_time_mapping`

    Raises
    ------
    ValueError
        If the database of an activity has neither a datetime nor a str entry in
        `database_dates`.
    """
    time_of_database = {}
    for db, time in self.database_dates.items():
        if isinstance(time, str):
            time_of_database[db] = time
        elif isinstance(time, datetime):
            time_of_database[db] = extract_date_as_integer(
                time, self.temporal_grouping
            )

    key_of_activity = self.base_lca.remapping_dicts["activity"]
    for idx in self.base_lca.dicts.activity.keys():  # activity ids
        key = key_of_activity[idx]  # ('database', 'code')
        if key[0] not in time_of_database:
            raise ValueError(f"Time of activity {key} is neither datetime nor str.")
        self.activity_time_mapping.add((key, time_of_database[key[0]]), unique_id=idx)
[docs]
def create_demand_timing(self) -> dict:
    """
    Build the dictionary that maps each demanded producer to its timing.

    The timeline is searched for the functional-unit rows, i.e. rows whose producer
    is one of the demand nodes and whose consumer is -1. The `hash_producer` value of
    such a row is taken as timing of the demand. Per the original documentation this
    is an integer with flexible resolution (year=YYYY, month=YYYYMM, day=YYYYMMDD,
    hour=YYYYMMDDHH) as defined in `temporal_grouping`.

    Parameters
    ----------
    None

    Returns
    -------
    dict
        Dictionary mapping producer ids to reference timing for the specified demands.
        It is also stored in `demand_timing`.

    Raises
    ------
    ValueError
        If at least one demand node has no functional-unit row in the timeline.
    """
    demand_ids = [bd.get_id(key) for key in self.demand.keys()]

    timeline = self.timeline
    is_functional_unit = timeline["consumer"] == -1
    demand_rows = timeline[timeline["producer"].isin(demand_ids) & is_functional_unit]

    missing_demand_ids = sorted(set(demand_ids).difference(demand_rows["producer"]))
    if missing_demand_ids:
        functional_unit_producers = sorted(
            set(timeline.loc[is_functional_unit, "producer"])
        )
        raise ValueError(
            "Could not find functional-unit timing rows for demand id(s) "
            f"{missing_demand_ids}. The existing timeline contains functional-unit "
            f"producer id(s) {functional_unit_producers}. This usually means the "
            "Brightway database or demand nodes changed after build_timeline(); "
            "recreate the TimexLCA object and rebuild the timeline before calling lci()."
        )

    # Later rows win if a producer shows up more than once.
    self.demand_timing = dict(
        zip(demand_rows["producer"], demand_rows["hash_producer"])
    )
    return self.demand_timing
######################################
# For creating human-friendly output #
######################################
[docs]
def create_labelled_technosphere_dataframe(self) -> pd.DataFrame:
    """
    Return the technosphere matrix as a dense dataframe whose rows and columns carry
    comprehensible labels instead of ids.

    Parameters
    ----------
    None

    Returns
    -------
    pd.DataFrame
        technosphere matrix as a pandas.DataFrame, relabelled in two steps: matrix
        index -> activity id -> entry of `activity_time_mapping.reversed`
        (((database, code), time)).
    """
    matrix_index_to_activity_id = self.lca.dicts.activity.reversed
    activity_id_to_label = self.activity_time_mapping.reversed

    dense = pd.DataFrame(self.lca.technosphere_matrix.toarray())
    with_activity_ids = dense.rename(
        index=matrix_index_to_activity_id,
        columns=matrix_index_to_activity_id,
    )
    return with_activity_ids.rename(
        index=activity_id_to_label,
        columns=activity_id_to_label,
    )
[docs]
def create_labelled_biosphere_dataframe(self) -> pd.DataFrame:
    """
    Return the biosphere matrix as a dense pandas.DataFrame with comprehensible labels
    instead of ids.

    Parameters
    ----------
    None

    Returns
    -------
    pd.DataFrame
        biosphere matrix as a pandas.DataFrame. Rows are relabelled from matrix index
        to flow id and then through `lca.remapping_dicts["biosphere"]`; columns from
        matrix index to activity id and then to the entry of
        `activity_time_mapping.reversed` (((database, code), time)).
    """
    dense = pd.DataFrame(self.lca.biosphere_matrix.toarray())

    # Step 1: matrix indices -> ids.
    with_ids = dense.rename(
        index=self.lca.dicts.biosphere.reversed,
        columns=self.lca.dicts.activity.reversed,
    )
    # Step 2: ids -> labels.
    return with_ids.rename(
        index=self.lca.remapping_dicts["biosphere"],
        columns=self.activity_time_mapping.reversed,
    )
[docs]
def create_labelled_dynamic_biosphere_dataframe(self) -> pd.DataFrame:
    """
    Return the dynamic biosphere matrix as a dense dataframe with comprehensible
    labels instead of ids.

    Parameters
    ----------
    None

    Returns
    -------
    pd.DataFrame
        dynamic biosphere matrix as a pandas.DataFrame. Rows are relabelled through
        `biosphere_time_mapping.reversed`, columns from matrix index to activity id
        and then to the entry of `activity_time_mapping.reversed`. Rows that contain
        only zeros are dropped.
    """
    labelled = (
        pd.DataFrame(self.dynamic_biosphere_matrix.toarray())
        .rename(
            index=self.biosphere_time_mapping.reversed,
            columns=self.lca.dicts.activity.reversed,  # matrix index -> activity id
        )
        .rename(
            columns=self.activity_time_mapping.reversed,  # -> ((database, code), time)
        )
    )

    # For readability, keep only rows with at least one non-zero entry.
    has_nonzero_entry = (labelled != 0).any(axis=1)
    return labelled.loc[has_nonzero_entry]
[docs]
def get_activity_name_from_time_mapped_id(self, time_mapped_id: int) -> str:
    """
    Look up the activity name that belongs to a time-mapped activity ID.

    The ID is resolved through `activity_time_mapping.reversed` to
    ``(('database', 'code'), time)``; the code is then looked up in the pre-built
    `_activity_code_to_name_cache`, which avoids a database query.

    Parameters
    ----------
    time_mapped_id : int
        The time-mapped activity ID from activity_time_mapping

    Returns
    -------
    str
        The name of the activity, or the activity code itself if the code is not in
        the cache.
    """
    activity_key, _time = self.activity_time_mapping.reversed[time_mapped_id]
    _database, code = activity_key
    return self._activity_code_to_name_cache.get(code, code)
[docs]
def create_labelled_dynamic_inventory_dataframe(self) -> pd.DataFrame:
    """
    Return a copy of `dynamic_inventory_df` with comprehensible labels for flows and
    activities instead of ids.

    Flow ids are replaced by the name of the corresponding node, activity ids by the
    result of `get_activity_name_from_time_mapped_id`. Names are resolved once per
    unique id and then mapped onto the columns.

    Parameters
    ----------
    None

    Returns
    -------
    pd.DataFrame
        dynamic inventory as a pandas.DataFrame with comprehensible labels
        instead of ids. `dynamic_inventory_df` itself is left untouched.

    Raises
    ------
    AttributeError
        If the dynamic inventory has not been calculated yet.
    """
    if not hasattr(self, "dynamic_inventory_df"):
        # Implicit string concatenation instead of a backslash continuation, so that
        # no source indentation can leak into the message.
        raise AttributeError(
            "Dynamic inventory not yet calculated. "
            "Call TimexLCA.lci(build_dynamic_biosphere=True) first."
        )

    df = self.dynamic_inventory_df.copy()

    # One node lookup per unique flow instead of one per row. `.tolist()` hands plain
    # Python scalars to the lookup, as the former row-wise `apply` did.
    flow_name_cache = {
        flow: bd.get_node(id=flow)["name"] for flow in df["flow"].unique().tolist()
    }
    df["flow"] = df["flow"].map(flow_name_cache)

    # Same approach for activities, using the pre-built code-to-name cache.
    activity_name_cache = {
        activity: self.get_activity_name_from_time_mapped_id(activity)
        for activity in df["activity"].unique()
    }
    df["activity"] = df["activity"].map(activity_name_cache)
    return df
[docs]
def plot_dynamic_inventory(self, bio_flows, cumulative=False) -> None:
    """
    Simple plot of dynamic inventory of a biosphere flow over time, with optional cumulative
    plotting.

    The amounts of the selected flows are summed per date before plotting. The lower
    y-axis limit is 0 unless negative amounts occur, in which case it is extended to
    10 % below the smallest value so that these points stay visible.

    Parameters
    ----------
    bio_flows : list of int
        database ids of the biosphere flows to plot.
    cumulative : bool
        if True, plot cumulative amounts over time

    Returns
    -------
    None
        shows a plot

    Raises
    ------
    AttributeError
        If the dynamic inventory has not been calculated yet.
    """
    PlotDynamicInventoryInputs(bio_flows=bio_flows, cumulative=cumulative)

    # Check the state before opening a figure, so a missing inventory does not leave
    # an empty figure behind.
    if not hasattr(self, "dynamic_inventory_df"):
        raise AttributeError(
            "Dynamic inventory not yet calculated. "
            "Call TimexLCA.lci(build_dynamic_biosphere=True) first."
        )

    inventory = self.dynamic_inventory_df
    filtered_df = inventory[inventory["flow"].isin(bio_flows)]

    # Only the amounts are needed; flow and activity ids are not summed.
    aggregated_df = filtered_df.groupby("date")["amount"].sum().reset_index()
    if cumulative:
        aggregated_df["amount"] = aggregated_df["amount"].cumsum()

    # Determine the lower y-axis limit flexibly (NaN for empty data falls back to 0).
    lowest = aggregated_df["amount"].min()
    ymin = lowest * 1.1 if lowest < 0 else 0

    plt.figure(figsize=(14, 6))
    plt.plot(
        aggregated_df["date"], aggregated_df["amount"], marker="o", linestyle="none"
    )
    plt.ylim(bottom=ymin)
    plt.xlabel("time")
    plt.ylabel("amount [kg]")
    plt.grid(True)
    plt.tight_layout()
    plt.show()
[docs]
def plot_dynamic_characterized_inventory(
    self,
    cumsum: bool = False,
    sum_emissions_within_activity: bool = False,
    sum_activities: bool = False,
) -> None:
    """
    Show a very simple scatter plot of the characterized inventory of the dynamic LCI.

    The y-axis label is selected based on `current_metric`; the legend lists the
    activity labels in reversed order.

    Parameters
    ----------
    cumsum : bool
        if True, plot cumulative amounts over time
    sum_emissions_within_activity : bool
        if True, sum emissions within each activity over time
    sum_activities : bool
        if True, sum emissions over all activities over time

    Returns
    -------
    None
        shows a plot

    Raises
    ------
    AttributeError
        If the characterized inventory has not been calculated yet.
    """
    if not hasattr(self, "characterized_inventory"):
        raise AttributeError(
            "Characterized inventory not yet calculated. Call TimexLCA.dynamic_lcia() first."
        )

    metric_ylabels = {
        "radiative_forcing": "radiative forcing [W/m²]",
        "GWP": f"GWP{self.current_time_horizon} [kg CO₂-eq]",
    }

    # Work on a copy; the flags below reshape it step by step.
    plot_data = self.characterized_inventory.copy()

    # Name of the column that ends up on the y-axis.
    amount = "amount_sum" if cumsum else "amount"
    if cumsum:
        plot_data["amount_sum"] = plot_data["amount"].cumsum()

    # The running total is recomputed after every aggregation step, but it is only
    # plotted when `cumsum` is set.
    if sum_emissions_within_activity:
        plot_data = plot_data.groupby(["date", "activity"]).sum().reset_index()
        plot_data["amount_sum"] = plot_data["amount"].cumsum()

    if sum_activities:
        plot_data = plot_data.groupby("date").sum().reset_index()
        plot_data["amount_sum"] = plot_data["amount"].cumsum()
        plot_data["activity_label"] = "All activities"
    else:
        # Activities stay separate: resolve each unique id to its name once.
        names_by_activity = {
            activity: self.get_activity_name_from_time_mapped_id(activity)
            for activity in plot_data["activity"].unique()
        }
        plot_data["activity_label"] = plot_data["activity"].map(names_by_activity)

    plt.figure(figsize=(14, 6))
    axes = sb.scatterplot(x="date", y=amount, hue="activity_label", data=plot_data)

    # Leave 10 % headroom below negative values, otherwise start the axis at zero.
    lowest = plot_data[amount].min()
    ymin = lowest * 1.1 if lowest < 0 else 0

    axes.set_axisbelow(True)
    axes.set_ylim(bottom=ymin)
    axes.set_ylabel(metric_ylabels[self.current_metric])
    axes.set_xlabel("time")

    handles, labels = axes.get_legend_handles_labels()
    axes.legend(handles[::-1], labels[::-1])
    plt.grid(True)
    plt.show()