Source code for bw_simapro_csv.brightway

import datetime
import itertools
from copy import deepcopy
from typing import Union
from uuid import uuid4

from loguru import logger
from multifunctional import allocation_before_writing

from .blocks import (
    DatabaseCalculatedParameters,
    DatabaseInputParameters,
    LiteratureReference,
    Process,
    ProjectCalculatedParameters,
    ProjectInputParameters,
)
from .main import SimaProCSV

[docs] OPTIONAL_TAG_MAPPING = [ ("Type", "simapro_type"), ("Time period", "time_period"), ("Technology", "technology"), ("Representativeness", "representativeness"), ("Multiple output allocation", "allocation_method"), ("Boundary with nature", "ecosphere_boundary"), ("Category type", "category_type"), ("Substitution allocation", "substitution_method"), ("Cut off rules", "cutoff_rules"), ("Capital goods", "capital_goods"), ("System description", "system_description"), ]
[docs] AVOIDED_PRODUCTS_WARNING = """Processing avoided products block. Please check exchanges with type `substitution` carefully - we don't have data to test this."""
[docs] TECHNOSPHERE_EDGES = ("Materials/fuels", "Electricity/heat")
[docs] BIOSPHERE_EDGES = ( "Economic issues", "Emissions to air", "Emissions to soil", "Emissions to water", "Final waste flows", "Non material emissions", "Resources", "Social issues", )
[docs] OPTIONAL_PROCESS_FIELDS = [ ("Comment", "comment"), ("Generator", "data_generator"), ("Record", "data_entry"), ("External documents", "data_links"), ("Collection method", "collection_method"), ("Verification", "verification"), ("Allocation rules", "allocation_rules"), ]
[docs] def substitute_unspecified(s: Union[str, None]) -> Union[str, None]: if s and isinstance(s, str) and s.lower() == "unspecified": return None return s
[docs] def allocation_as_manual_property(exc: dict) -> dict: """If allocation field is present, add it as manual property as well""" if "allocation" in exc: if "properties" not in exc: exc["properties"] = {} exc["properties"]["manual_allocation"] = exc["allocation"] return exc
[docs] def name_for_process(process: Process, missing_string: str, shorten_names: bool = True) -> str: """Try several ways to generate a sensible name.""" def clean_name(name: str) -> str: """Cleanup awkward name endings if needed.""" name = name.strip() if name.endswith(","): name = name[:-1] return name if given_name := substitute_unspecified(process.parsed["metadata"].get("Process name")): return given_name if "Products" in process.blocks: names = [edge["name"] for edge in process.blocks["Products"].parsed] if len(names) == 1: return names[0] else: return clean_name( "MFP: {}".format( "⧺".join([(name[:25] if shorten_names else name) for name in names]) ) ) if "Waste treatment" in process.blocks: names = [edge["name"] for edge in process.blocks["Waste treatment"].parsed] if len(names) == 1: return names[0] else: return clean_name( "MFP: {}".format( "⧺".join([(name[:25] if shorten_names else name) for name in names]) ) ) return missing_string
[docs] def as_product_dct(edge: dict, node: dict) -> dict: """Take an edge on a node and generate a new product node.""" NODE_ATTRS = ("name", "unit", "simapro_project", "location", "tags", "database", "comment") EDGE_ATTRS = ( "name", "unit", "line_no", "category", "waste_type", "comment", "properties", "simapro_category", ) return ( { "type": "product", "code": uuid4().hex, "reference process": (node["database"], node["code"]), } | {key: node[key] for key in NODE_ATTRS if node.get(key)} | {key: edge[key] for key in EDGE_ATTRS if edge.get(key)} )
[docs] def reference_to_product(process_edge: dict, product: dict) -> dict: """Add explicit link from process edge to new product node""" process_edge["input"] = (product["database"], product["code"]) return process_edge
[docs] def lci_to_brightway( spcsv: SimaProCSV, missing_string: str = "(unknown)", separate_products: bool = False, shorten_names: bool = True, ) -> dict: """Turn an extracted SimaPro CSV extract into metadata that can be imported into Brightway. Doesn't do any normalization or other data changes, just reorganizes the existing data.""" issued_warnings = set() data = { "database": { "name": spcsv.database_name, "simapro_filepath": spcsv.filepath, "simapro_project": spcsv.header.get("project"), "simapro_libraries": spcsv.header.get("libraries"), "simapro_version": spcsv.header.get("simapro_version"), "simapro_csv_version": spcsv.header.get("simapro_csv_version"), "created": spcsv.header["created"].isoformat()[:19], }, "processes": [], # Note reversing of database and project terms here # In SimaPro, the project is lower priority than the database # but in Brightway it's the opposite. "products": [], "project_parameters": [ param for block in spcsv.blocks for param in block.parsed if isinstance(block, (DatabaseCalculatedParameters, DatabaseInputParameters)) ], "database_parameters": [ param for block in spcsv.blocks for param in block.parsed if isinstance(block, (ProjectCalculatedParameters, ProjectInputParameters)) ], } literature_mapping = { obj.parsed["Name"]: obj.parsed for obj in filter(lambda b: isinstance(b, LiteratureReference), spcsv) } known_missing_references = set() for process in filter(lambda b: isinstance(b, Process), spcsv): multifunctional = ( len(process.blocks.get("Products", [])) + len(process.blocks.get("Waste treatment", [])) ) > 1 code = process.parsed["metadata"].get("Process identifier") if not code or not code.strip() or code.strip() in {'""', "''"}: code = uuid4().hex process_dataset = { "database": spcsv.database_name, "simapro_project": substitute_unspecified(spcsv.header["project"]) or missing_string, "code": code, "exchanges": [], "type": "multifunctional" if multifunctional else "process", "name": name_for_process(process, missing_string, shorten_names), "location": substitute_unspecified(process.parsed["metadata"].get("Geography")), "publication_date": ( process.parsed["metadata"].get("Date") or datetime.date.today() ).isoformat()[:19], "tags": {}, } for sp_label, bw_label in OPTIONAL_PROCESS_FIELDS: if val := substitute_unspecified(process.parsed["metadata"].get(sp_label)): process_dataset[bw_label] = val if process.parsed["metadata"].get("Literature references"): process_dataset["references"] = [] for reference in process.parsed["metadata"]["Literature references"]: if reference["reference"] in known_missing_references: continue elif reference["reference"] not in literature_mapping: logger.warning( "Skipping missing reference {r}; not present in given references {g}", r=reference["reference"], g=list(literature_mapping), ) known_missing_references.add(reference["reference"]) else: literature = literature_mapping[reference["reference"]] process_dataset["references"].append( { "year": substitute_unspecified(literature.get("Year")) or missing_string, "authors": substitute_unspecified(literature.get("Authors")) or missing_string, "comment": substitute_unspecified(reference.get("comment")) or missing_string, } | { k.lower().replace(" ", "_"): v for k, v in literature.items() if k != "Name" and v } ) for tag_in, tag_out in OPTIONAL_TAG_MAPPING: if tag_in in process.parsed["metadata"] and substitute_unspecified( process.parsed["metadata"][tag_in] ): process_dataset["tags"][tag_out] = process.parsed["metadata"][tag_in] if "Avoided products" in process.blocks: if AVOIDED_PRODUCTS_WARNING not in issued_warnings: logger.info(AVOIDED_PRODUCTS_WARNING) issued_warnings.add(AVOIDED_PRODUCTS_WARNING) for edge in process.blocks["Avoided products"].parsed: process_dataset["exchanges"].append( edge | {"type": "substitution", "functional": False} ) if "Waste to treatment" in process.blocks: for edge in process.blocks["Waste to treatment"].parsed: process_dataset["exchanges"].append( edge | {"type": "production", "functional": False} ) for label in TECHNOSPHERE_EDGES: if label in process.blocks: for edge in process.blocks[label].parsed: process_dataset["exchanges"].append( edge | {"type": "technosphere", "simapro_category": label} ) for label in BIOSPHERE_EDGES: if label in process.blocks: for edge in process.blocks[label].parsed: process_dataset["exchanges"].append(edge | {"type": "biosphere"}) if "Products" in process.blocks: for edge in process.blocks["Products"].parsed: production_dct = allocation_as_manual_property( edge | {"type": "production", "functional": True} ) if separate_products: product_dct = as_product_dct(production_dct, process_dataset) data["products"].append(product_dct) process_dataset["exchanges"].append( reference_to_product(production_dct, product_dct) ) else: process_dataset["exchanges"].append(production_dct) elif "Waste treatment" in process.blocks: for edge in process.blocks["Waste treatment"].parsed: waste_edge = edge | {"type": "technosphere", "functional": True} if separate_products: waste_dct = as_product_dct(waste_edge, process_dataset) data["products"].append(waste_dct) process_dataset["exchanges"].append(reference_to_product(waste_edge, waste_dct)) else: process_dataset["exchanges"].append(waste_edge) if not any(e for e in process_dataset["exchanges"] if e["type"] == "production"): dummy = deepcopy(edge) dummy.update( { "amount": 0, "type": "production", "functional": False, "comment": "Dummy edge inserted to stop auto-generation of unitary production edge", } ) process_dataset["exchanges"].append(dummy) data["processes"].append(process_dataset) if any( sum(1 for exc in ds.get("exchanges") if exc.get("functional")) > 1 for ds in data["processes"] ): as_dict = allocation_before_writing( {(spcsv.database_name, ds["code"]): ds for ds in data["processes"]}, "manual_allocation" ) for (database, code), ds in as_dict.items(): ds["code"] = code ds["database"] = database data["processes"] = list(as_dict.values()) return data