Source code for bw_timex.dynamic_biosphere_builder

import bw2data as bd
import numpy as np
import pandas as pd
from bw2calc import LCA
from bw_temporalis import TemporalDistribution
from scipy import sparse as sp

from .helper_classes import SetList
from .utils import convert_date_string_to_datetime, get_temporal_evolution_factor


[docs] class DynamicBiosphereBuilder: """ Class for building a dynamic biosphere matrix with dimensions (biosphere flow at a specific point in time) x (processes) """ def __init__( self, lca_obj: LCA, activity_time_mapping: dict, biosphere_time_mapping: dict, demand_timing: dict, node_collections: dict, temporal_grouping: str, database_dates: dict, database_dates_static: dict, timeline: pd.DataFrame, interdatabase_activity_mapping: SetList, expand_technosphere: bool = True, background_unit_lci_cache: dict | None = None, ) -> None: """ Initializes the DynamicBiosphereBuilder object. Parameters ---------- lca_obj : LCA object instance of the bw2calc LCA class, e.g. TimexLCA.lca activity_time_mapping : dict A dictionary mapping activity to their respective timing in the format (('database', 'code'), datetime_as_integer): time_mapping_id) biosphere_time_mapping : dict A dictionary mapping biosphere flows to their respective timing in the format (('database', 'code'), datetime_as_integer): time_mapping_id), empty at this point. demand_timing : dict A dictionary mapping of the demand to demand time node_collections : dict A dictionary containing lists of node ids for different node subsets temporal_grouping : str A string indicating the temporal grouping of the processes, e.g. 'year', 'month', 'day', 'hour' database_dates : dict A dictionary mapping database names to their respective date database_dates_static : dict A dictionary mapping database names to their respective date, but only containing static databases, which are the background databases. timeline: pd.DataFrame The edge timeline, created from TimexLCA.build_timeline() interdatabase_activity_mapping : SetList A list of sets, where each set contains the activity ids of the same activity in different databases expand_technosphere : bool, optional A boolean indicating if the dynamic biosphere matrix is built via expanded matrices or directly from the timeline. Default is True. Returns ------- None """ self._time_res_mapping = { "year": "datetime64[Y]", "month": "datetime64[M]", "day": "datetime64[D]", "hour": "datetime64[h]", } self.lca_obj = lca_obj # Cached background unit LCIs are stored as structure-independent # triplets (bioflow_id, bg_activity_id, amount); when consumed the # matrix is rebuilt to *this* lca_obj's biosphere/technosphere # index space (see `_rebuild_unit_lci`). self._expand_technosphere = bool(expand_technosphere) if expand_technosphere: self.technosphere_matrix = ( lca_obj.technosphere_matrix.tocsc() ) # convert to csc as this is only used for column slicing self.dynamic_supply_array = lca_obj.supply_array self.activity_dict = lca_obj.dicts.activity else: self.dynamic_supply_array = timeline.amount.values.astype( float ) # get the supply vector directly from the timeline self.activity_time_mapping = activity_time_mapping self.biosphere_time_mapping = biosphere_time_mapping self.demand_timing = demand_timing self.node_collections = node_collections self.time_res = self._time_res_mapping[temporal_grouping] self.temporal_grouping = temporal_grouping self.database_dates = database_dates self.database_dates_static = database_dates_static self.timeline = timeline self.interdatabase_activity_mapping = interdatabase_activity_mapping self._matrix_entries = {} # (row, col) -> amount # Biosphere exchanges of foreground/background producers are read # from the bw2data SQL store; share results across TimexLCA objects. from ._lci_cache import BIOSPHERE_EXCHANGES_CACHE self._activity_biosphere_exchange_cache = BIOSPHERE_EXCHANGES_CACHE # Shared/global cache: only stable ("db_code", ...) keys go here so it # can safely persist across TimexLCA objects. Stored as # structure-independent triplets (bioflow_id, bg_activity_id, amount) # so the same entry can be reused across lca_objs with different # column/row spaces (different timelines, expand modes, etc.). self._background_unit_lci_cache = ( background_unit_lci_cache if background_unit_lci_cache is not None else {} ) # Per-object cache for keys that are NOT stable across TimexLCA objects # (time-mapped activity ids and the per-run "temporalized" database). self._instance_unit_lci_cache = {} # Within-build cache of rebuilt unit-LCI matrices (sized to *this* # lca_obj). Avoids re-materializing the same CSR for repeated calls # within one build_dynamic_biosphere_matrix run. self._rebuilt_unit_lci_cache = {} self.temporal_market_cols = [] # To keep track of temporal market columns
[docs] def build_dynamic_biosphere_matrix( self, expand_technosphere: bool = True, ): """ This function creates a separate biosphere matrix, with the dimensions (bio_flows at a specific time step) x (processes). Every temporally resolved biosphere flow has its own row in the matrix, making it highly sparse. The timing of the emitting process and potential additional temporal information of the biosphere flow (e.g. delay of emission compared to the timing of the process) are considered. Absolute Temporal Distributions for biosphere exchanges are dealt with as a look up function: If an activity happens at timestamp X and the biosphere exchange has an absolute temporal distribution (ATD), it looks up the amount from the ATD corresponding to timestamp X. E.g.: X = 2024, TD=(data=[2020,2021,2022,2023,2024,.....,2120], amount=[3,4,4,5,6,......,3]), it will look up the value 6 corresponding 2024. If timestamp X does not exist, it finds the nearest timestamp available (if two timestamps are equally close, it will take the first in order of appearance (see numpy.argmin() for this behavior). Parameters ---------- expand_technosphere : bool, optional A boolean indicating if the dynamic biosphere matrix is built via expanded matrices or directly from the timeline. Default is via expanded matrices. Returns ------- dynamic_biosphere_matrix : scipy.sparse.csr_matrix A sparse matrix with the dimensions (bio_flows at a specific time step) x (processes). temporal_market_lcis : dict A dictionary containing the disaggregated LCI's of the temporal markets, with the time-mapped-activity id as key. """ temporal_market_lcis = {} for row in self.timeline.itertuples(): idx = row.time_mapped_producer if expand_technosphere: process_col_index = self.activity_dict[ idx ] # get the matrix column index else: # from timeline process_col_index = row.Index # start a new matrix ( (original_db, original_code), time, ) = self.activity_time_mapping.reversed[idx] if idx in self.node_collections["temporalized_processes"]: time_in_datetime = convert_date_string_to_datetime( self.temporal_grouping, str(time) ) # now time is a datetime td_producer = TemporalDistribution( date=np.array([time_in_datetime], dtype=self.time_res), amount=np.array([1]), ).date date = td_producer[0] # Get temporal evolution factor for this timestamp temporal_evolution_factor = 1.0 if hasattr(row, "temporal_evolution") and row.temporal_evolution is not None: temporal_evolution_factor = get_temporal_evolution_factor( row.temporal_evolution, time_in_datetime ) for input_id, exc_amount, temporal_distribution in ( self.get_biosphere_exchanges(original_db, original_code) ): if temporal_distribution: td_dates = temporal_distribution.date td_values = temporal_distribution.amount # If the biosphere flows have an absolute TD, this means we have to look up # the biosphere flow for the activity time (td_producer) if isinstance(td_dates[0], np.datetime64): dates = td_producer # datetime array, same time as producer values = [ exc_amount * temporal_evolution_factor * td_values[ np.argmin( np.abs( td_dates.astype(self.time_res) - td_producer.astype(self.time_res) ) ) ] ] # look up the value correponding to the absolute producer time else: # we can add a datetime of len(1) to a timedelta of len(N) easily dates = td_producer + td_dates values = exc_amount * temporal_evolution_factor * td_values else: # exchange has no TD dates = td_producer # datetime array, same time as producer values = [exc_amount * temporal_evolution_factor] # Add entries to dynamic bio matrix for date, amount in zip(dates, values): # first create a row index for the tuple (bioflow_id, date) time_mapped_matrix_idx = self.biosphere_time_mapping.add( (input_id, date) ) # populate lists with which sparse matrix is constructed self.add_matrix_entry_for_biosphere_flows( row=time_mapped_matrix_idx, col=process_col_index, amount=amount, ) elif idx in self.node_collections["temporal_markets"]: self.temporal_market_cols.append(process_col_index) ( (original_db, original_code), time, ) = self.activity_time_mapping.reversed[idx] if expand_technosphere: demand = self.demand_from_technosphere(idx, process_col_index) else: demand = self.demand_from_timeline(row) if demand: for act, amount in demand.items(): unit_lci = self.get_background_unit_lci(act) # add lci of both background activities of the temporal market and save total lci if idx not in temporal_market_lcis.keys(): temporal_market_lcis[idx] = unit_lci * amount else: temporal_market_lcis[idx] += unit_lci * amount aggregated_inventory = temporal_market_lcis[idx].sum(axis=1) # multiply LCI with supply of temporal market temporal_market_lcis[idx] *= self.dynamic_supply_array[ process_col_index ] for row_idx, amount in enumerate(aggregated_inventory.A1): bioflow = self.lca_obj.dicts.biosphere.reversed[row_idx] ((_, _), time) = self.activity_time_mapping.reversed[idx] time_in_datetime = convert_date_string_to_datetime( self.temporal_grouping, str(time) ) # now time is a datetime td_producer = TemporalDistribution( date=np.array([str(time_in_datetime)], dtype=self.time_res), amount=np.array([1]), ).date date = td_producer[0] time_mapped_matrix_idx = self.biosphere_time_mapping.add( (bioflow, date) ) self.add_matrix_entry_for_biosphere_flows( row=time_mapped_matrix_idx, col=process_col_index, amount=amount, ) # now build the dynamic biosphere matrix if expand_technosphere: ncols = len(self.activity_time_mapping) else: ncols = len(self.timeline) if not self._matrix_entries: return sp.csr_matrix((0, ncols)), temporal_market_lcis rows = [] cols = [] values = [] for (row, col), amount in self._matrix_entries.items(): rows.append(row) cols.append(col) values.append(amount) shape = (max(rows) + 1, ncols) dynamic_biosphere_matrix = sp.coo_matrix( (values, (rows, cols)), shape ) dynamic_biosphere_matrix = dynamic_biosphere_matrix.tocsr() return dynamic_biosphere_matrix, temporal_market_lcis
[docs] def demand_from_timeline(self, row): """ Returns a demand dict directly from the timeline row and its temporal_market_shares. Parameters: ----------- row: pd.Series A row of the timeline DataFrame Returns ------- demand: dict A demand-dictionary with as keys the ids of the time-mapped activities and as values the share. """ demand = {} for db, amount in row.temporal_market_shares.items(): timed_act_id = self.interdatabase_activity_mapping.find_match( row.producer, db ) demand[timed_act_id] = amount return demand
[docs] def demand_from_technosphere(self, idx, process_col_index): """ Returns a demand dict of background processes based on the technosphere column. Foreground exchanges are skipped as these are added separately. Parameters: ----------- idx: int The time-mapped-activity id of the producer process_col_index: int The technosphere matrix id of the producer Returns ------- demand: dict A demand-dictionary with as keys the brightway ids of the consumed background activities and as values their consumed amount. """ col = self.technosphere_matrix[:, process_col_index] # Sparse column activity_row = self.activity_dict[idx] # Producer's row index foreground_nodes = self.node_collections["foreground"] demand = { self.activity_dict.reversed[row_idx]: -amount for row_idx, amount in zip(col.nonzero()[0], col.data) if row_idx != activity_row # Skip production exchange and self.activity_dict.reversed[row_idx] not in foreground_nodes # Only background } return demand
[docs] def add_matrix_entry_for_biosphere_flows(self, row, col, amount): """ Adds an entry to the internal matrix-entry mapping, which is then used to construct the dynamic biosphere matrix. Only unique entries are added, i.e. if the same row and col index already exists, the value is not added again. Parameters ---------- row : int A row index of a new element to the dynamic biosphere matrix col: int A column index of a new element to the dynamic biosphere matrix amount: float The amount of the new element to the dynamic biosphere matrix Returns ------- None the internal matrix-entry mapping is updated """ key = (row, col) if key not in self._matrix_entries: self._matrix_entries[key] = amount
[docs] def get_biosphere_exchanges(self, original_db, original_code): """Return cached biosphere exchanges for a producer. Keyed by the source database's `modified` token so foreground or background edits invalidate stale entries automatically. """ modified = ( bd.databases[original_db].get("modified") if original_db in bd.databases else None ) cache_key = (bd.projects.current, original_db, original_code, modified) if cache_key not in self._activity_biosphere_exchange_cache: if original_db == "temporalized": act = bd.get_node(code=original_code) else: act = bd.get_node(database=original_db, code=original_code) self._activity_biosphere_exchange_cache[cache_key] = [ (exc.input.id, exc["amount"], exc.get("temporal_distribution")) for exc in act.biosphere() ] return self._activity_biosphere_exchange_cache[cache_key]
[docs] def get_background_unit_lci(self, act): """ Return unit background LCI matrix for an activity, cached by process identity. Background activities can occur repeatedly with different exchange amounts. Reusing the unit LCI avoids repeated `redo_lci` solves for equivalent processes. """ cache_key = self.get_background_lci_cache_key(act) # Within this build the rebuilt matrix is stable; reuse it. if cache_key in self._rebuilt_unit_lci_cache: return self._rebuilt_unit_lci_cache[cache_key] # Only stable background-process identities may be reused across # TimexLCA objects; everything else stays in the per-object cache. cache = ( self._background_unit_lci_cache if cache_key[0] == "db_code" else self._instance_unit_lci_cache ) if cache_key not in cache: self.lca_obj.redo_lci({act: 1}) matrix = self.lca_obj.inventory # Snapshot triplets for cross-structure reuse, but keep the # freshly-solved matrix directly — it's already sized to *this* # lca_obj so no rebuild is needed on the miss path. cache[cache_key] = self._inventory_to_triplets(matrix) else: matrix = self._rebuild_unit_lci(cache[cache_key]) self._rebuilt_unit_lci_cache[cache_key] = matrix return matrix
[docs] def _inventory_to_triplets(self, inv): """Convert a CSR inventory matrix to structure-independent triplets. Translates row/col indices into stable bioflow / activity ids via the producing lca_obj's dicts, so the cache entry can be reused by lca_objs with different index spaces. Returns a tuple of three numpy arrays ``(bioflow_ids, activity_ids, values)`` — vectorized for speed (Python-tuple lists cost ~1.3 s on premise/ecoinvent). """ coo = inv.tocoo() bio_arr, act_arr = self._lca_obj_id_arrays() return ( bio_arr[coo.row].copy(), act_arr[coo.col].copy(), coo.data.copy(), )
[docs] def _lca_obj_id_arrays(self): """row/col index → bioflow_id / activity_id, cached per builder.""" if not hasattr(self, "_cached_lca_id_arrays"): n_bio = self.lca_obj.biosphere_matrix.shape[0] n_act = self.lca_obj.technosphere_matrix.shape[0] bio_arr = np.full(n_bio, -1, dtype=np.int64) for k, v in self.lca_obj.dicts.biosphere.reversed.items(): bio_arr[k] = v act_arr = np.full(n_act, -1, dtype=np.int64) for k, v in self.lca_obj.dicts.activity.reversed.items(): act_arr[k] = v self._cached_lca_id_arrays = (bio_arr, act_arr) return self._cached_lca_id_arrays
[docs] def _rebuild_unit_lci(self, triplets): """Rebuild a unit-LCI CSR sized to *this* lca_obj from cached triplets. Entries referring to bioflows or activities not present in the current lca_obj are silently skipped. For consumers using the same set of databases this never drops anything; for legitimately narrower scenarios it correctly excludes out-of-scope entries. """ # On a pure cache hit (no preceding redo_lci on this lca_obj) the # technosphere/biosphere matrices and dicts may not have been built # yet. Materialize them now. if not hasattr(self.lca_obj, "technosphere_matrix"): self.lca_obj.load_lci_data() bio_ids, act_ids, data = triplets bio_dict = self.lca_obj.dicts.biosphere act_dict = self.lca_obj.dicts.activity # Per-cache-entry list lookups are unavoidable (bw2calc dicts aren't # numpy-friendly), but the tight loop is still cheap relative to a # redo_lci solve. -1 sentinel marks missing keys; mask filters them. bio_rows = np.fromiter( (bio_dict.get(int(b), -1) for b in bio_ids), count=len(bio_ids), dtype=np.int64 ) act_cols = np.fromiter( (act_dict.get(int(a), -1) for a in act_ids), count=len(act_ids), dtype=np.int64 ) mask = (bio_rows >= 0) & (act_cols >= 0) rows = bio_rows[mask] cols = act_cols[mask] vals = data[mask] n_bio = self.lca_obj.biosphere_matrix.shape[0] n_act = self.lca_obj.technosphere_matrix.shape[0] return sp.csr_matrix((vals, (rows, cols)), shape=(n_bio, n_act))
[docs] def count_pending_background_solves(self): """Count uncached background unit-LCI solves that the matrix build will need. Walks the temporal-markets branch of `build_dynamic_biosphere_matrix` without performing any `redo_lci` solves — just consults the existing cache. TimexLCA uses this to decide whether LU-factorizing the technosphere upfront is worth it; factorization only pays off once the number of pending solves exceeds the break-even point. """ if not self._expand_technosphere: # Non-expand path: demands come from the timeline rows and we # cannot cheaply enumerate without effectively running the build. # Be conservative and report unknown-many so callers can fall # back to the default factorize policy if they care. return len(self.node_collections.get("temporal_markets", ())) pending_keys = set() for row in self.timeline.itertuples(): idx = row.time_mapped_producer if idx not in self.node_collections["temporal_markets"]: continue process_col_index = self.activity_dict[idx] demand = self.demand_from_technosphere(idx, process_col_index) if not demand: continue for act in demand: key = self.get_background_lci_cache_key(act) cache = ( self._background_unit_lci_cache if key[0] == "db_code" else self._instance_unit_lci_cache ) if key in cache or key in pending_keys: continue pending_keys.add(key) return len(pending_keys)
[docs] def get_background_lci_cache_key(self, act): """Build a stable cache key for background unit LCI reuse.""" mapping = self.activity_time_mapping.reversed.get(act) if mapping is None: return ("activity_id", act) process_key, _ = mapping if isinstance(process_key, tuple): db, code = process_key if db == "temporalized": return ("temporalized", code) # Include the background database's `modified` token so edits to # that database invalidate stale globally-cached unit LCIs. modified = bd.databases[db].get("modified") if db in bd.databases else None # Include the current bw2data project — activity / bioflow ids # are project-scoped, so a triplet cached under one project # must not be reused under another. return ("db_code", bd.projects.current, db, code, modified) return ("activity_id", act)