Source code for bw2io.strategies.ecospold2

import math
import warnings

from bw2data import Database
from bw2data.logs import close_log, get_io_logger
from stats_arrays import LognormalUncertainty, UndefinedUncertainty

from ..utils import es2_activity_hash, format_for_logging
from .migrations import migrate_exchanges, migrations






def remove_zero_amount_coproducts(db):
    """
    Drop production exchanges whose amount is zero (or otherwise falsy).

    Exchanges of any other type are kept regardless of their amount. The
    ``"exchanges"`` list of every dataset is replaced by the filtered list.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). Each must have an ``"exchanges"`` list whose
        entries have a ``"type"`` key; production exchanges must also have
        an ``"amount"`` key.

    Returns
    -------
    list
        The same ``db`` object, with zero-amount production exchanges removed.

    Examples
    --------
    >>> db = [{"exchanges": [
    ...     {"type": "production", "amount": 0},
    ...     {"type": "production", "amount": 5},
    ...     {"type": "technosphere", "amount": 0},
    ... ]}]
    >>> remove_zero_amount_coproducts(db)[0]["exchanges"]
    [{'type': 'production', 'amount': 5}, {'type': 'technosphere', 'amount': 0}]
    """

    def keep(exc):
        # Only production exchanges are subject to the zero-amount filter;
        # the amount of other exchange types is never inspected.
        if exc["type"] != "production":
            return True
        return bool(exc["amount"])

    for dataset in db:
        dataset["exchanges"] = list(filter(keep, dataset["exchanges"]))
    return db
def remove_zero_amount_inputs_with_no_activity(db):
    """
    Drop technosphere exchanges that have a zero amount and no uncertainty.

    An exchange is removed only when all three conditions hold: its
    ``"uncertainty type"`` equals ``UndefinedUncertainty.id``, its
    ``"amount"`` equals zero, and its ``"type"`` is ``"technosphere"``.
    According to the original note on this strategy, such exchanges are
    artifacts of the ecoinvent linking algorithm and can be discarded.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). Each must have an ``"exchanges"`` list whose
        entries provide ``"uncertainty type"``, ``"amount"`` and ``"type"``.

    Returns
    -------
    list
        The same ``db`` object; each dataset's ``"exchanges"`` list is
        replaced by a new list without the matching exchanges.
    """
    for dataset in db:
        retained = []
        for exc in dataset["exchanges"]:
            is_linking_artifact = (
                exc["uncertainty type"] == UndefinedUncertainty.id
                and exc["amount"] == 0
                and exc["type"] == "technosphere"
            )
            if not is_linking_artifact:
                retained.append(exc)
        dataset["exchanges"] = retained
    return db
def remove_unnamed_parameters(db):
    """
    Remove parameters flagged as unnamed from every dataset.

    A parameter is dropped when its value dictionary has a truthy
    ``"unnamed"`` entry. Datasets without a ``"parameters"`` key are left
    untouched. As the original note states, unnamed parameters cannot be
    referenced or used in formulas.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). The optional ``"parameters"`` entry maps
        parameter names to dictionaries such as
        ``{"value": 10, "unnamed": True}``.

    Returns
    -------
    list
        The same ``db`` object; ``"parameters"`` is replaced by a new
        dictionary that contains only the named parameters.

    Examples
    --------
    >>> db = [{"parameters": {
    ...     "named_param": {"value": 42},
    ...     "unnamed_param": {"value": 10, "unnamed": True},
    ... }}]
    >>> remove_unnamed_parameters(db)
    [{'parameters': {'named_param': {'value': 42}}}]
    """
    for dataset in db:
        if "parameters" not in dataset:
            continue
        named = {}
        for name, param in dataset["parameters"].items():
            if param.get("unnamed"):
                continue
            named[name] = param
        dataset["parameters"] = named
    return db
