Source code for bw2io.strategies.generic

import numbers
import pprint
import warnings
from collections import defaultdict
from copy import deepcopy
from typing import Iterable, List, Optional, Union

import numpy as np
from bw2data import Database, databases, labels

from ..errors import StrategyError
from ..units import normalize_units as normalize_units_function
from ..utils import DEFAULT_FIELDS, activity_hash


[docs] def format_nonunique_key_error(obj: dict, fields: List[str], others: List[dict]) -> str: """ Generate a formatted error message for a dataset that can't be uniquely linked to the target database. Parameters ---------- obj : dict The problematic dataset that can't be uniquely linked to the target database. fields : list The list of fields to include in the error message. others : list A list of other similar datasets. Returns ------- str A formatted error message. See Also -------- pprint.pformat : Format a Python object into a pretty-printed string. Notes ----- This function is used to generate a formatted error message for a dataset that can't be uniquely linked to the target database. It takes the problematic dataset and a list of other similar datasets and returns an error message that includes the problematic dataset and a list of possible target datasets that may match the problematic dataset. Raises ------ None Examples -------- >>> obj = {'name': 'Electricity', 'location': 'CH'} >>> fields = ['name', 'location'] >>> others = [ ... {'name': 'Electricity', 'location': 'CH', 'filename': 'file1'}, ... {'name': 'Electricity', 'location': 'CH', 'filename': 'file2'} ... ] >>> format_nonunique_key_error(obj, fields, others) "Object in source database can't be uniquely linked to target database. Problematic dataset is: {'name': 'Electricity', 'location': 'CH'} Possible targets include (at least one not shown): [{'name': 'Electricity', 'location': 'CH', 'filename': 'file1'}, {'name': 'Electricity', 'location': 'CH', 'filename': 'file2'}]" """ template = """Object in source database can't be uniquely linked to target database.\nProblematic dataset is:\n{ds}\nPossible targets include (at least one not shown):\n{targets}""" fields_to_print = list(fields or DEFAULT_FIELDS) + ["filename"] _ = lambda x: {field: x.get(field, "(missing)") for field in fields_to_print} return template.format( ds=pprint.pformat(_(obj)), targets=pprint.pformat([_(x) for x in others]) )
[docs] def assign_only_product_as_production(db: Iterable[dict]) -> List[dict]: """ Assign only product as reference product. For each dataset in ``db``, this function checks if there is only one production exchange and no reference product already assigned. If this is the case, the reference product is set to the name of the production exchange, and the following fields are replaced if not already specified: * 'name' - name of reference product * 'unit' - unit of reference product * 'production amount' - amount of reference product Parameters ---------- db : iterable An iterable of dictionaries containing the datasets to process. Returns ------- iterable An iterable of dictionaries containing the processed datasets. Raises ------ AssertionError If a production exchange does not have a `name` attribute. Examples -------- >>> data = [{'name': 'Input 1', 'exchanges': [{'type': 'production', 'name': 'Product 1', 'amount': 1}, {'type': 'technosphere', 'name': 'Input 2', 'amount': 2}]}, {'name': 'Input 2', 'exchanges': [{'type': 'production', 'name': 'Product 2', 'amount': 3}, {'type': 'technosphere', 'name': 'Input 3', 'amount': 4}]}] >>> processed_data = assign_only_product_as_production(data) >>> processed_data[0]['reference product'] 'Product 1' >>> processed_data[0]['name'] 'Input 1' >>> processed_data[1]['reference product'] 'Product 2' >>> processed_data[1]['unit'] 'Unknown' """ for ds in db: if ds.get("reference product"): continue products = [x for x in ds.get("exchanges", []) if x.get("type") == "production"] if len(products) == 1: product = products[0] assert product["name"] ds["reference product"] = ( product.get("reference product", []) or product["name"] ) ds["production amount"] = product["amount"] ds["name"] = ds.get("name") or product["name"] ds["unit"] = ds.get("unit") or product.get("unit") or "Unknown" return db
[docs] def set_code_by_activity_hash(db: List[dict], overwrite: bool = False) -> List[dict]: """ Set the dataset code for each dataset in the given database using `activity_hash`. Parameters ---------- db : obj The database to set the dataset codes in. overwrite : bool, optional Whether to overwrite existing codes. Default is False. Returns ------- obj The modified database object with updated dataset codes. Notes ----- The dataset code is a unique identifier for each dataset in the database. It is generated by hashing the dataset dictionary with `activity_hash`. Examples -------- >>> db = Database('example_db') >>> set_code_by_activity_hash(db) """ for ds in db: if "code" not in ds or overwrite: ds["code"] = activity_hash(ds) return db
[docs] def tupleize_categories(db: List[dict]) -> List[dict]: """ Convert the "categories" fields in a given database and its exchanges to tuples. Parameters ---------- db : obj The database to convert categories in. Returns ------- obj The modified database object with converted category fields. Examples -------- >>> from bw2data import Database >>> db = Database('example_db') >>> tupleize_categories(db) """ for ds in db: if ds.get("categories"): ds["categories"] = tuple(ds["categories"]) for exc in ds.get("exchanges", []): if exc.get("categories"): exc["categories"] = tuple(exc["categories"]) return db
[docs] def drop_unlinked(db: List[dict]) -> List[dict]: """ Remove all exchanges in a given database that don't have inputs. Exchanges that don't have any inputs are often referred to as "unlinked exchanges". These exchanges can be a sign of an incomplete or poorly structured database. Parameters ---------- db : obj The database to remove unlinked exchanges from. Returns ------- obj The modified database object with removed unlinked exchanges. Notes ----- This is the nuclear option - use at your own risk! ⚠️ Examples -------- >>> db = [ ... {"name": "Product A", "unit": "kg", "exchanges": [{"input": True, "amount": 1, "name": "Input 1", "unit": "kg"}]}, ... {"name": "Product B", "unit": "kg", "exchanges": [{"input": True, "amount": 1, "name": "Input 2", "unit": "kg"}, {"input": False, "amount": 0.5, "name": "Product A", "unit": "kg"}]}, ... {"name": "Product C", "unit": "kg", "exchanges": [{"input": False, "amount": 0.75, "name": "Product A", "unit": "kg"}]} ... ] >>> drop_unlinked(db) [ {'name': 'Product A', 'unit': 'kg', 'exchanges': [{'input': True, 'amount': 1, 'name': 'Input 1', 'unit': 'kg'}]}, ... {'name': 'Product B', 'unit': 'kg', 'exchanges': [{'input': True, 'amount': 1, 'name': 'Input 2', 'unit': 'kg'}, ... {'input': False, 'amount': 0.5, 'name': 'Product A', 'unit': 'kg'}]}, ... {'name': 'Product C', 'unit': 'kg', 'exchanges': []} ] """ for ds in db: if "exchanges" not in ds: continue ds["exchanges"] = [obj for obj in ds["exchanges"] if obj.get("input")] return db
[docs] def normalize_units(db: List[dict]) -> List[dict]: """ Normalize units in datasets and their exchanges. Parameters ---------- db : list[dict] The database that needs to be normalized. Returns ------- list[dict] The normalized database. Examples -------- Example 1: Normalize the units of a given database. >>> db = {'name': 'test_db', 'unit': 'kg'} >>> normalize_units(db) {'name': 'test_db', 'unit': 'kilogram'} Example 2: Normalize the units of a dataset and its exchanges. >>> db = { ... 'name': 'test_db', ... 'unit': 'kg', ... 'exchanges': [ ... {'name': 'input', 'unit': 't'}, ... {'name': 'output', 'unit': 'lb'}, ... ] ... } >>> normalize_units(db) {'name': 'test_db', 'unit': 'kilogram', 'exchanges': [ {'name': 'input', 'unit': 'tonne'}, {'name': 'output', 'unit': 'pound'} ]} """ for ds in db: if "unit" in ds: ds["unit"] = normalize_units_function(ds["unit"]) for exc in ds.get("exchanges", []): if "unit" in exc: exc["unit"] = normalize_units_function(exc["unit"]) if "reference unit" in exc: exc["reference unit"] = normalize_units_function(exc["reference unit"]) for param in ds.get("parameters", {}).values(): if "unit" in param: param["unit"] = normalize_units_function(param["unit"]) return db
[docs] def add_database_name(db: List[dict], name: str) -> List[dict]: """ Adds a database name to each dataset in a list of datasets. Parameters ---------- db : list[dict] The list of datasets to add the database name to. name : str The name of the database to be added to each dataset. Returns ------- list[dict] The updated list of datasets with the database name added to each dataset. Examples -------- >>> db = [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}] >>> add_database_name(db, "X") [{'id': 1, 'name': 'A', 'database': 'X'}, {'id': 2, 'name': 'B', 'database': 'X'}] An empty list input returns an empty list. >>> add_database_name([], "Y") [] """ for ds in db: ds["database"] = name return db
[docs] def convert_uncertainty_types_to_integers(db: List[dict]) -> List[dict]: """ Convert uncertainty types in a list of datasets to integers. Parameters ---------- db : list[dict] The list of datasets containing uncertainty types to convert. Returns ------- list[dict] The updated list of datasets with uncertainty types converted to integers where possible. Examples -------- >>> db = [{"name": "A", "exchanges": [{"uncertainty type": "triangular"}]}, {"name": "B", "exchanges": [{"uncertainty type": "lognormal"}]}] >>> convert_uncertainty_types_to_integers(db) [{'name': 'A', 'exchanges': [{'uncertainty type': 'triangular'}]}, {'name': 'B', 'exchanges': [{'uncertainty type': 'lognormal'}]}] Float values are rounded down to integers. >>> db = [{"name": "C", "exchanges": [{"uncertainty type": "1"}, {"uncertainty type": "2.0"}]}] >>> convert_uncertainty_types_to_integers(db) [{'name': 'C', 'exchanges': [{'uncertainty type': 1}, {'uncertainty type': 2}]}] """ for ds in db: for exc in ds["exchanges"]: try: exc["uncertainty type"] = int(exc["uncertainty type"]) except: pass return db
[docs] def drop_falsey_uncertainty_fields_but_keep_zeros(db: List[dict]) -> List[dict]: """ Drop uncertainty fields that are falsey (e.g. '', None, False) but keep zero and NaN. Note that this function doesn't strip `False`, which behaves exactly like 0. Parameters ---------- db : list[dict] The list of datasets to drop uncertainty fields from. Returns ------- list[dict] The updated list of datasets with falsey uncertainty fields dropped. Examples -------- >>> db = [{"name": "A", "exchanges": [{"amount": 1, "minimum": 0, "maximum": None, "shape": ""}]}] >>> drop_falsey_uncertainty_fields_but_keep_zeros(db) [{'name': 'A', 'exchanges': [{'amount': 1, 'minimum': 0}]}] Float values of NaN are kept in the dictionary. >>> db = [{"name": "B", "exchanges": [{"loc": 0.0, "scale": 0.5, "minimum": float('nan')},... {"loc": 0.0, "scale": 0.5}]}] >>> drop_falsey_uncertainty_fields_but_keep_zeros(db) [{'name': 'B', 'exchanges': [{'loc': 0.0, 'scale': 0.5, 'minimum': nan},{'loc': 0.0, 'scale': 0.5}]}] """ uncertainty_fields = [ "minimum", "maximum", "scale", "shape", "loc", ] def drop_if_appropriate(exc): for field in uncertainty_fields: if field not in exc or exc[field] == 0: continue elif isinstance(exc[field], numbers.Number) and np.isnan(exc[field]): continue elif not exc[field]: del exc[field] for ds in db: for exc in ds["exchanges"]: drop_if_appropriate(exc) return db
[docs] def convert_activity_parameters_to_list(data: List[dict]) -> List[dict]: """ " Convert activity parameters from a dictionary to a list of dictionaries. Parameters ---------- data : list[dict] The list of activities to convert parameters from. Returns ------- list[dict] The updated list of activities with parameters converted to a list of dictionaries. Examples -------- >>> data = [{"name": "A", "parameters": {"param1": 1, "param2": 2}}, {"name": "B", "parameters": {"param3": 3, "param4": 4}}] >>> convert_activity_parameters_to_list(data) [{'name': 'A', 'parameters': [{'name': 'param1', 1}, {'name': 'param2', 2}]}, {'name': 'B', 'parameters': [{'name': 'param3', 3}, {'name': 'param4', 4}]}] Activities without parameters remain unchanged. >>> data = [{"name": "C"}] >>> convert_activity_parameters_to_list(data) [{'name': 'C'}] """ def _(key, value): dct = deepcopy(value) dct["name"] = key return dct for ds in data: if "parameters" in ds: ds["parameters"] = [_(x, y) for x, y in ds["parameters"].items()] return data
[docs] def split_exchanges( data: List[dict], filter_params: dict, changed_attributes: List[dict], allocation_factors: Optional[List[float]] = None, ) -> List[dict]: """ Split unlinked exchanges in ``data`` which satisfy ``filter_params`` into new exchanges with changed attributes. ``changed_attributes`` is a list of dictionaries with the attributes that should be changed. ``allocation_factors`` is an optional list of floats to allocate the original exchange amount to the respective copies defined in ``changed_attributes``. They don't have to sum to one. If ``allocation_factors`` are not defined, then exchanges are split equally. Resets uncertainty to ``UndefinedUncertainty`` (0). To use this function as a strategy, you will need to curry it first using ``functools.partial``. Parameters ---------- data : list[dict] The list of activities to split exchanges in. filter_params : dict A dictionary of filter parameters to apply to the exchanges that will be split. changed_attributes : list[dict] A list of dictionaries with the attributes that should be changed in the new exchanges. allocation_factors : Optional[List[float]], optional An optional list of floats to allocate the original exchange amount to the respective copies defined in ``changed_attributes``, by default None. If ``allocation_factors`` are not defined, then exchanges are split equally. Returns ------- list[dict] The updated list of activities with exchanges split. Examples -------- >>> data = [{"name": "A", "exchanges": [{"name": "foo", "location": "bar", "amount": 20}, {"name": "food", "location": "bar", "amount": 12}]}] >>> split_exchanges(data, {"name": "foo"}, [{"location": "A"}, {"location": "B", "cat": "dog"}]) [{'name': 'A', 'exchanges': [{'name': 'food', 'location': 'bar', 'amount': 12}, {'name': 'foo', 'location': 'A', 'amount': 12.0, 'uncertainty_type': 0}, {'name': 'foo', 'location': 'B', 'amount': 8.0, 'uncertainty_type': 0, 'cat': 'dog'}]}] >>> data = [{"name": "B", "exchanges": [{"name": "bar", "location": "foo", "amount": 25}, {"name": "bard", "location": "foo", "amount": 13}]}] >>> split_exchanges(data, {"name": "bard", "location": "foo"}, [{"name": "new", "location": "bar"}], [0.3]) [{'name': 'B', 'exchanges': [{'name': 'bar', 'location': 'foo', 'amount': 25}, {'name': 'new', 'location': 'bar', 'amount': 3.9000000000000004, 'uncertainty_type': 0}]}] """ if allocation_factors is None: allocation_factors = [1] * len(changed_attributes) total = sum(allocation_factors) if len(changed_attributes) != len(allocation_factors): raise ValueError( "`changed_attributes` and `allocation_factors` must have same length" ) for ds in data: to_delete, to_add = [], [] for index, exchange in enumerate(ds.get("exchanges", [])): if exchange.get("input"): continue if all(exchange.get(key) == value for key, value in filter_params.items()): to_delete.append(index) for factor, obj in zip(allocation_factors, changed_attributes): exc = deepcopy(exchange) exc["amount"] = exc["amount"] * factor / total exc["uncertainty_type"] = 0 for key, value in obj.items(): exc[key] = value to_add.append(exc) if to_delete: for index in to_delete[::-1]: del ds["exchanges"][index] ds["exchanges"].extend(to_add) return data
[docs] def match_against_top_level_context( data: List[dict], other_db_name: str, fields: List[str] = ["name", "unit", "categories"], kinds: List[str] = labels.biosphere_edge_types, ) -> List[dict]: """ For unlinked edges with a `categories` context `('a', 'b', ...)`, try to match against flows in `other_db_name` with `categories` context `('a',)`. To use this function as a strategy, you will need to curry it first using ``functools.partial``. Parameters ---------- data : list[dict] The list of activities to split exchanges in. other_db_name : str The name of the database with flows to link to. fields : list[str] List of field names to use when determining if there is a match kinds : list[str] Try to match exchanges with these `type` values """ if other_db_name not in databases: raise StrategyError(f"Can't find other database {other_db_name}") if "categories" not in fields: raise StrategyError("`fields` must include `categories`") ffields = [field for field in fields if field != "categories"] mapping = { tuple( [obj.get(field) for field in ffields] + [tuple(obj.get("categories", []))] ): obj.key for obj in Database(other_db_name) } for ds in data: for exc in filter( lambda x: "input" not in x and x.get("type") in kinds, ds.get("exchanges", []), ): if "categories" not in exc: continue context = deepcopy(list(exc.get("categories", []))) while context: try: exc["input"] = mapping[ tuple([exc.get(field) for field in ffields] + [tuple(context)]) ] except KeyError: pass context = context[:-1] return data
[docs] def match_against_only_available_in_given_context_tree( data: List[dict], other_db_name: str, fields: List[str] = ["name", "unit", "categories"], kinds: List[str] = labels.biosphere_edge_types, ) -> List[dict]: """ For unlinked edges with a `categories` context `('a', 'b', ...)`, try to match against flows in `other_db_name` with `categories` context `('a', 'c'')` if that flow is the only one available in `other_db_name` within the context tree `('a',)`. To use this function as a strategy, you will need to curry it first using ``functools.partial``. Parameters ---------- data : list[dict] The list of activities to split exchanges in. other_db_name : str The name of the database with flows to link to. fields : list[str] List of field names to use when determining if there is a match kinds : list[str] Try to match exchanges with these `type` values """ if other_db_name not in databases: raise StrategyError(f"Can't find other database {other_db_name}") if "categories" not in fields: raise StrategyError("`fields` must include `categories`") ffields = [field for field in fields if field != "categories"] mapping_draft = defaultdict(list) for obj in Database(other_db_name): if not obj.get("categories"): continue mapping_draft[ tuple([obj.get(field) for field in ffields] + [obj["categories"][0]]) ].append(obj.key) mapping = {key: value[0] for key, value in mapping_draft.items() if len(value) == 1} for ds in data: for exc in filter( lambda x: "input" not in x and x.get("type") in kinds, ds.get("exchanges", []), ): if "categories" not in exc: continue try: exc["input"] = mapping[ tuple( [exc.get(field) for field in ffields] + [exc["categories"][0]] ) ] except KeyError: continue return data