# Source code for bw2data.backends.proxies

import copy
import sys
import uuid
import warnings
from collections.abc import Iterable
from typing import Callable, List, Optional

if sys.version_info < (3, 11):
    from typing_extensions import Self
else:
    from typing import Self

import pandas as pd

try:
    from bw_temporalis import TemporalDistribution
except ImportError:
    # ``bw_temporalis`` is optional; temporal distributions stay in their JSON form without it
    TemporalDistribution = None

from bw2data import databases, geomapping, get_node, projects
from bw2data.backends import sqlite3_lci_db
from bw2data.backends.schema import ActivityDataset, ExchangeDataset
from bw2data.backends.typos import (
    check_activity_keys,
    check_activity_type,
    check_exchange_keys,
    check_exchange_type,
)
from bw2data.backends.utils import dict_as_activitydataset, dict_as_exchangedataset
from bw2data.configuration import labels
from bw2data.errors import ValidityError
from bw2data.logs import stdout_feedback_logger
from bw2data.proxies import ActivityProxyBase, ExchangeProxyBase
from bw2data.search import IndexManager
from bw2data.signals import on_activity_code_change, on_activity_database_change

if sys.version_info < (3, 10):
    from typing_extensions import TypeAlias
else:
    from typing import TypeAlias
class Exchanges(Iterable):
    """Iterator for exchanges with some additional methods.

    This is not a generator; ``next()`` is not supported. Every time you start to
    iterate over the object you get a new list starting from the beginning. However,
    to get a single item you can do ``next(iter(foo))``.

    Ordering is by database row id.

    Supports the following:

    .. code-block:: python

        exchanges = activity.exchanges()

        # Iterate
        for exc in exchanges:
            pass

        # Length
        len(exchanges)

        # Delete all
        exchanges.delete()

    """

    def __init__(self, key, kinds=None, reverse=False):
        """Select the edges attached to the node identified by ``key``.

        Args:
            key: ``(database, code)`` tuple of the node.
            kinds: Optional collection of edge ``type`` values; when given (and
                non-empty) only edges of these types are selected.
            reverse: If ``False``, select edges whose *output* is the node. If ``True``,
                select edges whose *input* is the node, excluding edges whose output is
                also the node (i.e. self-referencing edges such as its own production).
        """
        self._key = key
        self._reverse = reverse
        if reverse:
            self._args = [
                ExchangeDataset.input_database == self._key[0],
                ExchangeDataset.input_code == self._key[1],
                # Exclude edges whose output is this same node:
                #   NOT (output_database == db AND output_code == code)
                #   == (output_database != db) OR (output_code != code)
                # The parentheses are required: in Python ``&``/``|`` bind more tightly
                # than ``!=``, so without them the expression becomes a chained
                # comparison that does not filter anything.
                (ExchangeDataset.output_database != self._key[0])
                | (ExchangeDataset.output_code != self._key[1]),
            ]
        else:
            self._args = [
                ExchangeDataset.output_database == self._key[0],
                ExchangeDataset.output_code == self._key[1],
            ]
        if kinds:
            # ``<<`` is peewee's ``IN`` operator
            self._args.append(ExchangeDataset.type << kinds)

    def filter(self, expr):
        """Add another peewee ``WHERE`` expression to the selection (in place)."""
        self._args.append(expr)

    def delete(self, allow_in_sourced_project: bool = False):
        """Delete every selected edge with a single query.

        Databases whose edges are removed are marked dirty first. For a reverse
        selection the deleted edges belong to their consuming (output) nodes, which may
        live in other databases, so those databases are marked dirty too.

        Args:
            allow_in_sourced_project: Allow the mass deletion even when
                ``projects.dataset.is_sourced`` is true.

        Raises:
            NotImplementedError: In a sourced project, unless
                ``allow_in_sourced_project`` is true.
        """
        if projects.dataset.is_sourced and not allow_in_sourced_project:
            raise NotImplementedError("Mass exchange deletion not supported in sourced projects")

        dirty = {self._key[0]}
        if self._reverse:
            owners = (
                ExchangeDataset.select(ExchangeDataset.output_database)
                .where(*self._args)
                .distinct()
                .tuples()
            )
            dirty.update(name for (name,) in owners if name in databases)
        for name in sorted(dirty):
            databases.set_dirty(name)

        ExchangeDataset.delete().where(*self._args).execute()

    def _get_queryset(self):
        """Return the peewee select query for the current selection, ordered by row id."""
        return ExchangeDataset.select().where(*self._args).order_by(ExchangeDataset.id)

    def __iter__(self):
        for obj in self._get_queryset():
            yield Exchange(obj)

    def __len__(self):
        return self._get_queryset().count()

    def to_dataframe(
        self, categorical: bool = True, formatters: Optional[List[Callable]] = None
    ) -> pd.DataFrame:
        """Return a pandas DataFrame with all node exchanges.

        Standard DataFrame columns are:

            target_id: int,
            target_database: str,
            target_code: str,
            target_name: Optional[str],
            target_reference_product: Optional[str],
            target_location: Optional[str],
            target_unit: Optional[str],
            target_type: Optional[str]
            source_id: int,
            source_database: str,
            source_code: str,
            source_name: Optional[str],
            source_product: Optional[str],  # Note different label
            source_location: Optional[str],
            source_unit: Optional[str],
            source_categories: Optional[str]  # Tuple concatenated with "::" as in `bw2io`
            edge_amount: float,
            edge_type: str,

        Target is the node consuming the edge, source is the node or flow being
        consumed. The terms target and source were chosen because they also work well
        for biosphere edges.

        Args:

        ``categorical`` will turn each string column in a `pandas Categorical Series
        <https://pandas.pydata.org/docs/reference/api/pandas.Categorical.html>`__. This
        takes 1-2 extra seconds, but saves around 50% of the memory consumption.

        ``formatters`` is a list of callables that modify each row. These functions
        must take the following keyword arguments, and use the `Wurst internal data
        format <https://wurst.readthedocs.io/#internal-data-format>`__:

            * ``node``: The target node, as a dict
            * ``edge``: The edge, including attributes of the source node
            * ``row``: The current row dict being modified.

        The functions in ``formatters`` don't need to return anything, they modify
        ``row`` in place.

        Returns a pandas ``DataFrame``. With no edges, the frame has no columns.

        """
        result = []
        for edge in self:
            # Look each endpoint up once per edge instead of once per column
            target = edge.output
            source = edge.input
            row = {
                "target_id": target["id"],
                "target_database": target["database"],
                "target_code": target["code"],
                "target_name": target.get("name"),
                "target_reference_product": target.get("reference product"),
                "target_location": target.get("location"),
                "target_unit": target.get("unit"),
                "target_type": target.get("type", labels.process_node_default),
                "source_id": source["id"],
                "source_database": source["database"],
                "source_code": source["code"],
                "source_name": source.get("name"),
                "source_product": source.get("product"),
                "source_location": source.get("location"),
                "source_unit": source.get("unit"),
                "source_categories": (
                    "::".join(source["categories"]) if source.get("categories") else None
                ),
                "edge_amount": edge["amount"],
                "edge_type": edge["type"],
            }
            if formatters is not None:
                for func in formatters:
                    func(node=target, edge=edge, row=row)
            result.append(row)

        df = pd.DataFrame(result)

        if categorical:
            categorical_columns = [
                "target_database",
                "target_name",
                "target_reference_product",
                "target_location",
                "target_unit",
                "target_type",
                "source_database",
                "source_code",
                "source_name",
                "source_product",
                "source_location",
                "source_unit",
                "source_categories",
                "edge_type",
            ]
            for column in categorical_columns:
                # Columns can be missing, e.g. when there are no edges at all
                if column in df.columns:
                    df[column] = df[column].astype("category")

        return df