def es2_assign_only_product_with_amount_as_reference_product(db):
    """
    Use the only non-zero production exchange as the reference product.

    For each dataset with exactly one production exchange whose amount is
    truthy, copy that exchange's ``"name"`` to ``"reference product"``, its
    ``"flow"`` to ``"flow"`` and its ``"amount"`` to ``"production amount"``.
    The dataset ``"unit"`` is only filled in when it is missing or falsy.
    Existing ``"reference product"``, ``"flow"`` and ``"production amount"``
    values are overwritten. Datasets with zero or several such exchanges are
    left unchanged.

    Zero-amount production exchanges are ignored rather than required to be
    absent, so this works whether or not they were removed beforehand.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). Each must have an ``"exchanges"`` list;
        production exchanges need ``"amount"``, ``"name"``, ``"flow"`` and
        (when the dataset has no unit) ``"unit"``.

    Returns
    -------
    list
        The same ``db`` object, with datasets updated in place.

    Examples
    --------
    >>> db = [{"exchanges": [
    ...     {"type": "production", "amount": 0, "name": "A", "flow": "flow_A", "unit": "kg"},
    ...     {"type": "production", "amount": 5, "name": "B", "flow": "flow_B", "unit": "kg"},
    ... ]}]
    >>> ds = es2_assign_only_product_with_amount_as_reference_product(db)[0]
    >>> ds["reference product"], ds["flow"], ds["unit"], ds["production amount"]
    ('B', 'flow_B', 'kg', 5)
    """
    for dataset in db:
        nonzero_products = [
            exc
            for exc in dataset["exchanges"]
            if exc["type"] == "production" and exc["amount"]
        ]
        if len(nonzero_products) != 1:
            continue

        (product,) = nonzero_products
        # Overwriting an existing reference product is intended; the flow
        # identifier must be kept in sync with it.
        dataset["reference product"] = product["name"]
        dataset["flow"] = product["flow"]
        if not dataset.get("unit"):
            dataset["unit"] = product["unit"]
        dataset["production amount"] = product["amount"]
    return db
def assign_single_product_as_activity(db):
    """
    Copy the dataset activity onto its single production exchange.

    When a dataset has exactly one exchange of type ``"production"``, that
    exchange's ``"activity"`` field is set to the dataset's ``"activity"``
    value. Datasets with zero or several production exchanges, and datasets
    without any exchanges, are left unchanged.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). ``"exchanges"`` is optional; when present
        every exchange must have a ``"type"`` key. ``"activity"`` is required
        only for datasets that have exactly one production exchange.

    Returns
    -------
    list
        The same ``db`` object, with production exchanges updated in place.

    Raises
    ------
    KeyError
        If a dataset with exactly one production exchange has no
        ``"activity"`` key.

    Examples
    --------
    >>> db = [{
    ...     "activity": "activity_A",
    ...     "exchanges": [
    ...         {"type": "production", "name": "product_A"},
    ...         {"type": "technosphere", "name": "input_A"},
    ...     ],
    ... }]
    >>> assign_single_product_as_activity(db)[0]["exchanges"][0]
    {'type': 'production', 'name': 'product_A', 'activity': 'activity_A'}
    """
    for ds in db:
        # A missing (or None) exchange list is treated as empty, matching the
        # other strategies in this module, instead of failing on iteration.
        production = [
            exc for exc in (ds.get("exchanges") or []) if exc["type"] == "production"
        ]
        if len(production) == 1:
            production[0]["activity"] = ds["activity"]
    return db
def create_composite_code(db):
    """
    Set each dataset's ``"code"`` from its activity and flow identifiers.

    The code is whatever ``es2_activity_hash`` returns for the dataset's
    ``"activity"`` and ``"flow"`` values; any existing ``"code"`` is
    overwritten.

    Parameters
    ----------
    db : list
        Datasets (dictionaries), each with ``"activity"`` and ``"flow"`` keys.

    Returns
    -------
    list
        The same ``db`` object, with ``"code"`` set on every dataset.

    Examples
    --------
    >>> db = [{"activity": "activity_A", "flow": "flow_A"}]
    >>> create_composite_code(db)[0]["code"] == es2_activity_hash("activity_A", "flow_A")
    True
    """
    for dataset in db:
        activity, flow = dataset["activity"], dataset["flow"]
        dataset["code"] = es2_activity_hash(activity, flow)
    return db
