Source code for bw2data.compat

from typing import Dict, Iterator, List, Union

from bw_processing.datapackage import DatapackageBase
from deprecated import deprecated

from bw2data import (
    Database,
    Method,
    Normalization,
    Weighting,
    databases,
    get_node,
    methods,
    normalizations,
    projects,
    weightings,
)
from bw2data.backends import Node
from bw2data.backends.schema import ActivityDataset as AD
from bw2data.backends.schema import get_id
from bw2data.errors import Brightway2Project, UnknownObject


class Mapping:
    """Backwards-compatible stand-in for the old key-to-integer mapping.

    Object ids such as ``("Ecoinvent 2.2", 42)`` used to be translated to
    integers through this dictionary. It is kept only so that legacy code
    keeps working; the preferred approach is to look up the ids of
    activities directly in the SQLite database.

    Item access is delegated to ``get_id``, ``len()`` reports the number of
    rows in the ``ActivityDataset`` table, and the mutating methods are
    deprecated no-ops.
    """

    def __str__(self):
        return "Obsolete mapping dictionary."

    def __len__(self):
        # One row per stored activity dataset.
        return AD.select().count()

    def __getitem__(self, key):
        return get_id(key)

    @deprecated("This method is no longer necessary, and does nothing.")
    def add(self, keys):
        """Do nothing; retained for legacy callers."""
        return None

    @deprecated("This method is no longer necessary, and does nothing.")
    def delete(self, keys):
        """Do nothing; retained for legacy callers."""
        return None
def unpack(dct) -> Iterator[str]:
    """Yield the database name of every object in ``dct``.

    ``dct`` is iterated directly, so passing a dictionary visits its keys.

    Supported objects and where the database name comes from:

    * ``tuple``: the first element
    * ``int``: ``get_node(id=obj)["database"]``
    * ``ActivityDataset`` row: the ``database`` attribute
    * ``Node``: the ``"database"`` item

    Raises:
        ValueError: If an object is none of the supported types.
    """
    for obj in dct:
        # NOTE(review): builtin types are tested first; this assumes neither
        # ``ActivityDataset`` nor ``Node`` subclasses ``tuple``/``int``, in
        # which case the branch order cannot change the result.
        if isinstance(obj, tuple):
            yield obj[0]
        elif isinstance(obj, int):
            yield get_node(id=obj)["database"]
        elif isinstance(obj, AD):
            yield obj.database
        elif isinstance(obj, Node):
            yield obj["database"]
        else:
            raise ValueError(
                f"Can't determine database of {obj!r} (type {type(obj)})"
            )
def translate_key(key):
    """Return the integer id corresponding to ``key``.

    Integers are handed back unchanged. Any other key is indexed as a
    ``(database, code)`` pair and resolved through the ``ActivityDataset``
    table.
    """
    if not isinstance(key, int):
        database, code = key[0], key[1]
        return AD.get(AD.database == database, AD.code == code).id
    return key
def prepare_lca_inputs(
    demand=None,
    method=None,
    weighting=None,
    normalization=None,
    demands=None,
    remapping=True,
    demand_database_last=True,
):
    """Prepare LCA input arguments in Brightway 2.5 style.

    Returns a tuple ``(indexed_demand, data_objs, remapping_dicts)``:

    * ``indexed_demand``: ``demand`` (or each dictionary in ``demands``)
      with its keys replaced by ``get_id(key)``; ``None`` if neither was
      given. ``demands`` takes precedence over ``demand``.
    * ``data_objs``: datapackages of all databases the demand depends on,
      followed by those of ``method``, ``weighting`` and ``normalization``
      when given.
    * ``remapping_dicts``: id -> ``(database, code)`` lookups, or ``None``
      when ``remapping`` is false or no demand was given.

    Raises:
        Brightway2Project: If the current project is not flagged as
            Brightway 2.5.
        ValueError: If ``method``, ``weighting`` or ``normalization`` is not
            registered in this project.
    """
    is_bw25_project = projects.dataset.data.get("25")
    if not is_bw25_project:
        raise Brightway2Project(
            "Please use `projects.migrate_project_25` before calculating using Brightway 2.5"
        )

    databases.clean()

    # Database labels referenced directly by the functional unit(s).
    referenced_labels = set()
    if demands:
        for single_demand in demands:
            referenced_labels.update(unpack(single_demand))
    elif demand:
        referenced_labels.update(unpack(demand))
    demand_database_names = sorted(referenced_labels)

    data_objs = []
    remapping_dicts = None

    if demand_database_names:
        dependents = [
            Database(db_label).find_graph_dependents() for db_label in demand_database_names
        ]
        database_names = set.union(*dependents)

        if demand_database_last:
            # Put the databases named in the demand at the end of the list.
            upstream_names = [x for x in database_names if x not in demand_database_names]
            database_names = upstream_names + demand_database_names

        for db_name in database_names:
            data_objs.append(Database(db_name).datapackage())

        if remapping:
            # This is technically wrong - we could have more complicated queries
            # to determine what is truly a product, activity, etc.
            # However, for the default database schema, we know that each node
            # has a unique ID, so this won't produce incorrect responses,
            # just too many values. As the dictionary only exists once, this is
            # not really a problem.
            rows = (
                AD.select(AD.database, AD.code, AD.id)
                .where(AD.database << database_names)
                .tuples()
            )
            reversed_mapping = {}
            for database, code, node_id in rows:
                reversed_mapping[node_id] = (database, code)
            # The same dictionary object is shared by all three labels.
            remapping_dicts = dict.fromkeys(
                ("activity", "product", "biosphere"), reversed_mapping
            )

    if method:
        if method not in methods:
            raise ValueError(
                f"Method {method} not found in this project. "
                "Use `bw2data.methods` to list available methods."
            )
        data_objs.append(Method(method).datapackage())

    if weighting:
        if weighting not in weightings:
            raise ValueError(
                f"Weighting {weighting} not found in this project. "
                "Use `bw2data.weightings` to list available weightings."
            )
        data_objs.append(Weighting(weighting).datapackage())

    if normalization:
        if normalization not in normalizations:
            raise ValueError(
                f"Normalization {normalization} not found in this project. "
                "Use `bw2data.normalizations` to list available normalizations."
            )
        data_objs.append(Normalization(normalization).datapackage())

    def with_integer_keys(dct):
        return {get_id(k): v for k, v in dct.items()}

    indexed_demand = None
    if demands:
        indexed_demand = [with_integer_keys(dct) for dct in demands]
    elif demand:
        indexed_demand = with_integer_keys(demand)

    return indexed_demand, data_objs, remapping_dicts