class Activity(ActivityProxyBase):
    """Proxy for a node stored in the SQLite backend.

    Node attributes live in ``self._data``; the backing ``ActivityDataset`` row is
    ``self._document``. ``save()`` writes ``self._data`` to that row. Assigning
    ``code`` or ``database`` on a node that already has one rewrites the database
    rows immediately (see ``__setitem__``).
    """

    ORMDataset: TypeAlias = ActivityDataset

    def __init__(self, document=None, **kwargs):
        """Create an `Activity` proxy object.

        If this is a new activity, can pass `kwargs`.

        If the activity exists in the database, `document` should be an
        `ActivityDataset`; its ``code``, ``database`` and ``id`` columns are copied
        into the node data.
        """
        if document is None:
            self._document = self.ORMDataset()
            self._data = kwargs
        else:
            self._document = document
            self._data = self._document.data
            self._data["code"] = self._document.code
            self._data["database"] = self._document.database
            self._data["id"] = self._document.id

    @property
    def id(self):
        """Row id of the backing ``ActivityDataset`` (presumably ``None`` before the first save)."""
        return self._document.id

    def __getitem__(self, key):
        """Look up ``key`` in the node data.

        ``0`` and ``1`` are aliases for ``"database"`` and ``"code"``. If ``key`` is not
        a top-level field, the ``"classifications"`` and ``"properties"`` sections are
        searched; each may be a dict or a list of ``(name, value)`` pairs.

        Raises:
            KeyError: If ``key`` can't be found.
        """
        if key == 0:
            return self["database"]
        elif key == 1:
            return self["code"]
        elif key in self._data:
            return self._data[key]

        for section in ("classifications", "properties"):
            if section not in self._data:
                continue
            values = self._data[section]
            if isinstance(values, list):
                try:
                    return {k: v for k, v in values}[key]
                except (KeyError, TypeError, ValueError):
                    # Not present, or the list isn't made of ``(name, value)`` pairs
                    pass
            elif key in values:
                return values[key]

        # NOTE: Up to 4.3 the reference product exchange was also searched here; a correct
        # implementation of that search caused severe performance degradation.
        raise KeyError(key)

    def __setitem__(self, key, value):
        """Set a node attribute.

        ``id`` is read-only. Setting ``code`` or ``database`` when the node already has
        one goes through ``_change_code`` / ``_change_database``, which update the
        database immediately. Everything else is delegated to ``ActivityProxyBase``.
        """
        if key == "id":
            raise ValueError("`id` is read-only")
        elif key == "code" and "code" in self._data:
            self._change_code(value)
            stdout_feedback_logger.info(
                "Successfully switched activity dataset to new code `%s`", value
            )
        elif key == "database" and "database" in self._data:
            self._change_database(value)
            stdout_feedback_logger.info(
                "Successfully switched activity dataset to database `%s`", value
            )
        else:
            super().__setitem__(key, value)

    @property
    def key(self):
        """``(database, code)`` tuple; either element is ``None`` if not set."""
        return (self.get("database"), self.get("code"))

    def delete(self, signal: bool = True):
        """Delete this node and the data that depends on it.

        In order, this removes:

        * the node's activity parameters and their parameterized exchanges,
        * its search index entry,
        * all edges whose output is this node,
        * edges of the default ``upstream()`` types that consume this node,
        * references to this node in calculation setups (the setups are flushed only
          when one was changed),
        * the node row itself.

        Args:
            signal: Passed to ``delete_instance``.
        """
        from bw2data import Database, calculation_setups
        from bw2data.parameters import ActivityParameter, ParameterizedExchange

        def purge(obj: "Activity", dct: dict) -> dict:
            """Return ``dct`` without entries keyed by ``obj``'s id or key."""
            return {key: value for key, value in dct.items() if key != obj.id and key != obj.key}

        try:
            ap = ActivityParameter.get(database=self[0], code=self[1])
            ParameterizedExchange.delete().where(ParameterizedExchange.group == ap.group).execute()
            ActivityParameter.delete().where(
                ActivityParameter.database == self[0], ActivityParameter.code == self[1]
            ).execute()
        except ActivityParameter.DoesNotExist:
            pass

        IndexManager(Database(self["database"]).filename).delete_dataset(self._data)
        self.exchanges().delete(allow_in_sourced_project=True)
        # NOTE(review): only edges of the default ``upstream()`` types are removed here;
        # consuming edges of other types (e.g. biosphere edges pointing at a deleted
        # biosphere flow) are left in place — confirm this is intended.
        self.upstream().delete(allow_in_sourced_project=True)

        setups_changed = False
        for name, setup in calculation_setups.items():
            if any(
                key == self.id or key == self.key
                for func_unit in setup["inv"]
                for key in func_unit
            ):
                stdout_feedback_logger.warning("Removing this node from calculation setup %s", name)
                # Functional units left empty after purging are dropped
                setup["inv"] = [
                    purged for purged in (purge(self, dct) for dct in setup["inv"]) if purged
                ]
                setups_changed = True
        if setups_changed:
            calculation_setups.flush()

        self._document.delete_instance(signal=signal)

    def save(self, signal: bool = True, data_already_set: bool = False, force_insert: bool = False):
        """
        Saves the current activity to the database after performing various checks.

        Parameters
        ----------
        signal : bool
            Passed to the ORM ``save``.
        data_already_set : bool
            If true, skip validation, the ``type`` default and key checks, and do not
            copy ``self._data`` into the document (it is assumed to be set already).
        force_insert : bool
            Passed to the ORM ``save``.

        Raises
        ------
        ValidityError
            If the activity is not valid, an error is raised detailing the reasons.

        Notes
        -----
        The method performs the following operations:
        - Checks if the activity is valid.
        - Marks the database as 'dirty', indicating changes.
        - Defaults a missing ``type`` (with a warning), and checks type and key validity.
        - Updates the activity's associated document in the database.
        - Updates the geographical mapping if needed.
        - Updates the index if the database is searchable (the default).

        Examples
        --------
        >>> activity.save()
        Saves the activity if it's valid, otherwise raises ValidityError.
        """
        from bw2data import Database

        if not data_already_set and not self.valid():
            raise ValidityError(
                "This activity can't be saved for the "
                + "following reasons\n\t* "
                + "\n\t* ".join(self.valid(why=True)[1])
            )

        databases.set_dirty(self["database"])

        if not data_already_set:
            if "type" not in self._data:
                warnings.warn(
                    f"Activity {self.get('code')} in database '{self.get('database')}' has no "
                    f"`type` specified; defaulting to '{labels.process_node_default}'",
                    UserWarning,
                )
                self._data["type"] = labels.process_node_default
            check_activity_type(self._data.get("type"))
            check_activity_keys(self)

            for key, value in dict_as_activitydataset(self._data).items():
                # ID value is either already in `._document` (update) or will be created by
                # `SnowflakeIDBaseClass.save()`.
                if key != "id":
                    setattr(self._document, key, value)

        self._document.save(signal=signal, force_insert=force_insert)

        if self.get("location") and self["location"] not in geomapping:
            geomapping.add([self["location"]])

        if databases[self["database"]].get("searchable", True):
            IndexManager(Database(self["database"]).filename).update_dataset(self._data)

    def _change_code(self, new_code: str, signal: bool = True):
        """Change this node's code in the database, including edges that reference it.

        Args:
            new_code: The new code. Nothing happens if it equals the current one.
            signal: If true, send ``on_activity_code_change``.

        Raises:
            ValueError: If another node in the same database already uses ``new_code``.
        """
        if self["code"] == new_code:
            return
        previous = self["code"]
        database = self["database"]

        if (
            self.ORMDataset.select()
            .where(
                self.ORMDataset.database == database,
                self.ORMDataset.code == new_code,
            )
            .count()
        ):
            raise ValueError("Activity database with code `{}` already exists".format(new_code))

        with sqlite3_lci_db.atomic():
            self.ORMDataset.update(code=new_code).where(self.ORMDataset.id == self.id).execute()
            ExchangeDataset.update(output_code=new_code).where(
                ExchangeDataset.output_database == database,
                ExchangeDataset.output_code == previous,
            ).execute()
            ExchangeDataset.update(input_code=new_code).where(
                ExchangeDataset.input_database == database,
                ExchangeDataset.input_code == previous,
            ).execute()

        # Same default as ``save()``: databases are searchable unless disabled
        if databases[database].get("searchable", True):
            from bw2data import Database

            index = IndexManager(Database(database).filename)
            index.delete_dataset(self)
            self._data["code"] = new_code
            index.add_datasets([self])
        else:
            self._data["code"] = new_code

        if signal:
            on_activity_code_change.send(
                old={"id": self.id, "code": previous},
                new={"id": self.id, "code": new_code},
            )

    def _change_database(self, new_database: str, signal: bool = True):
        """Move this node, and edges that reference it, to another existing database.

        Both the old and the new database are marked dirty, because the node and its
        edges move between them. Each database's search index is updated according to
        its own ``searchable`` flag.

        Args:
            new_database: Target database name. Nothing happens if it equals the
                current one.
            signal: If true, send ``on_activity_database_change``.

        Raises:
            ValueError: If ``new_database`` is not registered.
        """
        if self["database"] == new_database:
            return
        previous = self["database"]
        code = self["code"]
        if new_database not in databases:
            raise ValueError("Database {} does not exist".format(new_database))

        with sqlite3_lci_db.atomic():
            self.ORMDataset.update(database=new_database).where(
                self.ORMDataset.id == self.id
            ).execute()
            ExchangeDataset.update(output_database=new_database).where(
                ExchangeDataset.output_database == previous,
                ExchangeDataset.output_code == code,
            ).execute()
            ExchangeDataset.update(input_database=new_database).where(
                ExchangeDataset.input_database == previous,
                ExchangeDataset.input_code == code,
            ).execute()

        if previous in databases:
            databases.set_dirty(previous)
        databases.set_dirty(new_database)

        from bw2data import Database

        # Same default as ``save()``: databases are searchable unless disabled
        if previous in databases and databases[previous].get("searchable", True):
            IndexManager(Database(previous).filename).delete_dataset(self)
        self._data["database"] = new_database
        if databases[new_database].get("searchable", True):
            IndexManager(Database(new_database).filename).add_datasets([self])

        if signal:
            on_activity_database_change.send(
                old={"id": self.id, "database": previous},
                new={"id": self.id, "database": new_database},
            )

    def exchanges(self, exchanges_class=Exchanges):
        """All edges whose output is this node."""
        return exchanges_class(self.key)

    def edges(self):
        """Alias for ``exchanges()``."""
        return self.exchanges()

    def technosphere(self, exchanges_class=Exchanges):
        """Edges of this node whose type is in ``labels.technosphere_negative_edge_types``."""
        return exchanges_class(self.key, kinds=labels.technosphere_negative_edge_types)

    def biosphere(self, exchanges_class=Exchanges):
        """Edges of this node whose type is in ``labels.biosphere_edge_types``."""
        return exchanges_class(
            self.key,
            kinds=labels.biosphere_edge_types,
        )

    def production(self, include_substitution=False, exchanges_class=Exchanges):
        """Edges of this node with a ``labels.technosphere_positive_edge_types`` type.

        Substitution edge types are excluded unless ``include_substitution`` is true.
        """
        kinds = labels.technosphere_positive_edge_types
        if not include_substitution:
            kinds = [obj for obj in kinds if obj not in labels.substitution_edge_types]
        return exchanges_class(self.key, kinds=kinds)

    def rp_exchange(self):
        """Return an ``Exchange`` object corresponding to the reference production.

        Uses the following in order:

        * The edge flagged ``functional``, if exactly one is flagged. If several are
          flagged, only those are considered in the next steps.
        * The production (non-substitution positive technosphere) edge, if only one is
          present.
        * The production edge whose input has the same name as the activity
          ``reference product``.

        Raises ``ValueError`` if no suitable exchange is found."""
        candidates = list(self.exchanges())

        functional = [exc for exc in candidates if exc.get("functional")]
        if len(functional) == 1:
            return functional[0]
        elif len(functional) > 1:
            # Multifunctional process, but can try with "reference product"
            candidates = functional

        production_types = set(labels.technosphere_positive_edge_types).difference(
            labels.substitution_edge_types
        )
        candidates = [exc for exc in candidates if exc.get("type") in production_types]
        if len(candidates) == 1:
            return candidates[0]

        reference_product = self._data.get("reference product")
        matching = [
            exc for exc in candidates if exc.input._data.get("name") == reference_product
        ]
        if len(matching) == 1:
            return matching[0]
        raise ValueError(
            "Can't find a single reference product exchange (found {} candidates)".format(
                len(candidates)
            )
        )

    def producers(self):
        """Alias for ``production()``."""
        return self.production()

    def substitution(self, exchanges_class=Exchanges):
        """Edges of this node whose type is in ``labels.substitution_edge_types``."""
        return exchanges_class(
            self.key,
            kinds=labels.substitution_edge_types,
        )

    def upstream(self, kinds=labels.technosphere_negative_edge_types, exchanges_class=Exchanges):
        """Edges that consume this node (its input is this node, its output is another node)."""
        return exchanges_class(self.key, kinds=kinds, reverse=True)

    def consumers(self, kinds=labels.technosphere_negative_edge_types):
        """Alias for ``upstream()``."""
        return self.upstream(kinds=kinds)

    def new_exchange(self, **kwargs):
        """Alias for ``new_edge()``."""
        return self.new_edge(**kwargs)

    def new_edge(self, **kwargs):
        """Create a new, unsaved exchange whose output is this activity.

        Raises:
            ValueError: If ``id`` is given; ids are created automatically.
        """
        exc = Exchange()
        exc.output = self.key
        for key, value in kwargs.items():
            if key == "id":
                raise ValueError(f"`id` must be created automatically, but `id={value}` given.")
            exc[key] = value
        return exc

    def create_aggregated_process(
        self, database: Optional[str] = None, signal: bool = True, **kwargs
    ) -> tuple[Self, Self]:
        """Create a new aggregated process representing the life cycle inventory of this process.

        This method performs a life cycle inventory (LCI) calculation for the reference product
        of this process and creates a new aggregated process node with all biosphere exchanges
        from the upstream supply chain. The aggregated process represents the cumulative
        environmental impacts of producing the reference product.

        The method:
        1. Performs an LCA calculation for the reference product
        2. Creates a copy of this process (and optionally the product node if different)
           directly in the target database, with new random codes unless ``code`` is given
        3. Creates a production exchange linking the new process to the new product
        4. Creates biosphere exchanges for all non-zero inventory flows

        Args:
            database: Name of the target database where the new process will be created.
                If ``None``, uses the current process's database. The database must already
                exist. Defaults to ``None``.
            signal: Whether to emit database signals during save operations. Defaults to
                ``True``.
            **kwargs: Additional keyword arguments passed to ``_create_activity_copy()`` to
                customize the new process attributes (e.g., ``name``, ``location``, etc.).

        Returns:
            tuple[Activity, Activity]: A tuple containing:
                - The new aggregated process node
                - The new product node (or the same as the process if the process produces
                  itself)

        Raises:
            ValueError: If ``database`` is provided and doesn't exist, or if this activity is
                not a process node type (must be "process" or "processwithreferenceproduct").
            ImportError: If ``bw2calc`` is not installed (required for LCA calculations).

        Examples:
            Create an aggregated process in the same database:

            >>> process = Database("my_db").get("process_code")
            >>> new_process, new_product = process.create_aggregated_process()

            Create an aggregated process in a different database with custom attributes:

            >>> new_process, new_product = process.create_aggregated_process(
            ...     database="aggregated_db",
            ...     name="Aggregated Steel Production",
            ...     location="GLO"
            ... )

            The new process will contain all biosphere exchanges from the upstream supply
            chain:

            >>> for exc in new_process.exchanges():
            ...     if exc["type"] == "biosphere":
            ...         print(f"{exc['input'][1]}: {exc['amount']}")
        """
        from bw2calc import LCA

        from bw2data.compat import prepare_lca_inputs

        if database and database not in databases:
            raise ValueError(f"Database {database} doesn't exist")
        if self.get("type", labels.process_node_default) not in labels.process_node_types:
            raise ValueError("Only works with a `process` or `processwithreferenceproduct` node")

        target_database = database or self["database"]

        rpe = self.rp_exchange()
        product = rpe.input
        amount = rpe["amount"]

        fu, data_objs, _ = prepare_lca_inputs({product: amount}, remapping=False)
        lca = LCA(demand=fu, data_objs=data_objs)
        lca.lci()

        # Set database and code while copying. Assigning ``new["database"]`` afterwards
        # would go through ``__setitem__`` -> ``_change_database`` on a node that is not
        # saved yet (touching rows, search indices and signals), and fails for ``None``.
        if not kwargs.get("code"):
            kwargs["code"] = uuid.uuid4().hex
        new = self._create_activity_copy(database=target_database, **kwargs)
        new.save(signal=signal)

        if product != self:
            new_p = product._create_activity_copy(
                database=target_database, code=uuid.uuid4().hex
            )
            new_p.save(signal=signal)
        else:
            new_p = new

        ExchangeDataset(
            data={
                "amount": amount,
                "uncertainty_type": 0,
                "type": rpe["type"],
                "functional": bool(rpe.get("functional")),
                "input": (new_p["database"], new_p["code"]),
                "output": (new["database"], new["code"]),
            },
            input_code=new_p["code"],
            input_database=new_p["database"],
            output_code=new["code"],
            output_database=new["database"],
            type=rpe["type"],
        ).save(signal=signal)

        inventory = lca.inventory.sum(axis=1)
        for node_id, row_index in lca.dicts.biosphere.items():
            # Separate name so the reference product ``amount`` above isn't overwritten
            flow_amount = inventory[row_index, 0]
            if flow_amount == 0:
                continue
            node = get_node(id=node_id)
            ExchangeDataset(
                data={
                    "amount": float(flow_amount),
                    "uncertainty_type": 0,
                    "type": labels.biosphere_edge_default,
                    "input": (node["database"], node["code"]),
                    "output": (new["database"], new["code"]),
                },
                input_code=node["code"],
                input_database=node["database"],
                output_code=new["code"],
                output_database=new["database"],
                type=labels.biosphere_edge_default,
            ).save(signal=signal)

        return new, new_p

    def _create_activity_copy(self, **kwargs) -> Self:
        """Return a new, unsaved ``Activity`` holding a copy of this node's data.

        ``kwargs`` override fields of the copy; falsy values are ignored. If the copy
        would keep this node's ``(database, code)``, a random hex code is used instead.

        Raises:
            ValueError: If ``id`` is given.
        """
        if "id" in kwargs:
            raise ValueError(f"`id` must be created automatically, but `id={kwargs['id']}` given.")

        # Deep copy so nested values (e.g. classifications, properties) aren't shared
        # between this node and the copy.
        data = copy.deepcopy(
            {k: v for k, v in self._data.items() if k not in ("id", "document")}
        )
        data.update({key: value for key, value in kwargs.items() if value})

        if data["database"] == self["database"] and data["code"] == self["code"]:
            data["code"] = uuid.uuid4().hex
            stdout_feedback_logger.info(
                "Changed code to avoid conflict with existing value: %s to %s",
                self["code"],
                data["code"],
            )
        return Activity(**data)

    def copy(self, code: Optional[str] = None, signal: bool = True, **kwargs):
        """Copy the activity. Returns a new `Activity`.

        `code` is the new activity code; if not given, a UUID is used.

        `kwargs` are additional new fields and field values, e.g. name='foo'

        All edges whose output is this node are copied to the new node; edges whose
        input is this node (e.g. its production) are redirected to the new node.
        """
        if code is not None:
            kwargs["code"] = code
        activity = self._create_activity_copy(**kwargs)
        activity.save(signal=signal)

        for exc in self.exchanges():
            data = {k: v for k, v in exc._data.items() if k not in ("id", "output")}
            data["output"] = activity.key
            if exc["input"] == self.key:
                data["input"] = activity.key
            # Store temporal distributions in the same form as ``Exchange.save()``
            data = exc._process_temporal_distributions(data)
            ExchangeDataset(**dict_as_exchangedataset(data)).save(signal=signal)
        return activity
