Source code for bw2calc.lca

import logging
import warnings
from collections.abc import Mapping
from numbers import Number
from pathlib import Path
from typing import Callable, Iterable, Optional, Union

import bw_processing as bwp
import matrix_utils as mu
import numpy as np
import pandas as pd
from fsspec import AbstractFileSystem
from scipy import sparse

from bw2calc import PYPARDISO, __version__
from bw2calc.dictionary_manager import DictionaryManager
from bw2calc.errors import OutsideTechnosphere
from bw2calc.lca_base import LCABase
from bw2calc.single_value_diagonal_matrix import SingleValueDiagonalMatrix
from bw2calc.utils import consistent_global_index, get_datapackage, utc_now, wrap_functional_unit

try:
    from bw2data import get_node
except ImportError:
[docs] get_node = None
[docs] logger = logging.getLogger("bw2calc")
[docs] class LCA(LCABase):
[docs] matrix_labels = [ "technosphere_mm", "biosphere_mm", "characterization_mm", "normalization_mm", "weighting_mm", ]
######### # Setup # ######### def __init__( self, demand: dict, # Brightway 2 calling convention method: Optional[tuple] = None, weighting: Optional[str] = None, normalization: Optional[str] = None, # Brightway 2.5 calling convention data_objs: Optional[Iterable[Union[Path, AbstractFileSystem, bwp.DatapackageBase]]] = None, remapping_dicts: Optional[Iterable[dict]] = None, log_config: Optional[dict] = None, seed_override: Optional[int] = None, use_arrays: Optional[bool] = False, use_distributions: Optional[bool] = False, selective_use: Optional[dict] = False, ): """Create a new LCA calculation object. Compatible with Brightway2 and 2.5 semantics. Can be static, stochastic, or iterative (scenario-based), depending on the ``data_objs`` input data.. This class supports both stochastic and static LCA, and can use a variety of ways to describe uncertainty. The input flags `use_arrays` and `use_distributions` control some of this stochastic behaviour. See the [documentation for `matrix_utils`](https://github.com/brightway-lca/matrix_utils) for more information on the technical implementation. Parameters ---------- demand : dict[object: float] The demand for which the LCA will be calculated. The keys can be Brightway `Node` instances, `(database, code)` tuples, or integer ids. method : tuple Tuple defining the LCIA method, such as `('foo', 'bar')`. Only needed if not passing `data_objs`. weighting : tuple Tuple defining the LCIA weighting, such as `('foo', 'bar')`. Only needed if not passing `data_objs`. weighting : string String defining the LCIA normalization, such as `'foo'`. Only needed if not passing `data_objs`. data_objs : list[bw_processing.Datapackage] List of `bw_processing.Datapackage` objects. Can be loaded via `bw2data.prepare_lca_inputs` or constructed manually. Should include data for all needed matrices. remapping_dicts : dict[str, dict] Dict of remapping dictionaries that link Brightway `Node` ids to `(database, code)` tuples. `remapping_dicts` can provide such remapping for any of `activity`, `product`, `biosphere`. log_config : dict Optional arguments to pass to logging. Not yet implemented. seed_override : int RNG seed to use in place of `Datapackage` seed, if any. use_arrays : bool Use arrays instead of vectors from the given `data_objs` use_distributions : bool Use probability distributions from the given `data_objs` selective_use : dict[str, dict] Dictionary that gives more control on whether `use_arrays` or `use_distributions` should be used. Has the form `{matrix_label: {"use_arrays"|"use_distributions": bool}`. Standard matrix labels are `technosphere_matrix`, `biosphere_matrix`, and `characterization_matrix`. """ if not isinstance(demand, Mapping): raise ValueError("Demand must be a dictionary") if data_objs is None: try: # Something breaks when this happens too early, i.e. in the file imports... from bw2data import prepare_lca_inputs except ImportError: raise ImportError("bw2data version >= 4 not found") demand, self.packages, remapping_dicts = prepare_lca_inputs( demand=demand, method=method, weighting=weighting, normalization=normalization, ) self.method = method self.weighting = weighting self.normalization = normalization else: self.packages = [get_datapackage(obj) for obj in data_objs]
[docs] self.dicts = DictionaryManager()
[docs] self.demand = demand
[docs] self.use_arrays = use_arrays
[docs] self.use_distributions = use_distributions
[docs] self.selective_use = selective_use or {}
[docs] self.remapping_dicts = remapping_dicts or {}
[docs] self.seed_override = seed_override
message = """Initialized LCA object. Demand: {demand}, data_objs: {data_objs}""".format( demand=self.demand, data_objs=self.packages ) logger.info( message, extra={ "demand": wrap_functional_unit(self.demand), "data_objs": str(self.packages), "bw2calc": __version__, "pypardiso": PYPARDISO, "numpy": np.__version__, "matrix_utils": mu.__version__, "bw_processing": bwp.__version__, "utc": utc_now(), }, ) def __next__(self) -> None: skip_first_iteration = getattr(self, "keep_first_iteration_flag", False) if not skip_first_iteration: self._delete_solver_state() for matrix in self.matrix_labels: if hasattr(self, matrix): obj = getattr(self, matrix) next(obj) message = """Iterating {matrix}. Indexers: {indexer_state}""".format( matrix=matrix, indexer_state=[(str(p), p.indexer.index) for p in obj.packages], ) logger.debug( message, extra={ "matrix": matrix, "indexers": [(str(p), p.indexer.index) for p in obj.packages], "matrix_sum": obj.matrix.sum(), "utc": utc_now(), }, ) if hasattr(self, "after_matrix_iteration"): self.after_matrix_iteration() # Avoid this conversion each time we do a calculation in the future # See https://github.com/haasad/PyPardiso/issues/75#issuecomment-2186825609 if PYPARDISO: self.technosphere_matrix = self.technosphere_matrix.tocsr() if skip_first_iteration: delattr(self, "keep_first_iteration_flag") if hasattr(self, "inventory"): self.lci_calculation() if hasattr(self, "characterized_inventory"): self.lcia_calculation()
[docs] def build_demand_array(self, demand: Optional[dict] = None) -> None: """Turn the demand dictionary into a *NumPy* array of correct size. Args: * *demand* (dict, optional): Demand dictionary. Optional, defaults to ``self.demand``. Returns: A 1-dimensional NumPy array """ demand = demand or self.demand self.demand_array = np.zeros(len(self.dicts.product)) for key in demand: try: self.demand_array[self.dicts.product[key]] = demand[key] except KeyError: if key in self.dicts.activity: raise ValueError( f"LCA can only be performed on products, not activities ({key} is the" + " wrong dimension)" ) else: raise OutsideTechnosphere(f"Can't find key {key} in product dictionary")
################## # Data retrieval # ##################
[docs] def load_lcia_data( self, data_objs: Optional[Iterable[Union[AbstractFileSystem, bwp.DatapackageBase]]] = None ) -> None: """Load data and create characterization matrix. This method will filter out regionalized characterization factors. """ global_index = consistent_global_index(data_objs or self.packages) fltr = (lambda x: x["col"] == global_index) if global_index is not None else None use_arrays, use_distributions = self.check_selective_use("characterization_matrix") try: self.characterization_mm = mu.MappedMatrix( packages=data_objs or self.packages, matrix="characterization_matrix", use_arrays=use_arrays, use_distributions=use_distributions, seed_override=self.seed_override, row_mapper=self.biosphere_mm.row_mapper, diagonal=True, custom_filter=fltr, ) except mu.errors.AllArraysEmpty: raise ValueError("Given `method` or `data_objs` have no characterization data") self.characterization_matrix = self.characterization_mm.matrix if len(self.characterization_matrix.data) == 0: warnings.warn("All values in characterization matrix are zero")
[docs] def load_normalization_data( self, data_objs: Optional[Iterable[Union[AbstractFileSystem, bwp.DatapackageBase]]] = None ) -> None: """Load normalization data.""" use_arrays, use_distributions = self.check_selective_use("normalization_matrix") self.normalization_mm = mu.MappedMatrix( packages=data_objs or self.packages, matrix="normalization_matrix", use_arrays=use_arrays, use_distributions=use_distributions, seed_override=self.seed_override, row_mapper=self.biosphere_mm.row_mapper, diagonal=True, ) self.normalization_matrix = self.normalization_mm.matrix
[docs] def load_weighting_data( self, data_objs: Optional[Iterable[Union[AbstractFileSystem, bwp.DatapackageBase]]] = None ) -> None: """Load normalization data.""" use_arrays, use_distributions = self.check_selective_use("weighting_matrix") self.weighting_mm = SingleValueDiagonalMatrix( packages=data_objs or self.packages, matrix="weighting_matrix", row_mapper=None, dimension=len(self.biosphere_mm.row_mapper), use_arrays=use_arrays, use_distributions=use_distributions, seed_override=self.seed_override, ) self.weighting_matrix = self.weighting_mm.matrix
################ # Calculations # ################
[docs] def lci_calculation(self) -> None: """The actual LCI calculation. Separated from ``lci`` to be reusable in cases where the matrices are already built, e.g. ``redo_lci`` and Monte Carlo classes. """ self.supply_array = self.solve_linear_system() # Turn 1-d array into diagonal matrix count = len(self.dicts.activity) self.inventory = self.biosphere_matrix @ sparse.spdiags( [self.supply_array], [0], count, count )
[docs] def lcia_calculation(self) -> None: """The actual LCIA calculation. Separated from ``lcia`` to be reusable in cases where the matrices are already built, e.g. ``redo_lcia`` and Monte Carlo classes. """ self.characterized_inventory = self.characterization_matrix @ self.inventory
[docs] def normalization_calculation(self) -> None: """The actual normalization calculation. Creates ``self.normalized_inventory``.""" self.normalized_inventory = self.normalization_matrix @ self.characterized_inventory
[docs] def weighting_calculation(self) -> None: """The actual weighting calculation. Multiples weighting value by normalized inventory, if available, otherwise by characterized inventory. Creates ``self.weighted_inventory``.""" if hasattr(self, "normalized_inventory"): obj = self.normalized_inventory else: obj = self.characterized_inventory self.weighted_inventory = self.weighting_matrix @ obj
@property
[docs] def score(self) -> float: """ The LCIA score as a ``float``. Note that this is a `property <http://docs.python.org/2/library/functions.html#property>`_, so it is ``foo.lca``, not ``foo.score()`` """ assert hasattr(self, "characterized_inventory"), "Must do LCIA first" if hasattr(self, "weighted_inventory"): return float(self.weighted_inventory.sum()) elif hasattr(self, "normalized_inventory"): return float(self.normalized_inventory.sum()) else: return float(self.characterized_inventory.sum())
##################### # Redo calculations # #####################
[docs] def _switch( self, obj: Union[tuple, Iterable[Union[AbstractFileSystem, bwp.DatapackageBase]]], label: str, matrix: str, func: Callable, ) -> None: """Switch a method, weighting, or normalization""" if isinstance(obj, tuple): from bw2data import prepare_lca_inputs _, data_objs, _ = prepare_lca_inputs(**{label: obj}) setattr(self, label, obj) else: data_objs = list(obj) # Drop packages that become empty after exclusion (e.g. a method-only zip package # after switch_method) so their filesystems are released and file handles closed. self.packages = [ p for pkg in self.packages if (p := pkg.exclude({"matrix": matrix})).resources ] + data_objs func(data_objs=data_objs) logger.info( f"""Switched LCIA {label}. data_objs: {data_objs}""", extra={ "data_objs": str(data_objs), "utc": utc_now(), }, )
[docs] def switch_method( self, method=Union[tuple, Iterable[Union[AbstractFileSystem, bwp.DatapackageBase]]] ) -> None: """Load a new method and replace ``.characterization_mm`` and ``.characterization_matrix``. Does not do any new calculations or change ``.characterized_inventory``.""" self._switch( obj=method, label="method", matrix="characterization_matrix", func=self.load_lcia_data, )
[docs] def switch_normalization( self, normalization=Union[tuple, Iterable[Union[AbstractFileSystem, bwp.DatapackageBase]]] ) -> None: """Load a new normalization and replace ``.normalization_mm`` and ``.normalization_matrix``. Does not do any new calculations or change ``.normalized_inventory``.""" self._switch( obj=normalization, label="normalization", matrix="normalization_matrix", func=self.load_normalization_data, )
[docs] def switch_weighting( self, weighting=Union[tuple, Iterable[Union[AbstractFileSystem, bwp.DatapackageBase]]] ) -> None: """Load a new weighting and replace ``.weighting_mm`` and ``.weighting_matrix``. Does not do any new calculations or change ``.weighted_inventory``.""" self._switch( obj=weighting, label="weighting", matrix="weighting_matrix", func=self.load_weighting_data, )
[docs] def check_demand(self, demand: Optional[dict] = None): if demand is None: return else: for key in demand: if key not in self.dicts.product and not isinstance(key, int): raise KeyError( f"Key '{key}' not in product dictionary; make sure to pass the integer id " + "property of the `Node` or `Activity` object, not a key tuple like " + "`('foo', 'bar')`, or the entire `Node` object." )
[docs] def to_dataframe( self, matrix_label: str = "characterized_inventory", row_dict: Optional[dict] = None, col_dict: Optional[dict] = None, annotate: bool = True, cutoff: Number = 200, cutoff_mode: str = "number", ) -> pd.DataFrame: """Return all nonzero elements of the given matrix as a Pandas dataframe. The LCA class instance must have the matrix ``matrix_label`` already; common labels are: * characterized_inventory * inventory * technosphere_matrix * biosphere_matrix * characterization_matrix For these common matrices, we already have ``row_dict`` and ``col_dict`` which link row and column indices to database ids. For other matrices, or if you have a custom mapping dictionary, override ``row_dict`` and/or ``col_dict``. They have the form ``{matrix index: identifier}``. If ``bw2data`` is installed, this function will try to look up metadata on the row and column objects. To turn this off, set ``annotate`` to ``False``. Instead of returning all possible values, you can apply a cutoff. This cutoff can be specified in two ways, controlled by ``cutoff_mode``, which should be either ``fraction`` or ``number``. If ``cutoff_mode`` is ``number`` (the default), then ``cutoff`` is the number of rows in the DataFrame. Data values are first sorted by their absolute value, and then the largest ``cutoff`` are taken. If ``cutoff_mode`` is ``fraction``, then only values whose absolute value is greater than ``cutoff * total_score`` are taken. ``cutoff`` must be between 0 and 1. The returned DataFrame will have the following columns: * amount * col_index * row_index If row or columns dictionaries are available, the following columns are added: * col_id * row_id If ``bw2data`` is available, then the following columns are added: * col_code * col_database * col_location * col_name * col_reference_product * col_type * col_unit * row_categories * row_code * row_database * row_location * row_name * row_type * row_unit * source_product Returns a pandas ``DataFrame``. """ matrix = getattr(self, matrix_label).tocoo() dict_mapping = { "characterized_inventory": ( self.dicts.biosphere.reversed, self.dicts.activity.reversed, ), "inventory": (self.dicts.biosphere.reversed, self.dicts.activity.reversed), "technosphere_matrix": ( self.dicts.product.reversed, self.dicts.activity.reversed, ), "biosphere_matrix": ( self.dicts.biosphere.reversed, self.dicts.activity.reversed, ), "characterization_matrix": ( self.dicts.biosphere.reversed, self.dicts.biosphere.reversed, ), } if not row_dict: try: row_dict, _ = dict_mapping[matrix_label] except KeyError: row_dict = None if not col_dict: try: _, col_dict = dict_mapping[matrix_label] except KeyError: col_dict = None sorter = np.argsort(np.abs(matrix.data))[::-1] matrix.data = matrix.data[sorter] matrix.row = matrix.row[sorter] matrix.col = matrix.col[sorter] if cutoff is not None: if cutoff_mode == "fraction": if not (0 < cutoff < 1): raise ValueError("fraction `cutoff` value must be between 0 and 1") total = matrix.data.sum() mask = np.abs(matrix.data) > (total * cutoff) matrix.data = matrix.data[mask] matrix.row = matrix.row[mask] matrix.col = matrix.col[mask] elif cutoff_mode == "number": matrix.data = matrix.data[: int(cutoff)] matrix.row = matrix.row[: int(cutoff)] matrix.col = matrix.col[: int(cutoff)] else: raise ValueError("Can't understand cutoff mode") df_data = { "row_index": matrix.row, "col_index": matrix.col, "amount": matrix.data, } if row_dict: df_data["row_id"] = np.array([row_dict[i] for i in matrix.row]) if col_dict: df_data["col_id"] = np.array([col_dict[i] for i in matrix.col]) df = pd.DataFrame(df_data) def metadata_dataframe(objs, prefix): def dict_for_obj(obj, prefix): dct = { f"{prefix}id": obj["id"], f"{prefix}database": obj["database"], f"{prefix}code": obj["code"], f"{prefix}name": obj.get("name"), f"{prefix}location": obj.get("location"), f"{prefix}unit": obj.get("unit"), f"{prefix}type": obj.get("type", "process"), } if prefix == "col_": dct["col_reference_product"] = obj.get("reference product") else: dct["row_categories"] = ( "::".join(obj["categories"]) if obj.get("categories") else None ) dct["row_product"] = obj.get("reference product") return dct return pd.DataFrame([dict_for_obj(obj, prefix) for obj in objs]) if get_node and annotate: if row_dict: row_metadata_df = metadata_dataframe( objs=[get_node(id=i) for i in np.unique(df_data["row_id"])], prefix="row_", ) df = df.merge(row_metadata_df, on="row_id") if col_dict: col_metadata_df = metadata_dataframe( objs=[get_node(id=i) for i in np.unique(df_data["col_id"])], prefix="col_", ) df = df.merge(col_metadata_df, on="col_id") return df
################ # Contribution # ################ # def top_emissions(self, **kwargs): # """Call ``bw2analyzer.ContributionAnalyses.annotated_top_emissions``""" # try: # from bw2analyzer import ContributionAnalysis # except ImportError: # raise ImportError("`bw2analyzer` is not installed") # return ContributionAnalysis().annotated_top_emissions(self, **kwargs) # def top_activities(self, **kwargs): # """Call ``bw2analyzer.ContributionAnalyses.annotated_top_processes``""" # try: # from bw2analyzer import ContributionAnalysis # except ImportError: # raise ImportError("`bw2analyzer` is not installed") # return ContributionAnalysis().annotated_top_processes(self, **kwargs)