def delete_exchanges_missing_activity(db):
    """
    Remove unlinked exchanges that carry no activity reference.

    An exchange is purged when it has no truthy ``"input"``, no truthy
    ``"activity"``, and its ``"type"`` is ``"technosphere"``,
    ``"production"`` or ``"substitution"``. Every purged exchange is written
    to the ``Ecospold2-import-error`` log at critical level, and a summary is
    printed if anything was removed.

    See the `known data issues
    <http://www.ecoinvent.org/database/ecoinvent-version-3/reports-of-changes/known-data-issues/>`__
    report for background.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). ``"exchanges"`` is optional. Exchanges that
        have neither ``"input"`` nor ``"activity"`` must have a ``"type"``
        key, and datasets with purged exchanges must have ``"filename"``
        (used in the log message).

    Returns
    -------
    list
        The same ``db`` object; for every dataset with a non-empty exchange
        list, ``"exchanges"`` is replaced by a new list without the purged
        exchanges.

    Examples
    --------
    >>> db = [{
    ...     "filename": "file_A",
    ...     "exchanges": [
    ...         {"type": "technosphere", "name": "unlinked_exchange"},
    ...         {"type": "technosphere", "name": "linked_exchange",
    ...          "input": ("db_A", "code_A")},
    ...     ],
    ... }]
    >>> [exc["name"] for exc in delete_exchanges_missing_activity(db)[0]["exchanges"]]
    ['linked_exchange']
    """
    purgeable_types = {"technosphere", "production", "substitution"}
    log, logfile = get_io_logger("Ecospold2-import-error")
    count = 0

    try:
        for ds in db:
            exchanges = ds.get("exchanges", [])
            if not exchanges:
                continue

            # Build the surviving list in a single pass rather than collecting
            # rejects and re-scanning with `exc not in skip`, which compared
            # whole dictionaries and was quadratic in the number of exchanges.
            kept = []
            for exc in exchanges:
                if (
                    exc.get("input")
                    or exc.get("activity")
                    or exc["type"] not in purgeable_types
                ):
                    kept.append(exc)
                    continue

                log.critical(
                    "Purging unlinked exchange:\nFilename: {}\n{}".format(
                        ds["filename"], format_for_logging(exc)
                    )
                )
                count += 1
            ds["exchanges"] = kept
    finally:
        # Release the log handlers even if a malformed dataset raises.
        close_log(log)

    if count:
        print(
            (
                "{} exchanges couldn't be linked and were deleted. See the "
                "logfile for details:\n\t{}"
            ).format(count, logfile)
        )
    return db
def delete_ghost_exchanges(db):
    """
    Remove technosphere exchanges that could not be linked.

    A "ghost" exchange, per the original description, links to a combination
    of activity and flow that the database does not provide. In this
    function, any exchange of type ``"technosphere"`` without a truthy
    ``"input"`` is treated as a ghost. Every purged exchange is written to
    the ``Ecospold2-import-error`` log at critical level, and a summary is
    printed if anything was removed.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). ``"exchanges"`` is optional. Datasets with
        purged exchanges must have a ``"filename"`` key (used in the log
        message).

    Returns
    -------
    list
        The same ``db`` object; for every dataset with a non-empty exchange
        list, ``"exchanges"`` is replaced by a new list without the ghost
        exchanges.

    Examples
    --------
    >>> db = [{
    ...     "filename": "file_A",
    ...     "exchanges": [
    ...         {"type": "technosphere", "name": "ghost_exchange"},
    ...         {"type": "technosphere", "name": "linked_exchange",
    ...          "input": ("db_A", "code_A")},
    ...     ],
    ... }]
    >>> [exc["name"] for exc in delete_ghost_exchanges(db)[0]["exchanges"]]
    ['linked_exchange']
    """
    log, logfile = get_io_logger("Ecospold2-import-error")
    count = 0

    try:
        for ds in db:
            exchanges = ds.get("exchanges", [])
            if not exchanges:
                continue

            # Build the surviving list in a single pass rather than collecting
            # rejects and re-scanning with `exc not in skip`, which compared
            # whole dictionaries and was quadratic in the number of exchanges.
            kept = []
            for exc in exchanges:
                if exc.get("input") or exc.get("type") != "technosphere":
                    kept.append(exc)
                    continue

                log.critical(
                    "Purging unlinked exchange:\nFilename: {}\n{}".format(
                        ds["filename"], format_for_logging(exc)
                    )
                )
                count += 1
            ds["exchanges"] = kept
    finally:
        # Release the log handlers even if a malformed dataset raises.
        close_log(log)

    if count:
        print(
            (
                "{} exchanges couldn't be linked and were deleted. See the "
                "logfile for details:\n\t{}"
            ).format(count, logfile)
        )
    return db
