import functools
import warnings
from io import StringIO
from pathlib import Path
from typing import Optional, Union, Callable, List
from uuid import uuid4
from bw2data import Database, config, databases, get_node, labels
from bw2data.errors import UnknownObject
from bw_simapro_csv import SimaProCSV
from ..strategies import (
change_electricity_unit_mj_to_kwh,
create_products_as_new_nodes,
drop_unspecified_subcategories,
fix_localized_water_flows,
link_iterable_by_fields,
link_technosphere_based_on_name_unit_location,
match_internal_simapro_simapro_with_unit_conversion,
migrate_datasets,
migrate_exchanges,
normalize_simapro_biosphere_categories,
normalize_simapro_biosphere_names,
normalize_simapro_labels_to_brightway_standard,
normalize_units,
override_process_name_using_single_functional_exchange,
set_code_by_activity_hash,
set_metadata_using_single_functional_exchange,
split_simapro_name_geo,
strip_biosphere_exc_locations,
update_ecoinvent_locations,
)
from ..utils import activity_hash
from .base_lci import LCIImporter
class SimaProBlockCSVImporter(LCIImporter):
    """Importer for SimaPro block CSV files, parsed with ``bw_simapro_csv``.

    The input is read by ``SimaProCSV`` and converted with its ``to_brightway``
    method; the resulting processes (and optionally products), database
    parameters and project parameters are stored on the instance together
    with a default list of strategies.
    """

    def __init__(
        self,
        path_or_stream: Union[Path, StringIO],
        database_name: Optional[str] = None,
        biosphere_database_name: Optional[str] = None,
        separate_products: bool = True,
        shorten_names: bool = True,
    ):
        """Parse the SimaPro CSV input and set up the default strategies.

        Parameters
        ----------
        path_or_stream
            File path or text stream handed to ``SimaProCSV``.
        database_name
            Database name handed to ``SimaProCSV``. The name actually used
            is read back from the parser and stored as ``self.db_name``.
        biosphere_database_name
            Database used for linking biosphere edges. When falsy,
            ``config.biosphere`` is used instead.
        separate_products
            Forwarded to ``SimaProCSV.to_brightway``. When true, the
            returned products are appended to ``self.data``; when false,
            extra strategies that override process names and create
            product nodes are added.
        shorten_names
            Forwarded to ``SimaProCSV.to_brightway``.
        """
        spcsv = SimaProCSV(path_or_stream=path_or_stream, database_name=database_name)
        data = spcsv.to_brightway(
            separate_products=separate_products, shorten_names=shorten_names
        )

        self.db_name = spcsv.database_name
        self.default_biosphere_database_name = biosphere_database_name

        self.data = data["processes"]
        if separate_products:
            self.data.extend(data["products"])

        self.database_parameters = data["database_parameters"]
        self.project_parameters = data["project_parameters"]

        self.strategies: List[Callable] = [
            set_metadata_using_single_functional_exchange,
        ]
        if not separate_products:
            self.strategies.append(
                override_process_name_using_single_functional_exchange
            )
        self.strategies.extend(
            [
                drop_unspecified_subcategories,
                split_simapro_name_geo,
            ]
        )
        if not separate_products:
            self.strategies.extend(
                [
                    create_products_as_new_nodes,
                    link_technosphere_based_on_name_unit_location,
                ]
            )
        self.strategies.extend(
            [
                functools.partial(
                    link_iterable_by_fields,
                    other=Database(biosphere_database_name or config.biosphere),
                    edge_kinds=labels.biosphere_edge_types,
                    fields=("name", "categories", "unit", "location"),
                ),
                match_internal_simapro_simapro_with_unit_conversion,
            ]
        )

    def create_regionalized_biosphere_proxies(self, database_name: str) -> None:
        """
        Create proxy nodes for regionalized biosphere flows in a separate database.

        In Brightway, regionalized biosphere flows get their locations from the processes which
        produce them. In other systems, this isn't always the case. For example, in SimaPro, you
        can have a flow like "Water, Europe".

        This method aligns data imports with the Brightway ontology by getting or creating a new
        proxy process for "Water", taking place in "Europe", in the database "database_name". This
        process has a unitary production exchange, and a biosphere edge to an actual "Water" flow.

        This method **assumes you have already found and linked** a suitable biosphere flow - we
        need this during proxy node creation.

        ``database_name`` is the database to store the proxies; it can already exist. It will
        use existing proxy nodes if possible.

        Only exchanges which have ``unit``, ``categories``, ``location`` and ``input`` keys and
        whose ``type`` is in ``labels.biosphere_edge_types`` are processed. Each such exchange
        is relinked in place to the proxy node.
        """
        required_keys = ("unit", "categories", "location", "input")

        proxy_db = Database(database_name)
        if database_name not in databases:
            print(
                f"Creating new proxy regionalized biosphere database `{database_name}`"
            )
            proxy_db.register(
                format=self.format,
                comment="Database for proxies for regionalized biosphere flows. Generated by `bw2io` method `create_regionalized_biosphere_proxies`",
            )

        for ds in self.data:
            for exc in ds.get("exchanges", []):
                if not all(key in exc for key in required_keys):
                    continue
                if exc.get("type") not in labels.biosphere_edge_types:
                    continue

                try:
                    # Reuse a proxy with identical attributes if one exists.
                    node = get_node(
                        name=exc["name"],
                        unit=exc["unit"],
                        categories=exc["categories"],
                        location=exc["location"],
                        database=database_name,
                    )
                except UnknownObject:
                    node = proxy_db.new_node(
                        name=exc["name"],
                        unit=exc["unit"],
                        categories=exc["categories"],
                        location=exc["location"],
                        code=uuid4().hex,
                        comment="Proxy node created by `create_regionalized_biosphere_proxies`",
                    )
                    node.save()
                    # Unitary production of the proxy itself.
                    node.new_edge(
                        type=labels.production_edge_default, amount=1, input=node
                    ).save()
                    # Biosphere edge from the proxy to the flow the exchange
                    # was linked to. The edge type must be a biosphere edge
                    # label, not a process *node* label.
                    node.new_edge(
                        type=labels.biosphere_edge_default,
                        amount=1,
                        input=get_node(database=exc["input"][0], code=exc["input"][1]),
                    ).save()

                # Point the exchange at the proxy, whether reused or new.
                exc["input"] = node.key

    def create_technosphere_placeholders(self, database_name: str):
        """Create new placeholder database from unlinked technosphere flows in ``self.data``

        Every exchange that is not a biosphere edge, has no ``input`` and is not
        ``functional`` is turned into a placeholder process in ``database_name``.
        The exchanges are then linked against these placeholders on
        ``name``, ``unit`` and ``location``.

        Raises
        ------
        ValueError
            If ``database_name`` already exists.
        """
        if database_name in databases:
            raise ValueError(f"{database_name} database already exists")

        def reformat(exc):
            # NOTE(review): the code is hashed before the "GLO" default below
            # is applied, so an exchange without a location and an otherwise
            # identical one with location "GLO" presumably get different
            # codes - confirm this is intended.
            new_exc = exc | {
                "type": labels.process_node_default,
                "exchanges": [],
                "database": database_name,
                "code": activity_hash(exc),
            }
            if not new_exc.get("location"):
                # Also update original for correct linking
                # Location is required
                exc["location"] = new_exc["location"] = "GLO"
            return new_exc

        placeholders = [
            reformat(exc)
            for ds in self.data
            for exc in ds.get("exchanges", [])
            if exc["type"] not in labels.biosphere_edge_types
            and not exc.get("input")
            and not exc.get("functional")
        ]
        # Keying on (database, code) collapses placeholders with equal codes.
        proc_data = {(ds["database"], ds["code"]): ds for ds in placeholders}

        if not proc_data:
            print(
                "Skipping placeholder database creation as all technosphere flows are linked"
            )
            return

        print(
            f"Creating new placeholder database {database_name} with {len(proc_data)} processes"
        )
        # NOTE(review): the original indentation was lost; warnings are
        # assumed to be silenced only while the database is created and
        # registered - confirm against upstream.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            new_db = Database(database_name)
            new_db.register(
                format=self.format,
                comment=f"Database for unlinked technosphere flows from {self.db_name}",
            )

        new_db.write(proc_data)
        self.apply_strategies(
            [
                functools.partial(
                    link_iterable_by_fields,
                    fields=["name", "unit", "location"],
                    other=list(proc_data.values()),
                ),
            ]
        )

    def use_ecoinvent_strategies(self) -> None:
        """Switch strategy selection to normalize data to ecoinvent flow lists

        Replaces ``self.strategies`` entirely. Biosphere edges are linked
        against ``self.default_biosphere_database_name``, falling back to
        ``config.biosphere`` when that is falsy.
        """
        self.strategies = [
            set_metadata_using_single_functional_exchange,
            drop_unspecified_subcategories,
            normalize_units,
            update_ecoinvent_locations,
            split_simapro_name_geo,
            strip_biosphere_exc_locations,
            functools.partial(migrate_datasets, migration="default-units"),
            functools.partial(migrate_exchanges, migration="default-units"),
            functools.partial(set_code_by_activity_hash, overwrite=True),
            change_electricity_unit_mj_to_kwh,
            link_technosphere_based_on_name_unit_location,
            normalize_simapro_biosphere_categories,
            normalize_simapro_biosphere_names,
            functools.partial(migrate_exchanges, migration="simapro-water"),
            fix_localized_water_flows,
            functools.partial(
                link_iterable_by_fields,
                other=Database(
                    self.default_biosphere_database_name or config.biosphere
                ),
                # Same edge label source as the default strategies set up
                # in ``__init__``, instead of a hard-coded ["biosphere"].
                edge_kinds=labels.biosphere_edge_types,
            ),
        ]

    def normalize_labels_to_brightway_standard(self) -> None:
        """Apply ``normalize_simapro_labels_to_brightway_standard`` to the data."""
        self.apply_strategy(normalize_simapro_labels_to_brightway_standard)

    def write_database(
        self,
        backend: Optional[str] = None,
        activate_parameters: bool = True,
        searchable: bool = True,
    ) -> Database:
        """Write the imported data, optionally writing project parameters first.

        When ``activate_parameters`` is true, project parameters are written
        with ``delete_existing=False`` before delegating to the parent class.
        All three arguments are forwarded unchanged to the parent
        ``write_database``, whose return value is returned.
        """
        if activate_parameters:
            self.write_project_parameters(delete_existing=False)
        return super().write_database(
            backend=backend,
            activate_parameters=activate_parameters,
            searchable=searchable,
        )