class Exchange(ExchangeProxyBase):
    """Proxy for an edge stored in the SQLite backend.

    Edge attributes live in ``self._data``; the backing ``ExchangeDataset`` row is
    ``self._document``. When ``bw_temporalis`` is installed, a
    ``temporal_distribution`` is kept as a ``TemporalDistribution`` object in memory
    and converted to JSON when saved.
    """

    ORMDataset: TypeAlias = ExchangeDataset

    def __init__(self, document=None, **kwargs):
        """Create an `Exchange` proxy object.

        If this is a new exchange, can pass `kwargs`.

        If the exchange exists in the database, `document` should be an
        `ExchangeDataset`; its ``input`` and ``output`` columns are copied into the
        edge data as ``(database, code)`` tuples.

        In both cases a JSON-form ``temporal_distribution`` is restored to a
        ``TemporalDistribution`` object when possible.
        """
        if document is None:
            self._document = self.ORMDataset()
            self._data = kwargs
        else:
            self._document = document
            self._data = self._document.data
            self._data["input"] = (
                self._document.input_database,
                self._document.input_code,
            )
            self._data["output"] = (
                self._document.output_database,
                self._document.output_code,
            )
        # Restore temporal_distribution from JSON if present
        self._data = self._restore_temporal_distributions(self._data)

    @property
    def id(self):
        """Row id of the backing ``ExchangeDataset``."""
        return self._document.id

    def _process_temporal_distributions(self, data):
        """Process temporal_distribution attributes by converting TemporalDistribution instances to JSON.

        This method creates a copy of the data and converts any TemporalDistribution instances
        found in 'temporal_distribution' keys to their JSON representation using the to_json method.
        The original data is not modified.

        Args:
            data: The exchange data dictionary

        Returns:
            A copy of the data with TemporalDistribution instances converted to JSON. If
            ``bw_temporalis`` is not installed, ``data`` itself is returned.
        """
        if TemporalDistribution is None:
            # bw_temporalis not available, return data as-is
            return data

        # Create a deep copy to avoid modifying the original data
        processed_data = copy.deepcopy(data)

        # Check if there's a temporal_distribution key with a TemporalDistribution value
        if "temporal_distribution" in processed_data:
            value = processed_data["temporal_distribution"]
            if isinstance(value, TemporalDistribution):
                processed_data["temporal_distribution"] = value.to_json()

        return processed_data

    def _restore_temporal_distributions(self, data):
        """Restore temporal_distribution attributes by converting JSON back to TemporalDistribution instances.

        This method creates a copy of the data and converts any JSON representations
        of TemporalDistribution instances back to TemporalDistribution objects.
        The original data is not modified.

        Args:
            data: The exchange data dictionary

        Returns:
            A copy of the data with JSON temporal_distribution converted back to
            TemporalDistribution instances. If ``bw_temporalis`` is not installed,
            ``data`` itself is returned (with a warning if it holds such JSON).
        """
        if TemporalDistribution is None:
            # bw_temporalis not available, return data as-is
            if "temporal_distribution" in data:
                value = data["temporal_distribution"]
                if isinstance(value, dict) and value.get("type") == "temporal_distribution":
                    warnings.warn(
                        "Found temporal_distribution JSON data but bw_temporalis library is not installed. "
                        "TemporalDistribution object will not be restored. Install bw_temporalis to enable "
                        "temporal distribution functionality.",
                        UserWarning,
                    )
            return data

        # Create a deep copy to avoid modifying the original data
        processed_data = copy.deepcopy(data)

        # Check if there's a temporal_distribution key with JSON data
        if "temporal_distribution" in processed_data:
            value = processed_data["temporal_distribution"]
            if isinstance(value, dict) and value.get("type") == "temporal_distribution":
                # This looks like JSON from a TemporalDistribution.to_json() call
                try:
                    # Create a new TemporalDistribution instance from the JSON data
                    processed_data["temporal_distribution"] = TemporalDistribution(value["data"])
                except (KeyError, TypeError, AttributeError, ValueError) as e:
                    # If conversion fails, leave the data as-is and warn about the error
                    warnings.warn(
                        f"Failed to restore TemporalDistribution from JSON data: {value}. "
                        f"Data will remain as JSON. Error: {str(e)}",
                        UserWarning,
                    )

        return processed_data

    def save(self, signal: bool = True, data_already_set: bool = False, force_insert: bool = False):
        """Validate this edge and write it to the database.

        The output node's database is marked dirty first.

        Args:
            signal: Passed to the ORM ``save``.
            data_already_set: If true, skip validation and type/key checks, and do not
                copy ``self._data`` into the document (it is assumed to be set already).
            force_insert: Passed to the ORM ``save``.

        Raises:
            ValidityError: If the edge is not valid (unless ``data_already_set``).
        """
        if not data_already_set and not self.valid():
            raise ValidityError(
                "This exchange can't be saved for the "
                "following reasons\n\t* " + "\n\t* ".join(self.valid(why=True)[1])
            )

        databases.set_dirty(self["output"][0])

        if not data_already_set:
            check_exchange_type(self._data.get("type"))
            check_exchange_keys(self)

            # The stored copy has TemporalDistribution objects converted to JSON;
            # ``self._data`` keeps the objects.
            processed_data = self._process_temporal_distributions(self._data)
            for key, value in dict_as_exchangedataset(processed_data).items():
                setattr(self._document, key, value)

        self._document.save(signal=signal, force_insert=force_insert)

    def delete(self, signal: bool = True):
        """Delete this edge and its parameterized-exchange rows.

        The output node's database is marked dirty.

        Args:
            signal: Passed to ``delete_instance``.
        """
        from bw2data.parameters import ParameterizedExchange

        ParameterizedExchange.delete().where(
            ParameterizedExchange.exchange == self._document.id
        ).execute()
        self._document.delete_instance(signal=signal)
        databases.set_dirty(self["output"][0])