Source code for bw2io.importers.json_ld

from functools import partial

from bw2data import Database, config

from ..errors import NonuniqueCode
from ..extractors.json_ld import JSONLDExtractor
from ..strategies import (
    add_database_name,
    json_ld_add_activity_unit,
    json_ld_add_products_as_activities,
    json_ld_allocate_datasets,
    json_ld_convert_unit_to_reference_unit,
    json_ld_fix_process_type,
    json_ld_get_activities_list_from_rawdata,
    json_ld_get_normalized_exchange_locations,
    json_ld_get_normalized_exchange_units,
    json_ld_label_exchange_type,
    json_ld_location_name,
    json_ld_prepare_exchange_fields_for_linking,
    json_ld_remove_fields,
    json_ld_rename_metadata_fields,
    link_iterable_by_fields,
    normalize_units,
)
from .base_lci import LCIImporter


[docs] class JSONLDImporter(LCIImporter): """ Importer for the `OLCD JSON-LD data format <https://github.com/GreenDelta/olca-schema>`__. See `discussion with linked issues here <https://github.com/brightway-lca/brightway2-io/issues/15>`__. """
[docs] format = "OLCA JSON-LD"
[docs] extractor = JSONLDExtractor
def __init__(self, dirpath, database_name, preferred_allocation=None):
[docs] self.data = self.extractor.extract(dirpath)
[docs] self.db_name = database_name
[docs] self._biosphere_database_warned = False
[docs] self.biosphere_database = self.flows_as_biosphere_database( self.data, database_name )
[docs] self.products = self.flows_as_products(self.data)
[docs] self.strategies = [ partial( json_ld_allocate_datasets, preferred_allocation=preferred_allocation ), json_ld_get_normalized_exchange_locations, # Transform uncertainties json_ld_convert_unit_to_reference_unit, json_ld_get_activities_list_from_rawdata, partial(json_ld_add_products_as_activities, products=self.products), json_ld_get_normalized_exchange_units, json_ld_add_activity_unit, json_ld_rename_metadata_fields, json_ld_location_name, json_ld_remove_fields, json_ld_fix_process_type, json_ld_label_exchange_type, json_ld_prepare_exchange_fields_for_linking, partial(add_database_name, name=database_name), partial( link_iterable_by_fields, fields=["code"], edge_kinds=["production", "technosphere"], internal=True, ), partial( link_iterable_by_fields, other=self.biosphere_database, fields=["code"], edge_kinds=["biosphere"], ), normalize_units, ]
[docs] def apply_strategies(self, *args, **kwargs): no_warning = kwargs.pop("no_warning") if "no_warning" in kwargs else False super().apply_strategies(*args, **kwargs) if self.biosphere_database and not self._biosphere_database_warned: if not no_warning: MESSAGE = """\n\tCreated {} biosphere flows in separate database '{}'.\n\tUse either `.merge_biosphere_flows()` or `.write_separate_biosphere_database()` to write these flows.""" print( MESSAGE.format( len(self.biosphere_database), self.biosphere_database[0]["database"], ) ) self._biosphere_database_warned = True
[docs] def merge_biosphere_flows(self): """Add flows in ``self.biosphere_database`` to ``self.data``.""" old_db = self.biosphere_database[0]["database"] num_flows = len(self.biosphere_database) bio_keys = {(self.db_name, obj["code"]) for obj in self.biosphere_database} act_keys = {(self.db_name, obj["code"]) for obj in self.data} if bio_keys.intersection(act_keys): raise NonuniqueCode( "Can't merge biosphere flows to main database due to duplicate codes:\n\t{}\n\nFix these codes or use `.write_separate_biosphere_database()`".format( bio_keys.intersection(act_keys) ) ) for obj in self.biosphere_database: obj["database"] = self.db_name for obj in self.data: for exc in obj.get("exchanges"): if exc.get("input") and exc["input"][0] == old_db: exc["input"] = (self.db_name, exc["input"][1]) self.data.extend(self.biosphere_database) self.biosphere_database = [] print("Moved {} biosphere flows to `self.data`".format(num_flows))
[docs] def write_separate_biosphere_database(self): db_name = self.biosphere_database[0]["database"] self.write_database(data=self.biosphere_database, db_name=db_name)
[docs] def flows_as_biosphere_database(self, data, database_name, suffix=" biosphere"): # def boolcheck(lst): # return tuple([elem for elem in lst if elem is not None]) # category_mapping = { # obj["@id"]: boolcheck( # obj.get("category", {}).get("categoryPath", []) # + [obj.get("category", {}).get("name")] # + [obj["name"]] # ) # for obj in data["categories"].values() # } return [ { "code": obj["@id"], "name": obj["name"], "categories": obj.get("category", "Unknown").split("/"), "CAS number": obj.get("cas"), "database": database_name + suffix, "exchanges": [], "unit": "", "type": "emission", } for obj in data["flows"].values() if obj["flowType"] == "ELEMENTARY_FLOW" ]
[docs] def flows_as_products(self, data): # def boolcheck(lst): # return tuple([elem for elem in lst if elem is not None]) # category_mapping = { # obj["@id"]: boolcheck( # obj.get("category", {}).get("categoryPath", []) # + [obj.get("category", {}).get("name")] # + [obj["name"]] # ) # for obj in data["categories"].values() # } return [ { "code": obj["@id"], "name": obj["name"], "categories": obj.get("category", "Unknown").split("/"), "location": obj["location"]["name"] if "location" in obj else None, "exchanges": [], "unit": "", "type": "product", } for obj in data["flows"].values() if obj["flowType"] == "PRODUCT_FLOW" ]