from pprint import pformat
from typing import List
from uuid import uuid4
import bw2data as bd
from bw2data.logs import stdout_feedback_logger
[docs]
# Edge keys that are left out when `create_products_as_new_nodes` copies the
# remaining (non-core) attributes of an edge onto a newly created product node.
EDGE_CORE_COLUMNS = [
    # Descriptive / structural edge fields
    "name", "amount", "database", "location", "unit", "functional", "type",
    # Uncertainty-related fields (presumably distribution parameters)
    "uncertainty type", "loc", "scale", "shape", "minimum", "maximum",
]
[docs]
def create_products_as_new_nodes(data: List[dict]) -> List[dict]:
    """
    Add `product` nodes for unlinked functional edges and link the edges to them.

    An edge of a dataset triggers the creation of a new `product` node when all
    of the following hold:

    * The dataset `type` is not `bd.labels.multifunctional_node_default`
      (multifunctional datasets handle product creation separately).
    * The edge has a truthy `functional` value.
    * The edge is unlinked, i.e. `edge.get("input")` is falsey.
    * The edge has a `name`, and it differs from the dataset `name`.
    * The `(name, location)` pair is not already used by a node in `data` or by
      a product created earlier in this call. When the edge has no `location`,
      the dataset `location` is used for the comparison.

    The new node takes its `database` from the dataset, gets a random hex
    `code`, and receives every edge attribute not listed in
    `EDGE_CORE_COLUMNS`. The originating edge's `input` is set to the new node.

    Modifies `data` in-place (new nodes are appended) and returns it.

    Raises `KeyError` if a qualifying edge belongs to a dataset without a
    `database` attribute.
    """
    known_pairs = {(node.get("name"), node.get("location")) for node in data}
    created = []

    for dataset in data:
        if dataset.get("type") == bd.labels.multifunctional_node_default:
            # Has its own product handling
            continue

        for exchange in dataset.get("exchanges", []):
            # Only unlinked functional edges are candidates
            if not exchange.get("functional") or exchange.get("input"):
                continue
            # The edge must name something other than the dataset itself
            if not exchange.get("name") or exchange["name"] == dataset.get("name"):
                continue

            if not dataset.get("database"):
                message = (
                    "\nCan't create a new `product` node, as dataset is missing "
                    "`database` attribute:\n{}"
                )
                raise KeyError(message.format(pformat(dataset)))

            location = exchange.get("location") or dataset.get("location")
            pair = (exchange["name"], location)
            if pair in known_pairs:
                continue

            new_code = uuid4().hex
            product = {
                "name": exchange["name"],
                "location": location or bd.config.global_location,
                "unit": exchange.get("unit") or dataset.get("unit"),
                "exchanges": [],
                "code": new_code,
                "type": bd.labels.product_node_default,
                "database": dataset["database"],
            }
            # Carry over any non-core edge attributes; these win on key clashes
            product.update(
                (field, value)
                for field, value in exchange.items()
                if field not in EDGE_CORE_COLUMNS
            )
            created.append(product)

            exchange["input"] = (dataset["database"], new_code)
            known_pairs.add(pair)

    if created:
        data.extend(created)
    return data
[docs]
def separate_processes_from_products(
    data: List[dict], field_exclusions=("location",), code_suffix: str = "-product"
) -> List[dict]:
    """Given a set of processes, and no separate data on products, create copies of the processes
    as products and re-link the local supply chain.

    Designed for use in importing databases where processes are not strongly typed as different from
    products.

    Only nodes whose `type` is `bd.labels.process_node_default` or
    `bd.labels.chimaera_node_default` are considered. A product is created for each such node that
    has at least one self-referential edge (its `input` is the node's own `(database, code)`) which
    is either a production edge or marked `functional`. Nodes without edges, or without such a
    self-referential edge, are skipped with a warning and left unchanged.

    Copies over all attributes from the source processes except:

    - Those listed in `field_exclusions`
    - `type` is set to `bw2data.labels.product_node_default`
    - Uses the `code_suffix` to generate a new `code` value (previous plus code suffix)
    - No edges are copied over
    - If one of the attributes `product`, `reference_product` or `reference product` is present
      (key compared case-insensitively), its value is used as the product name

    After a product is created, the source node's `type` is set to
    `bd.labels.process_node_default`, and every edge in `data` whose `input` points to that
    process is re-linked to the new product. Edges and process nodes are modified in place.

    Should come late in the import process, when internal links are all present.

    Returns a new list holding the original nodes followed by the created products.

    Raises `ValueError` if `data` already contains product nodes, or if adding `code_suffix` to a
    process code collides with another process code.
    """
    if bd.labels.product_node_default in {ds.get("type") for ds in data}:
        raise ValueError(
            "This function requires no product nodes in the imported database"
        )

    processes = [
        ds
        for ds in data
        if ds.get("type")
        in (bd.labels.process_node_default, bd.labels.chimaera_node_default)
    ]

    codes = {ds["code"] for ds in processes}
    if intersection := {code + code_suffix for code in codes}.intersection(codes):
        raise ValueError(
            f"Given `code_suffix` results in code overlaps for the following process codes: {intersection}"
        )

    # Attributes which are always set explicitly on the product, never copied
    never_copied = ("exchanges", "code", "database", "type")
    name_override_keys = ("product", "reference_product", "reference product")

    product_mapping, products = {}, []

    for ds in processes:
        if not ds.get("exchanges"):
            stdout_feedback_logger.warning(
                f"Skipping dataset {ds.get('name')} | `{ds['code']}` with no edges"
            )
            # Actually skip: without this, a missing `exchanges` key raises
            # `KeyError` below and an empty list triggers a second warning.
            continue

        self_production = [
            exc
            for exc in ds["exchanges"]
            if exc.get("input") == (ds.get("database"), ds["code"])
            and (
                exc.get("type") == bd.labels.production_edge_default
                or exc.get("functional")
            )
        ]
        if len(self_production) > 1:
            stdout_feedback_logger.info(
                f"Process dataset {ds.get('name')} | `{ds['code']}` has {len(self_production)} functional edges which will all be linked to a new product"
            )
        elif not self_production:
            stdout_feedback_logger.warning(
                f"Skipping process dataset {ds.get('name')} | `{ds['code']}` as it has no self-referential production edges"
            )
            continue

        product = {
            "type": bd.labels.product_node_default,
            "exchanges": [],
            "code": ds["code"] + code_suffix,
            "database": ds["database"],
        }
        product_mapping[(ds["database"], ds["code"])] = (
            ds["database"],
            product["code"],
        )

        for key, value in ds.items():
            if key.lower() in name_override_keys:
                # The reference product label takes the place of the process name
                product["name"] = value
            if key in field_exclusions or key in never_copied:
                continue
            if key == "name" and product.get("name"):
                # A product name was already taken from a reference product key
                continue
            product[key] = value

        products.append(product)
        # The source node (possibly a chimaera) is a plain process from now on
        ds["type"] = bd.labels.process_node_default

    # Redirect every edge that consumed or produced a split process to its product
    for ds in data:
        for edge in ds.get("exchanges", []):
            if edge.get("input"):
                edge["input"] = product_mapping.get(edge["input"], edge["input"])

    return data + products