def remove_uncertainty_from_negative_loss_exchanges(db):
    """
    Make negative lognormal "loss" exchanges deterministic.

    The original rationale: basic uncertainty and the pedigree matrix are
    sometimes applied blindly, which produces strange net production values,
    so loss factors are assumed to be static.

    An exchange is changed when its amount is negative, its
    ``"uncertainty type"`` is ``LognormalUncertainty.id``, and its name equals
    the name of one of the dataset's production exchanges. For such an
    exchange the uncertainty type becomes ``UndefinedUncertainty.id``,
    ``"loc"`` is set to the amount, and ``"scale"`` is deleted.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). ``"exchanges"`` is optional. Exchanges need
        ``"type"`` and ``"amount"``; production exchanges also need
        ``"name"``, and exchanges with a negative amount also need
        ``"uncertainty type"``.

    Returns
    -------
    list
        The same ``db`` object, with the matching exchanges modified in place.

    Notes
    -----
    The original documentation reports 15699 such exchanges in ecoinvent 3.3
    cutoff.
    """
    for dataset in db:
        exchanges = dataset.get("exchanges", [])

        produced = set()
        for exc in exchanges:
            if exc["type"] == "production":
                produced.add(exc["name"])

        for exc in exchanges:
            if not (exc["amount"] < 0):
                continue
            if exc["uncertainty type"] != LognormalUncertainty.id:
                continue
            if exc["name"] not in produced:
                continue
            exc["uncertainty type"] = UndefinedUncertainty.id
            exc["loc"] = exc["amount"]
            del exc["scale"]
    return db
def set_lognormal_loc_value(db):
    """
    Recompute ``"loc"`` for every lognormal exchange from its amount.

    For each exchange whose ``"uncertainty type"`` equals
    ``LognormalUncertainty.id``, ``"loc"`` is set to the natural logarithm of
    the absolute value of ``"amount"``.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). ``"exchanges"`` is optional; every exchange
        must have ``"uncertainty type"``, and lognormal exchanges must have
        ``"amount"``.

    Returns
    -------
    list
        The same ``db`` object, with lognormal exchanges updated in place.

    Raises
    ------
    ValueError
        Raised by ``math.log`` if a lognormal exchange has an amount of zero.
    """
    for dataset in db:
        lognormal_exchanges = (
            exc
            for exc in dataset.get("exchanges", [])
            if exc["uncertainty type"] == LognormalUncertainty.id
        )
        for exc in lognormal_exchanges:
            exc["loc"] = math.log(abs(exc["amount"]))
    return db
def reparametrize_lognormal_to_agree_with_static_amount(db):
    """
    Shift lognormal ``"loc"`` so the distribution mean equals the amount.

    The mean of a lognormal distribution is ``exp(loc + scale**2 / 2)``, so
    for each exchange whose ``"uncertainty type"`` equals
    ``LognormalUncertainty.id`` the function sets
    ``loc = log(abs(amount)) - scale**2 / 2``. The expected value then matches
    the absolute value of the static amount.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). ``"exchanges"`` is optional; every exchange
        must have ``"uncertainty type"``, and lognormal exchanges must have
        ``"amount"`` and ``"scale"``.

    Returns
    -------
    list
        The same ``db`` object, with lognormal exchanges updated in place.

    Raises
    ------
    ValueError
        Raised by ``math.log`` if a lognormal exchange has an amount of zero.

    Examples
    --------
    >>> import math
    >>> db = [{"exchanges": [{
    ...     "type": "technosphere", "name": "input_A", "amount": 5,
    ...     "uncertainty type": 2, "loc": 1, "scale": 0.5,
    ... }]}]
    >>> exc = reparametrize_lognormal_to_agree_with_static_amount(db)[0]["exchanges"][0]
    >>> exc["loc"] == math.log(5) - 0.5**2 / 2
    True
    """
    for dataset in db:
        for exc in dataset.get("exchanges", []):
            if exc["uncertainty type"] != LognormalUncertainty.id:
                continue
            log_of_amount = math.log(abs(exc["amount"]))
            variance = exc["scale"] ** 2
            exc["loc"] = log_of_amount - variance / 2
    return db
