Source code for bw2calc.multi_lca

import logging
import warnings
from pathlib import Path
from typing import Iterable, Optional, Union

import bw_processing as bwp
import matrix_utils as mu
import numpy as np
from fsspec import AbstractFileSystem
from pydantic import BaseModel
from scipy import sparse

from bw2calc import PYPARDISO, __version__, spsolve
from bw2calc.dictionary_manager import DictionaryManager
from bw2calc.errors import OutsideTechnosphere
from bw2calc.lca_base import LCABase
from bw2calc.method_config import MethodConfig
from bw2calc.restricted_sparse_matrix_dict import RestrictedSparseMatrixDict
from bw2calc.single_value_diagonal_matrix import SingleValueDiagonalMatrix
from bw2calc.utils import consistent_global_index, convert_tuple_to_list, get_datapackage, utc_now

[docs] logger = logging.getLogger("bw2calc")
[docs] class DemandsValidator(BaseModel):
[docs] demands: dict[str, dict[int, float]]
[docs] class MultiLCA(LCABase): """ Perform LCA on multiple demands, impact categories, and normalization and weighting sets. Builds only *one* technosphere and biosphere matrix which can cover all demands. Main differences from the base `LCA` class: * Many attributes are plural, such as `supply_arrays`, `inventories`, characterization_matrices` * `demands` must be a dictionary with `str` keys, e.g. `{'my truck': {12345: 1}}` * `demands` must have integer IDs; you can't pass `('database', 'code')` or `Activity` objects. * Calculation results are a dictionary with keys of functional units and impact categories The calculation procedure is the same as for singular LCA: `lci()`, `lcia()`, and `next()`. See the LCA documentation for these methods and their input arguments. Parameters ---------- demands : dict[str, dict[int, float]] The demands for which the LCA will be calculated. The keys identify functional unit sets. method_config : dict | MethodConfig Dictionary satisfying the `MethodConfig` specification or `MethodConfig` instance. data_objs : list[bw_processing.Datapackage] List of `bw_processing.Datapackage` objects. Should include data for all needed matrices. remapping_dicts : dict[str, dict] Dict of remapping dictionaries that link Brightway `Node` ids to `(database, code)` tuples. `remapping_dicts` can provide such remapping for any of `activity`, `product`, `biosphere`. log_config : dict Optional arguments to pass to logging. Not yet implemented. seed_override : int RNG seed to use in place of `Datapackage` seed, if any. use_arrays : bool Use arrays instead of vectors from the given `data_objs` use_distributions : bool Use probability distributions from the given `data_objs` selective_use : dict[str, dict] Dictionary that gives more control on whether `use_arrays` or `use_distributions` should be used. Has the form `{matrix_label: {"use_arrays"|"use_distributions": bool}`. Standard matrix labels are `technosphere_matrix`, `biosphere_matrix`, and `characterization_matrix`. """
[docs] matrix_labels = [ "technosphere_mm", "biosphere_mm", ]
[docs] matrix_list_labels = [ "characterization_mm_dict", "normalization_mm_dict", "weighting_mm_dict", ]
def __init__( self, demands: dict[str, dict[int, float]], method_config: Union[dict, MethodConfig], data_objs: Iterable[Union[Path, AbstractFileSystem, bwp.DatapackageBase]], remapping_dicts: Optional[Iterable[dict]] = None, log_config: Optional[dict] = None, seed_override: Optional[int] = None, use_arrays: Optional[bool] = False, use_distributions: Optional[bool] = False, selective_use: Optional[dict] = None, ): # Validation checks DemandsValidator(demands=demands) if isinstance(method_config, MethodConfig): method_config = { key: value for key, value in method_config.model_dump().items() if value is not None } MethodConfig(**method_config)
[docs] self.demands = demands
[docs] self.config = method_config
[docs] self.packages = [get_datapackage(obj) for obj in data_objs]
[docs] self.dicts = DictionaryManager()
[docs] self.use_arrays = use_arrays
[docs] self.use_distributions = use_distributions
[docs] self.selective_use = selective_use or {}
[docs] self.remapping_dicts = remapping_dicts or {}
[docs] self.seed_override = seed_override
message = ( """Initialized MultiLCA object. Demands: {demands}, data_objs: {data_objs}""".format( demands=self.demands, data_objs=self.packages ) ) logger.info( message, extra={ "demands": self.demands, "data_objs": str(self.packages), "bw2calc": __version__, "pypardiso": PYPARDISO, "numpy": np.__version__, "matrix_utils": mu.__version__, "bw_processing": bwp.__version__, "utc": utc_now(), }, ) #################### # Modified methods # #################### # Don't allow new demand
[docs] def redo_lci(self) -> None: return super().redo_lci()
[docs] def lci(self) -> None: return super().lci()
[docs] def redo_lcia(self) -> None: return super().redo_lcia()
[docs] def lcia(self) -> None: return super().lcia()
#################### # LCA Calculations # #################### def __next__(self) -> None: skip_first_iteration = getattr(self, "keep_first_iteration_flag", False) if not skip_first_iteration: self._delete_solver_state() for matrix in self.matrix_labels: if hasattr(self, matrix): obj = getattr(self, matrix) next(obj) message = """Iterating matrix {matrix}. Indexers: {indexer_state}""".format( matrix=matrix, indexer_state=[(str(p), p.indexer.index) for p in obj.packages], ) logger.debug( message, extra={ "matrix": matrix, "indexers": [(str(p), p.indexer.index) for p in obj.packages], "matrix_sum": obj.matrix.sum(), "utc": utc_now(), }, ) for matrix_dict in self.matrix_list_labels: if hasattr(self, matrix_dict): obj = getattr(self, matrix_dict) next(obj) message = """Iterating matrix dict {matrix}. Indexer: {indexer_state}""".format( matrix=matrix, indexer_state=obj.global_indexer.index ) logger.debug( message, extra={ "matrix_dict": matrix_dict, "indexer": obj.global_indexer.index, "matrix_sums": [mm.matrix.sum() for mm in obj.values()], "utc": utc_now(), }, ) if hasattr(self, "after_matrix_iteration"): self.after_matrix_iteration() # Avoid this conversion each time we do a calculation in the future # See https://github.com/haasad/PyPardiso/issues/75#issuecomment-2186825609 if PYPARDISO: self.technosphere_matrix = self.technosphere_matrix.tocsr() if skip_first_iteration: delattr(self, "keep_first_iteration_flag") self._calculation()
[docs] def _calculation(self) -> None: """Factored out to allow subclasses to easily modify behaviour.""" if hasattr(self, "inventories"): self.lci_calculation() if hasattr(self, "characterized_inventories"): self.lcia_calculation()
[docs] def build_demand_array(self, demands: Optional[dict] = None) -> None: """Turn the demand dictionary into a *NumPy* array of correct size. Args: * *demand* (dict, optional): Demand dictionary. Optional, defaults to ``self.demand``. Returns: A 1-dimensional NumPy array """ demands = self.demands if demands is None else demands self.demand_arrays = {} for key, value in demands.items(): array = np.zeros(len(self.dicts.product)) for process_id, process_amount in value.items(): try: array[self.dicts.product[process_id]] = process_amount except KeyError as exc: if process_id in self.dicts.activity: raise ValueError( f"LCA can only be performed on products, not processes ({process_id} " + "is a process id)" ) from exc else: raise OutsideTechnosphere( f"Can't find key {process_id} in product dictionary" ) from exc self.demand_arrays[key] = array
################## # Data retrieval # ##################
[docs] def filter_package_by_identifier( self, data_objs: Iterable[bwp.DatapackageBase], identifier: Union[list[str], tuple[str, ...], str], ) -> list[bwp.DatapackageBase]: """Filter the datapackage resources in `data_objs` whose "identifier" attribute equals input argument `identifier`. Used in splitting up impact categories, normalization, and weighting matrices.""" identifier = convert_tuple_to_list(identifier) return [dp.filter_by_attribute("identifier", identifier) for dp in data_objs]
[docs] def load_lcia_data(self, data_objs: Optional[Iterable[bwp.DatapackageBase]] = None) -> None: """Load data and create characterization matrices. This method will filter out regionalized characterization factors. """ global_index = consistent_global_index(data_objs or self.packages) fltr = (lambda x: x["col"] == global_index) if global_index is not None else None use_arrays, use_distributions = self.check_selective_use("characterization_matrix") self.characterization_mm_dict = mu.MappedMatrixDict( packages={ ic: self.filter_package_by_identifier( data_objs=data_objs or self.packages, identifier=convert_tuple_to_list(ic) ) for ic in self.config["impact_categories"] }, matrix="characterization_matrix", use_arrays=use_arrays, use_distributions=use_distributions, seed_override=self.seed_override, row_mapper=self.biosphere_mm.row_mapper, col_mapper=None, diagonal=True, custom_filter=fltr, ) for key, value in self.characterization_mm_dict.items(): if len(value.matrix.data) == 0: warnings.warn(f"All values in characterization matrix for {key} are zero") self.characterization_matrices = mu.SparseMatrixDict( [(key, value.matrix) for key, value in self.characterization_mm_dict.items()] )
[docs] def load_normalization_data( self, data_objs: Optional[Iterable[bwp.DatapackageBase]] = None ) -> None: """Load normalization data.""" use_arrays, use_distributions = self.check_selective_use("normalization_matrix") self.normalization_mm_dict = mu.MappedMatrixDict( packages={ nrml: self.filter_package_by_identifier( data_objs=data_objs or self.packages, identifier=convert_tuple_to_list(nrml) ) for nrml in self.config["normalizations"] }, matrix="normalization_matrix", use_arrays=use_arrays, use_distributions=use_distributions, seed_override=self.seed_override, row_mapper=self.biosphere_mm.row_mapper, diagonal=True, ) for key, value in self.normalization_mm_dict.items(): if len(value.matrix.data) == 0: warnings.warn(f"All values in normalization matrix for {key} are zero") self.normalization_matrices = RestrictedSparseMatrixDict( self.config["normalizations"], [(key, value.matrix) for key, value in self.normalization_mm_dict.items()], )
[docs] def load_weighting_data( self, data_objs: Optional[Iterable[bwp.DatapackageBase]] = None ) -> None: """Load weighting data.""" use_arrays, use_distributions = self.check_selective_use("weighting_matrix") self.weighting_mm_dict = mu.MappedMatrixDict( packages={ wng: self.filter_package_by_identifier( data_objs=data_objs or self.packages, identifier=convert_tuple_to_list(wng) ) for wng in self.config["weightings"] }, matrix="weighting_matrix", row_mapper=None, dimension=len(self.biosphere_mm.row_mapper), use_arrays=use_arrays, use_distributions=use_distributions, seed_override=self.seed_override, matrix_class=SingleValueDiagonalMatrix, diagonal=True, ) for key, value in self.weighting_mm_dict.items(): if len(value.matrix.data) == 0: warnings.warn(f"All values in weighting matrix for {key} are zero") self.weighting_matrices = RestrictedSparseMatrixDict( self.config["weightings"], [(key, value.matrix) for key, value in self.weighting_mm_dict.items()], )
################ # Calculations # ################
[docs] def decompose_technosphere(self) -> None: raise NotImplementedError
[docs] def lci_calculation(self) -> None: """The actual LCI calculation. Separated from ``lci`` to be reusable in cases where the matrices are already built, e.g. ``redo_lci`` and Monte Carlo classes. """ count = len(self.dicts.activity) demand_matrix = np.vstack([arr for arr in self.demand_arrays.values()]).T self.supply_arrays = { name: arr for name, arr in zip( self.demands, spsolve(self.technosphere_matrix, demand_matrix).reshape(count, -1).T ) } # Turn 1-d array into diagonal matrix self.inventories = mu.SparseMatrixDict( [ (name, self.biosphere_matrix @ sparse.spdiags([arr], [0], count, count)) for name, arr in self.supply_arrays.items() ] )
[docs] def lcia_calculation(self) -> None: """The actual LCIA calculation. Separated from ``lcia`` to be reusable in cases where the matrices are already built, e.g. ``redo_lcia`` and Monte Carlo classes. """ self.characterized_inventories = self.characterization_matrices @ self.inventories if hasattr(self, "normalization_matrices"): self.normalization_calculation() if hasattr(self, "weighting_matrices"): self.weighting_calculation()
[docs] def normalization_calculation(self) -> None: """The actual normalization calculation. Creates ``self.normalized_inventories``.""" self.normalized_inventories = self.normalization_matrices @ self.characterized_inventories
[docs] def weighting_calculation(self) -> None: """The actual weighting calculation. Multiplies weighting value by normalized inventories, if available, otherwise by characterized inventories. Creates ``self.weighted_inventories``.""" if hasattr(self, "normalized_inventories"): self.weighted_inventories = self.weighting_matrices @ self.normalized_inventories else: self.weighted_inventories = self.weighting_matrices @ self.characterized_inventories
@property
[docs] def scores(self) -> dict: """ The LCIA score as a ``float``. Note that this is a `property <http://docs.python.org/2/library/functions.html#property>`_, so it is ``foo.lca``, not ``foo.score()`` """ if not hasattr(self, "characterized_inventories"): raise ValueError("Must do LCIA first") if hasattr(self, "weighted_inventories"): return {key: arr.sum() for key, arr in self.weighted_inventories.items()} elif hasattr(self, "normalized_inventories"): return {key: arr.sum() for key, arr in self.normalized_inventories.items()} else: return {key: arr.sum() for key, arr in self.characterized_inventories.items()}