Source code for bw2calc.single_matrix

# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals, division
from eight import *

from scipy import sparse
from io import BytesIO
import numpy as np
from .errors import (
    NonsquareTechnosphere,
    OutsideTechnosphere,
)
from .log_utils import create_logger
from .matrices import SingleMatrixBuilder, MatrixBuilder
from .utils import (
    global_index,
    clean_databases,
    get_filepaths,
    load_arrays,
    mapping,
)
import copy
import numpy as np
import json
import logging
import tarfile
import warnings

try:
    from pypardiso import factorized, spsolve
except ImportError:
    from scipy.sparse.linalg import factorized, spsolve
try:
    import pandas
except ImportError:
[docs] pandas = None
try: from collections.abc import Mapping except ImportError: from collections import Mapping try: from presamples import PackagesDataLoader except ImportError:
[docs] PackagesDataLoader = None
[docs] class SingleMatrixLCA(object): """An LCA which puts everything into one matrix. Comes with advantages and disadvantages, and designed exclusively for `BONSAI <https://bonsai.uno/>`__ via `beebee <https://github.com/BONSAMURAIS/beebee/>`__.""" def __init__(self, demand, data_filepath, log_config=None, presamples=None, seed=None, override_presamples_seed=False): """Create a new single-matrix LCA calculation. Args: * *demand* (dict): The demand or functional unit. Needs to be a dictionary to indicate amounts, e.g. ``{("my database", "my process"): 2.5}``. Returns: A new ``SingleMatrixLCA`` object """ if not isinstance(demand, Mapping): raise ValueError("Demand must be a dictionary") if log_config: create_logger(**log_config)
[docs] self.logger = logging.getLogger('bw2calc')
[docs] self.demand = demand
[docs] self.filepath = data_filepath
[docs] self.seed = seed
if presamples and PackagesDataLoader is None: warnings.warn("Skipping presamples; `presamples` not installed") self.presamples = None elif presamples: # Iterating over a `Campaign` object will also return the presample filepaths self.presamples = PackagesDataLoader( dirpaths=presamples, seed=self.seed if override_presamples_seed else None, lca=self ) else: self.presamples = None self.logger.info("Created LCA object", extra={ 'demand': str(self.demand), 'data_filepath': self.filepath, 'presamples': str(self.presamples), })
[docs] def build_demand_array(self, demand=None): """Turn the demand dictionary into a *NumPy* array of correct size. Args: * *demand* (dict, optional): Demand dictionary. Optional, defaults to ``self.demand``. Returns: A 1-dimensional NumPy array """ demand = demand or self.demand self.demand_array = np.zeros(len(self.row_dict)) for key in demand: try: self.demand_array[self.row_dict[key]] = demand[key] except KeyError: if key in self.col_dict: raise ValueError((u"LCA can only be performed on products," u" not activities ({} is the wrong dimension)" ).format(key) ) else: raise OutsideTechnosphere("Can't find key {} in product dictionary".format(key))
######################### ### Data manipulation ### #########################
[docs] def fix_dictionaries(self, row_mapping, col_mapping): """Fix the row and column dictionaries from ``{integer: row/col index}`` to ``{label: row/col index}``.""" self.row_dict = {label: self.row_dict[index] for label, index in row_mapping.items()} self.col_dict = {label: self.col_dict[index] for label, index in col_mapping.items()}
[docs] def reverse_dict(self): """Construct reverse dicts from technosphere and biosphere row and col indices to activity_dict/product_dict/biosphere_dict keys. Returns: (reversed ``self.activity_dict``, ``self.product_dict`` and ``self.biosphere_dict``) """ rev_row = {v: k for k, v in self.row_dict.items()} rev_col = {v: k for k, v in self.col_dict.items()} return rev_row, rev_col
###################### ### Data retrieval ### ######################
[docs] def load_beebee_data(self, builder=SingleMatrixBuilder): """Load ``beebee`` export data package. This is a compressed file which contains: * A `stats_arrays <https://bitbucket.org/cmutel/stats_arrays>`__ file used to create the single matrix. * A mapping dictionary from meaningful labels to the integer row ids * A mapping dictionary from meaningful labels to the integer column ids * A mapping dictionary from ``{"method URI": {labels}}`` which allows for LCIA sums """ with tarfile.open(self.filepath, 'r:bz2') as f: # Hack needed because of https://github.com/numpy/numpy/issues/7989 array_file = BytesIO() array_file.write(f.extractfile("array.npy").read()) array_file.seek(0) self.params, self.row_dict, self.col_dict, \ self.matrix = builder.build(array_file) row_mapping = json.load(f.extractfile("row.mapping")) col_mapping = json.load(f.extractfile("col.mapping")) categories = json.load(f.extractfile("categories.mapping")) if len(self.row_dict) != len(self.col_dict): raise NonsquareTechnosphere(( "Single matrix is not square: {} columns and {} rows." ).format(len(self.row_dict), len(self.col_dict)) ) self.fix_dictionaries(row_mapping, col_mapping) self.category_index_arrays = { name: np.array( [(self.row_dict[label], 1) for label in labels], dtype=[('INDEX', np.uint32), ('CONSTANT', np.uint32)] ) for name, labels in categories.items() } # Only need to index here for traditional LCA if self.presamples: self.presamples.index_arrays(self) self.presamples.update_matrices( matrices=('matrix',) )
[docs] def decompose_technosphere(self): """ Factorize the technosphere matrix into lower and upper triangular matrices, :math:`A=LU`. Does not solve the linear system :math:`Ax=B`. Doesn't return anything, but creates ``self.solver``. .. warning:: Incorrect results could occur if a technosphere matrix was factorized, and then a new technosphere matrix was constructed, as ``self.solver`` would still be the factorized older technosphere matrix. You are responsible for deleting ``self.solver`` when doing these types of advanced calculations. """ self.solver = factorized(self.matrix.tocsc())
[docs] def solve_linear_system(self): """ Master solution function for linear system :math:`Ax=B`. To most numerical analysts, matrix inversion is a sin. -- Nicolas Higham, Accuracy and Stability of Numerical Algorithms, Society for Industrial and Applied Mathematics, Philadelphia, PA, USA, 2002, p. 260. We use `UMFpack <http://www.cise.ufl.edu/research/sparse/umfpack/>`_, which is a very fast solver for sparse matrices. If the technosphere matrix has already been factorized, then the decomposed technosphere (``self.solver``) is reused. Otherwise the calculation is redone completely. """ if hasattr(self, "solver"): return self.solver(self.demand_array) else: return spsolve( self.matrix, self.demand_array)
[docs] def lcia(self, *args, **kwargs): warnings.warn("LCIA does nothing in a SingleMatrixLCA")
[docs] def calculate(self, factorize=False, builder=SingleMatrixBuilder): """Calculate an LCA score. Creates ``self.supply_array``, a vector of activities, flows, and characterization pathways which satisfy the demand. Creates ``self.scores``, a dictionary of ``{'LCIA identifier': LCA score}``. Create ``self.contributions``, a dictionary of ``{'LCIA identifier': []}``. Args: * *factorize* (bool, optional): Factorize the technosphere matrix. Makes additional calculations with the same technosphere matrix much faster. Default is ``False``; not useful is only doing one LCI calculation. * *builder* (``SingleMatrixBuilder`` object, optional): Custom matrix builders can be used to manipulate data in creative ways before building the matrices. Doesn't return anything. """ self.load_beebee_data(builder) self.build_demand_array() if factorize: self.decompose_technosphere() self.calculate_scores()
[docs] def calculate_scores(self): self.supply_array = self.solve_linear_system() self.scores, self.contributions = {}, {} for name, array in self.category_index_arrays.items(): matrix = MatrixBuilder.build_diagonal_matrix( array, self.row_dict, 'INDEX', data_label='CONSTANT' ) self.scores[name] = float((self.matrix * matrix * self.supply_array).sum())
[docs] def rebuild_matrix(self, vector): """Build a new technosphere matrix using the same row and column indices, but different values. Useful for Monte Carlo iteration or sensitivity analysis. Args: * *vector* (array): 1-dimensional NumPy array with length (# of technosphere parameters), in same order as ``self.tech_params``. Doesn't return anything, but overwrites ``self.technosphere_matrix``. """ self.matrix = SingleMatrixBuilder.build_single_matrix( self.params, self.row_dict, self.col_dict, vector )
[docs] def redo_calculate(self, demand=None): """Redo LCI with same databases but different demand. Args: * *demand* (dict): A demand dictionary. Doesn't return anything, but overwrites ``self.demand_array``, ``self.supply_array``, and ``self.inventory``. .. warning:: If you want to redo the LCIA as well, use ``redo_lcia(demand)`` directly. """ assert hasattr(self, "matrix"), "Must do `calculate` first" if demand is not None: self.build_demand_array(demand) self.calculate_scores() self.logger.info("Redoing LCI", extra={'demand': str(demand or self.demand)})