def fix_unreasonably_high_lognormal_uncertainties(db, cutoff=2.5, replacement=0.25):
    """
    Cap implausibly large ``"scale"`` values on lognormal exchanges.

    For each exchange whose ``"uncertainty type"`` equals
    ``LognormalUncertainty.id`` and whose ``"scale"`` is strictly greater than
    ``cutoff``, ``"scale"`` is set to ``replacement``. For reference, a scale
    of 2.5 corresponds to a 95% interval whose upper bound is on the order of
    20,000 times its lower bound.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). ``"exchanges"`` is optional; every exchange
        must have ``"uncertainty type"``, and lognormal exchanges must have
        ``"scale"``.
    cutoff : float, optional
        Scale values above this threshold are replaced (default 2.5).
    replacement : float, optional
        Value assigned to ``"scale"`` when the cutoff is exceeded
        (default 0.25).

    Returns
    -------
    list
        The same ``db`` object, with the affected exchanges updated in place.
    """
    for dataset in db:
        for exc in dataset.get("exchanges", []):
            is_lognormal = exc["uncertainty type"] == LognormalUncertainty.id
            if is_lognormal and exc["scale"] > cutoff:
                exc["scale"] = replacement
    return db
def fix_ecoinvent_flows_pre35(db):
    """
    Apply the ``fix-ecoinvent-flows-pre-35`` exchange migration if installed.

    When the migration is listed in ``migrations``, the result of
    ``migrate_exchanges`` for that migration is returned. Otherwise a warning
    is issued and ``db`` is returned unmodified.

    Parameters
    ----------
    db : list
        Datasets (dictionaries) to migrate, each typically holding an
        ``"exchanges"`` list.

    Returns
    -------
    list
        The value returned by ``migrate_exchanges``, or the original ``db``
        when the migration is not available.
    """
    migration_name = "fix-ecoinvent-flows-pre-35"
    if migration_name not in migrations:
        warnings.warn(
            (
                "Skipping migration 'fix-ecoinvent-flows-pre-35' "
                "because it isn't installed"
            )
        )
        return db
    return migrate_exchanges(db, migration_name)
def drop_temporary_outdated_biosphere_flows(db):
    """
    Remove biosphere exchanges for a fixed set of outdated ``*_temp`` flows.

    An exchange is dropped when its ``"type"`` is ``"biosphere"`` and its
    ``"name"`` is one of the twelve temporary flow names listed in the
    function body. The original note describes these flows as unused and
    outdated.

    Parameters
    ----------
    db : list
        Datasets (dictionaries), each with an ``"exchanges"`` list of
        exchange dictionaries.

    Returns
    -------
    list
        The same ``db`` object; each dataset's ``"exchanges"`` list is
        replaced by a new list without the outdated exchanges.

    Examples
    --------
    >>> db = [{"exchanges": [
    ...     {"type": "biosphere", "name": "Fluorene_temp", "amount": 5},
    ...     {"type": "biosphere", "name": "valid_biosphere_flow", "amount": 10},
    ... ]}]
    >>> drop_temporary_outdated_biosphere_flows(db)
    [{'exchanges': [{'type': 'biosphere', 'name': 'valid_biosphere_flow', 'amount': 10}]}]
    """
    outdated_names = {
        "Fluorene_temp",
        "Fluoranthene_temp",
        "Dibenz(a,h)anthracene_temp",
        "Benzo(k)fluoranthene_temp",
        "Benzo(ghi)perylene_temp",
        "Benzo(b)fluoranthene_temp",
        "Benzo(a)anthracene_temp",
        "Acenaphthylene_temp",
        "Chrysene_temp",
        "Pyrene_temp",
        "Phenanthrene_temp",
        "Indeno(1,2,3-c,d)pyrene_temp",
    }

    def is_outdated(exc):
        return exc.get("name") in outdated_names and exc.get("type") == "biosphere"

    for dataset in db:
        dataset["exchanges"] = [
            exc for exc in dataset["exchanges"] if not is_outdated(exc)
        ]
    return db
