from collections.abc import Iterator
from typing import Optional
import bw_processing as bwp
import matrix_utils as mu
import numpy as np
from bw2calc.errors import (
CyclicDependencyGraph,
DemandInStaticDatabase,
MissingDatabaseDependencies,
OutsideTechnosphere,
StaticDependsOnStochastic,
)
from bw2calc.lca import LCA
from bw2calc.utils import get_datapackage
[docs]
def _find_production_exchanges(mm: mu.MappedMatrix):
"""Run all production-exchange heuristics and return whatever was found.
Same logic as ``bw_graph_tools.guess_production_exchanges`` but does not raise
``UnclearProductionExchange`` when some columns are unresolved. This is intentional:
we call this on a non-square stochastic-only matrix whose unresolved rows are exactly
the interface products we want to identify.
Retains the other checks from ``guess_production_exchanges``: raises ``ValueError``
for an empty matrix or mismatched row/column result arrays.
Returns ``(row_indices, col_indices)`` — integer arrays of matrix row/column indices
where production exchanges were found. Unresolved columns are simply absent.
"""
from bw_graph_tools.matrix_tools import ( # noqa: PLC0415
gpe_fifth_heuristic,
gpe_first_heuristic,
gpe_fourth_heuristic,
gpe_second_heuristic,
gpe_third_heuristic,
)
if mm.matrix.shape[0] == 0:
raise ValueError("Empty matrix")
row, col = gpe_first_heuristic(mm)
row, col = gpe_second_heuristic(mm, row, col)
row, col = gpe_third_heuristic(mm, row, col)
row, col = gpe_fourth_heuristic(mm, row, col)
row, col = gpe_fifth_heuristic(mm, row, col)
if row.shape != col.shape:
raise ValueError("Guessed row indices do not match guessed column indices.")
return row, col
[docs]
class PartitionedMonteCarloLCA(Iterator):
"""Monte Carlo LCA that pre-solves a static background system once.
Splits the full system into a static (background) part and a stochastic (foreground) part.
The static system is solved deterministically for each product demanded across the
static/stochastic boundary (interface products), producing aggregated biosphere vectors.
These are stored in an in-memory dynamic datapackage that is combined with the stochastic
packages for each Monte Carlo iteration.
This avoids rebuilding and solving the (typically large) background matrix on every
iteration — only the foreground matrix is resampled.
Parameters
----------
demand : dict
Functional unit: ``{activity_or_product_id: amount}``. Must be in the stochastic system.
static_databases : list[str]
Names of databases to treat as static (e.g. ``["biosphere3", "ecoinvent 3.10"]``).
data_objs : list
All datapackages: stochastic LCI + static LCI + LCIA method. Packages for databases
listed in ``static_databases`` are identified by their ``metadata["name"]`` field, which
must equal ``bw_processing.clean_datapackage_name(database_name)``.
seed_override : int, optional
RNG seed passed to the inner stochastic LCA.
Notes
-----
All LCI datapackages must contain a ``database_dependencies`` key in their metadata,
which is written by ``bw2data >= 4.7``.
Design note: composition vs. subclassing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This class uses **composition**: it builds an internal ``._lca`` instance from the
stochastic packages plus the pre-solved dynamic datapackage, and delegates properties
to it. An alternative would be to subclass ``LCA`` and override ``load_lci_data()``
to perform the same partitioning — classify packages, pre-solve the static system,
inject the dynamic datapackage, replace ``self.packages``, then call
``super().load_lci_data()``. That approach would give full inheritance of all current
and future ``LCA`` attributes without explicit delegation.
The reason composition was chosen instead:
* **Package list mutation.** The override would need to replace ``self.packages``
(set from ``data_objs`` in ``__init__``) with the filtered stochastic+dynamic list
mid-lifecycle, which is non-obvious and makes ``self.packages`` inconsistent with
``self.data_objs``.
* **Matrix semantics.** Whether composed or subclassed, ``technosphere_matrix`` is
the *reduced* aggregated-proxy matrix, not the full combined static+stochastic
system. Subclassing makes this less visible rather than resolving it.
"""
def __init__(
self,
demand: dict[int, float],
static_databases: list,
data_objs: list,
seed_override: Optional[int] = None,
):
if not all(isinstance(k, int) for k in demand):
raise TypeError("All demand keys must be integers (activity/product database IDs).")
[docs]
self.static_databases = list(static_databases)
[docs]
self.seed_override = seed_override
packages = [get_datapackage(obj) for obj in data_objs]
(
self.static_packages,
self.stochastic_packages,
self.method_packages,
) = self._classify_packages(packages)
self._validate()
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
[docs]
def lci(self) -> None:
"""Pre-solve the static system, build the dynamic datapackage, and run the first LCI."""
dynamic_dp = self._build_dynamic_datapackage()
self._lca = LCA(
demand=self.demand,
data_objs=self.stochastic_packages + self.method_packages + [dynamic_dp],
use_distributions=True,
seed_override=self.seed_override,
)
self._lca.lci()
[docs]
def lcia(self) -> None:
self._lca.lcia()
[docs]
def keep_first_iteration(self) -> None:
self._lca.keep_first_iteration()
def __next__(self) -> None:
next(self._lca)
# ------------------------------------------------------------------
# Delegated properties
# ------------------------------------------------------------------
@property
[docs]
def score(self) -> float:
return self._lca.score
@property
[docs]
def inventory(self):
return self._lca.inventory
@property
[docs]
def supply_array(self):
return self._lca.supply_array
@property
[docs]
def characterized_inventory(self):
return self._lca.characterized_inventory
@property
[docs]
def dicts(self):
return self._lca.dicts
@property
[docs]
def technosphere_matrix(self):
return self._lca.technosphere_matrix
@property
[docs]
def biosphere_matrix(self):
return self._lca.biosphere_matrix
@property
[docs]
def characterization_matrix(self):
return self._lca.characterization_matrix
# ------------------------------------------------------------------
# Setup helpers
# ------------------------------------------------------------------
[docs]
def _classify_packages(self, packages):
"""Classify packages into static LCI, stochastic LCI, and method buckets.
Classification happens at the resource-group level so that a single datapackage
containing groups for different matrices or different databases is split correctly.
Each resource group has a single matrix label, so we use ``dp.groups`` to iterate
and ``dp.exclude({"group": name})`` to produce filtered sub-packages.
Group-to-database matching uses the bw2data naming convention where a group is
named ``clean_datapackage_name(db_name + " " + matrix_type)`` — i.e. it starts
with ``clean_datapackage_name(db_name)`` followed by ``_``. If no group-name
match is found, the check falls back to the package-level ``metadata["name"]``.
"""
static_names = {bwp.clean_datapackage_name(db) for db in self.static_databases}
static, stochastic, method = [], [], []
for dp in packages:
for group_name, group_dp in dp.groups.items():
matrix = group_dp.resources[0].get("matrix", "")
filtered = dp.filter_by_attribute("group", group_name)
if matrix == "characterization_matrix":
method.append(filtered)
elif matrix in ("technosphere_matrix", "biosphere_matrix"):
# bw2data names groups as clean_datapackage_name(db + " " + matrix_type),
# so a static group name starts with clean_datapackage_name(db) + "_".
# Fall back to the package-level name for the one-package-per-database case.
is_static = any(
group_name == sn or group_name.startswith(sn + "_") for sn in static_names
)
if not is_static:
is_static = dp.metadata.get("name", "") in static_names
(static if is_static else stochastic).append(filtered)
return static, stochastic, method
[docs]
def _validate(self) -> None:
for dp in self.static_packages + self.stochastic_packages:
if "database_dependencies" not in dp.metadata:
raise MissingDatabaseDependencies(
f"Package '{dp.metadata.get('name')}' is missing 'database_dependencies' "
"metadata. Reprocess the database with bw2data >= 4.7."
)
stochastic_names = {dp.metadata.get("name", "") for dp in self.stochastic_packages}
for dp in self.static_packages:
for dep in dp.metadata.get("database_dependencies", []):
if bwp.clean_datapackage_name(dep) in stochastic_names:
raise StaticDependsOnStochastic(
f"Static database '{dp.metadata.get('name')}' depends on stochastic "
f"database '{dep}'. Static databases must only depend on other static "
"databases."
)
graph: dict[str, set] = {}
for dp in self.static_packages + self.stochastic_packages:
name = dp.metadata.get("name", "")
deps = {
bwp.clean_datapackage_name(d) for d in dp.metadata.get("database_dependencies", [])
}
graph.setdefault(name, set()).update(deps)
self._check_for_cycles({k: list(v) for k, v in graph.items()})
@staticmethod
[docs]
def _check_for_cycles(graph: dict) -> None:
visited: set = set()
in_stack: set = set()
def dfs(node: str) -> None:
visited.add(node)
in_stack.add(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
dfs(neighbor)
elif neighbor in in_stack:
raise CyclicDependencyGraph(
f"Cycle detected in database dependency graph involving '{node}'"
)
in_stack.discard(node)
for node in graph:
if node not in visited:
dfs(node)
[docs]
def _build_dynamic_datapackage(self) -> bwp.DatapackageBase:
"""Solve the static system for each interface product; return dynamic datapackage."""
dynamic_dp = bwp.create_datapackage(name="partitioned_mc_static_background")
if not self.static_packages:
return dynamic_dp
# Build stochastic technosphere (deterministic) to identify interface products.
# We use _find_production_exchanges (not guess_production_exchanges) because the
# stochastic-only matrix is non-square — the unresolved rows are exactly the
# interface products we want.
stochastic_tech_mm = mu.MappedMatrix(
packages=self.stochastic_packages,
matrix="technosphere_matrix",
use_arrays=False,
use_distributions=False,
)
row_existing, col_existing = _find_production_exchanges(stochastic_tech_mm)
# Map stochastic matrix row index → product db id (used here and below)
stoch_product_reversed = {v: k for k, v in stochastic_tech_mm.row_mapper.to_dict().items()}
stochastic_produced_ids = {stoch_product_reversed[r] for r in row_existing}
for demand_key in self.demand:
if demand_key not in stochastic_produced_ids:
raise DemandInStaticDatabase(
f"Demand key {demand_key} does not have a production exchange in the "
"stochastic system. The functional unit must be in the stochastic "
"(foreground) system, not in a static background database."
)
all_rows = np.arange(stochastic_tech_mm.matrix.shape[0])
interface_row_indices = np.setdiff1d(all_rows, row_existing)
if not interface_row_indices.size:
return dynamic_dp
# Build static LCA (deterministic; dummy demand, only matrices needed)
from bw_graph_tools.matrix_tools import guess_production_exchanges # noqa: PLC0415
static_lca = LCA(
demand={0: 1.0},
data_objs=self.static_packages,
use_distributions=False,
)
static_lca.load_lci_data()
static_lca.decompose_technosphere()
# Map static product db id → static activity db id (via production exchanges)
static_row_existing, static_col_existing = guess_production_exchanges(
static_lca.technosphere_mm
)
static_product_reversed = static_lca.dicts.product.reversed # {matrix_idx: db_id}
static_activity_reversed = static_lca.dicts.activity.reversed # {matrix_idx: db_id}
static_producer: dict = {}
for r, c in zip(static_row_existing, static_col_existing):
static_producer[static_product_reversed[r]] = static_activity_reversed[c]
static_biosphere_reversed = static_lca.dicts.biosphere.reversed # {matrix_idx: db_id}
tech_rows, tech_cols, tech_data = [], [], []
bio_rows, bio_cols, bio_data = [], [], []
for row_idx in interface_row_indices:
product_id = stoch_product_reversed[row_idx]
if product_id not in static_producer:
raise OutsideTechnosphere(
f"Interface product with id {product_id} has no identified production "
"exchange in the static system. Check that all required databases are "
"listed in static_databases."
)
proxy_activity_id = static_producer[product_id]
# Solve static system for 1 unit of this interface product
demand_array = np.zeros(len(static_lca.dicts.product))
demand_array[static_lca.dicts.product[product_id]] = 1.0
supply_array = static_lca.solve_linear_system(demand_array)
aggregated_bio = np.asarray(static_lca.biosphere_matrix @ supply_array).flatten()
# Production exchange: proxy_activity produces 1 unit of the interface product
tech_rows.append(product_id)
tech_cols.append(proxy_activity_id)
tech_data.append(1.0)
# Aggregated biosphere: all non-zero flows from the static supply chain
for bio_idx in np.flatnonzero(aggregated_bio):
bio_rows.append(static_biosphere_reversed[bio_idx])
bio_cols.append(proxy_activity_id)
bio_data.append(float(aggregated_bio[bio_idx]))
tech_indices = np.array(list(zip(tech_rows, tech_cols)), dtype=bwp.INDICES_DTYPE)
dynamic_dp.add_persistent_vector(
matrix="technosphere_matrix",
indices_array=tech_indices,
data_array=np.array(tech_data),
)
if bio_rows:
bio_indices = np.array(list(zip(bio_rows, bio_cols)), dtype=bwp.INDICES_DTYPE)
dynamic_dp.add_persistent_vector(
matrix="biosphere_matrix",
indices_array=bio_indices,
data_array=np.array(bio_data),
)
return dynamic_dp