def get_database_filepath(functional_unit):
    """Get filepaths for all databases in supply chain of `functional_unit`.

    Args:
        functional_unit: Iterable of keys whose first element is a database
            name (iterating a dictionary visits its keys).

    Returns:
        A list with ``Database(name).filepath_processed()`` for every
        database reported by ``find_graph_dependents()`` of the referenced
        databases. An empty ``functional_unit`` gives an empty list.
    """
    # Look up each referenced database once, however many keys mention it.
    demand_databases = {key[0] for key in functional_unit}
    # ``set().union(...)`` also works with zero arguments, unlike the
    # unbound ``set.union(*[])`` which raises ``TypeError``.
    dbs = set().union(
        *(Database(name).find_graph_dependents() for name in demand_databases)
    )
    return [Database(obj).filepath_processed() for obj in dbs]
def get_multilca_data_objs(
    functional_units: Dict[str, dict],
    method_config: Dict[str, Union[list, dict]],
) -> "List[DatapackageBase]":
    """Get all the datapackages needed for a complete MultiLCA calculation.

    Args:
        functional_units: Mapping from a label to a dictionary whose keys
            are integer node ids.
        method_config: Mapping that may contain ``"impact_categories"``,
            ``"normalizations"`` and ``"weightings"``. An object exposing
            both ``impact_categories`` and ``model_dump`` is converted with
            ``model_dump(exclude_none=True)`` first.

    Returns:
        Datapackages of every database the functional units depend on,
        followed by those of the impact categories (duplicates dropped,
        first-seen order kept), normalizations and weightings.

    Raises:
        ValueError: If a functional unit key is not an integer, or if an
            impact category, normalization or weighting is not registered
            in this project.
        UnknownObject: If a functional unit id is not in this project.
    """
    if hasattr(method_config, "impact_categories") and hasattr(method_config, "model_dump"):
        method_config = method_config.model_dump(exclude_none=True)

    input_database_names = set()
    for v_dict in functional_units.values():
        for obj in v_dict:
            if not isinstance(obj, int):
                raise ValueError(
                    f"Functional unit inputs must be integers; got {obj} (type {type(obj)})"
                )
            try:
                input_database_names.add(get_node(id=obj)["database"])
            except UnknownObject as err:
                raise UnknownObject(
                    f"Functional unit id {obj} is not in this project"
                ) from err

    # ``set().union(...)`` tolerates having no input databases at all.
    complete_database_names = set().union(
        *(Database(db_label).find_graph_dependents() for db_label in input_database_names)
    )
    data_objs = [Database(obj).datapackage() for obj in complete_database_names]

    # ``or []`` covers both a missing key and an explicit ``None`` value.
    # ``dict.fromkeys`` removes duplicates while keeping a stable order.
    for ic in dict.fromkeys(method_config.get("impact_categories") or []):
        if ic not in methods:
            raise ValueError(f"Impact category (`Method`) {ic} not in this project")
        data_objs.append(Method(ic).datapackage())

    for n in method_config.get("normalizations") or []:
        if n not in normalizations:
            raise ValueError(f"Normalization {n} not in this project")
        data_objs.append(Normalization(n).datapackage())

    for w in method_config.get("weightings") or []:
        if w not in weightings:
            raise ValueError(f"Weighting {w} not in this project")
        data_objs.append(Weighting(w).datapackage())

    return data_objs