def add_cpc_classification_from_single_reference_product(db):
    """
    Copy the CPC classification of a lone production exchange to its dataset.

    When a dataset has exactly one exchange of type ``"production"`` and that
    exchange has a non-empty ``classifications["CPC"]`` value, the tuple
    ``("CPC", code)`` is appended to the dataset's ``"classifications"`` list.
    ``code`` is the first element when the CPC value is a list, otherwise the
    value itself.

    Parameters
    ----------
    db : list
        Datasets (dictionaries). Each must have a ``"classifications"`` list
        and an ``"exchanges"`` list whose entries have a ``"type"`` key.
        Exchange-level ``"classifications"`` map a system name to either a
        list of codes or a single string.

    Returns
    -------
    list
        The same ``db`` object, with dataset classifications extended in place.

    Raises
    ------
    AssertionError
        If a dataset has no ``"classifications"`` key (checked with
        ``assert``, so not enforced when Python runs with ``-O``).

    Examples
    --------
    >>> db = [{
    ...     "classifications": [],
    ...     "exchanges": [
    ...         {"type": "production", "classifications": {"CPC": ["code"]}},
    ...     ],
    ... }]
    >>> add_cpc_classification_from_single_reference_product(db)[0]["classifications"]
    [('CPC', 'code')]
    """
    for dataset in db:
        assert "classifications" in dataset

        products = [
            exc for exc in dataset["exchanges"] if exc["type"] == "production"
        ]
        if len(products) != 1:
            continue

        product = products[0]
        if "classifications" not in product:
            continue
        product_classifications = product["classifications"]
        if "CPC" not in product_classifications:
            continue
        cpc = product_classifications["CPC"]
        if not cpc:
            # Present but empty (e.g. [] or ""): nothing to copy.
            continue

        code = cpc[0] if isinstance(cpc, list) else cpc
        dataset["classifications"].append(("CPC", code))
    return db
def delete_none_synonyms(db):
    """
    Remove ``None`` entries from each dataset's ``"synonyms"`` list.

    The ``"synonyms"`` key is read at the top level of each dataset (not
    inside ``"parameters"``). Datasets that have no ``"synonyms"`` key, or
    whose ``"synonyms"`` value is ``None``, are left untouched.

    Parameters
    ----------
    db : list
        Datasets (dictionaries), optionally with a ``"synonyms"`` entry
        holding an iterable of synonym strings that may contain ``None``.

    Returns
    -------
    list
        The same ``db`` object; where present, ``"synonyms"`` is replaced by
        a new list without ``None`` values.

    Examples
    --------
    >>> db = [
    ...     {"synonyms": ["synonym1", None, "synonym2"]},
    ...     {"name": "dataset without synonyms"},
    ... ]
    >>> delete_none_synonyms(db)
    [{'synonyms': ['synonym1', 'synonym2']}, {'name': 'dataset without synonyms'}]
    """
    for ds in db:
        synonyms = ds.get("synonyms")
        if synonyms is None:
            # Nothing to clean; do not invent an empty list for the dataset.
            continue
        ds["synonyms"] = [s for s in synonyms if s is not None]
    return db
def update_social_flows_in_older_consequential(db, biosphere_db):
    """
    Link three unlinked "social" flows to the current biosphere database.

    The original note explains that these flows are unused and that their
    UUIDs change with each release; the ecoinvent centre recommends dropping
    them, but this strategy re-links them instead.

    An exchange is updated only if it has no truthy ``"input"`` and its
    ``"name"`` is one of the three flow names listed in the function body.
    Its ``"input"`` is then set to the ``key`` attribute of the biosphere
    flow with the same name. ``biosphere_db`` is scanned lazily, the first
    time a matching exchange is found.

    Parameters
    ----------
    db : list
        Datasets (dictionaries), each with an ``"exchanges"`` list. Exchanges
        without an ``"input"`` must have a ``"name"`` key.
    biosphere_db : iterable
        Biosphere flows. Each flow must support ``flow["name"]`` and expose a
        ``key`` attribute (presumably a ``bw2data`` database of activities —
        confirm against the caller).

    Returns
    -------
    list
        The same ``db`` object, with matching exchanges linked in place.

    Raises
    ------
    KeyError
        If a matching exchange names a flow that is not found in
        ``biosphere_db``.
    """
    social_flow_names = {
        "residual wood, dry",
        "venting of argon, crude, liquid",
        "venting of nitrogen, liquid",
    }
    keys_by_name = {}

    for dataset in db:
        for exc in dataset["exchanges"]:
            if exc.get("input") or exc["name"] not in social_flow_names:
                continue

            if not keys_by_name:
                # Populate the lookup only once it is actually needed.
                for flow in biosphere_db:
                    if flow["name"] in social_flow_names:
                        keys_by_name[flow["name"]] = flow.key

            exc["input"] = keys_by_name[exc["name"]]
    return db