import itertools
import json
import sys
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterable,
Iterator,
Optional,
Sequence,
TypeVar,
Union,
)
import deepdiff
from bw2data import databases
from bw2data.backends.proxies import Activity, Exchange
from bw2data.backends.schema import ActivityDataset, ExchangeDataset
from bw2data.backends.utils import dict_as_activitydataset, dict_as_exchangedataset
from bw2data.database import DatabaseChooser
from bw2data.errors import DifferentObjects, IncompatibleClasses
from bw2data.parameters import (
ActivityParameter,
DatabaseParameter,
Group,
ParameterBase,
ParameterizedExchange,
ProjectParameter,
)
from bw2data.signals import SignaledDataset
from bw2data.snowflake_ids import snowflake_id_generator
from bw2data.utils import get_node
if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias
if sys.version_info < (3, 11):
from typing_extensions import Self
else:
from typing import Self
if TYPE_CHECKING:
import typing
import peewee
[docs]
def _id(revision: Revision) -> ID:
return revision["metadata"]["revision"]
[docs]
def _parent(revision: Revision) -> Optional[ID]:
return revision["metadata"].get("parent_revision")
[docs]
def _last(x: Iterable[T]) -> T:
"""Returns the last element of a (non-empty) series."""
i = iter(x)
ret = next(i)
for ret in i:
pass
return ret
[docs]
def _interleave(i0: Iterator[T], i1: Iterator[U]) -> Iterator[Union[T, U]]:
"""Similar to a flat zip, but also yields remaining elements."""
c0: Iterator[Union[T, U]] = i0
c1: Iterator[Union[T, U]] = i1
while True:
try:
yield next(c0)
except StopIteration:
yield from c1
break
c0, c1 = c1, c0
[docs]
class RevisionGraph:
"""Graph of revisions, edges are based on `metadata.parent_revision`."""
[docs]
class Iterator:
"""Helper class implementing iteration from child to parent."""
def __init__(self, g: "RevisionGraph", head: Optional[ID] = None):
[docs]
self.head: Optional[ID] = head if head is not None else g.head
def __iter__(self) -> "typing.Iterator":
return self
def __next__(self) -> Optional[dict]:
if self.head is None:
raise StopIteration
ret = self.id_map[self.head]
self.head = _parent(ret)
return ret
def __init__(self, head: ID, revisions: Sequence[Revision]):
[docs]
self.revisions = revisions
[docs]
self.id_map = {_id(r): r for r in revisions}
def __iter__(self):
"""Iterates the graph from head to root."""
return self.Iterator(self)
[docs]
def range(
self,
r0: Optional[ID] = None,
r1: Optional[ID] = None,
) -> Iterable[Revision]:
"""
Creates an iterator for a revision range (reversed).
- `range()`: same as `range(self.head)`
- `range(r)`: all revisions starting from `r`
- `range(None, r)`: same as `range(r)`
- `range(r0, r1)`: `r0..r1`
"""
if r1 is None:
return self.Iterator(self, r0 if r0 is not None else self.head)
i = self.Iterator(self, r1)
# Redundant, but avoids the cost of unnecessary predicate applications.
if r0 is None:
return i
p = self.id_map[r0]
return itertools.takewhile(lambda x: x is not p, i)
[docs]
def is_ancestor(self, parent: Optional[ID], child: ID) -> bool:
"""Checks whether a revision can be reached by another."""
return parent is None or self.id_map[parent] in self.range(child)
[docs]
def merge_base(
self,
revision0: Optional[ID],
revision1: Optional[ID],
) -> Optional[ID]:
"""Finds the nearest common ancestor between two revisions."""
if revision0 is None or revision1 is None:
return None
seen = set()
# Iteration order doesn't matter, but forks are expected to be much
# shorter than the full history, so interleave them as a heuristic.
for r in _interleave(
self.Iterator(self, revision0),
self.Iterator(self, revision1),
):
r = _id(r)
if r in seen:
return r
seen.add(r)
return None
[docs]
def set_head(self, revision: ID):
self.head = revision
[docs]
def rebase(self, onto: ID, upstream: ID, revision: ID) -> Revision:
"""Transplants the sequence `upstream..revision` on top of `onto`."""
r = _last(self.range(upstream, revision))
assert _parent(r) == upstream, f"invalid range {upstream}..{revision}"
r["metadata"]["parent_revision"] = onto
return r
[docs]
class Delta:
"""
The difference between two versions of an object.
Can be serialized, transferred, and applied to the same previous version to
change it to the new state.
"""
def __init__(
self,
delta: Optional[Union[deepdiff.Delta, dict]],
obj_type: Optional[str] = None,
obj_id: Optional[Union[int, str]] = None,
change_type: Optional[str] = None,
):
"""
Private, exists only for type-checking.
Use one of the class-method constructors to create objects.
"""
[docs]
self.change_type = change_type
[docs]
def apply(self, obj):
return obj + self.delta
@classmethod
[docs]
def from_dict(cls, d: dict) -> Self:
return cls(
delta=deepdiff.Delta(
JSONEncoder().encode(d), deserializer=deepdiff.serialization.json_loads
),
)
@classmethod
[docs]
def from_difference(
cls,
obj_type: str,
obj_id: Optional[Union[int, str]],
change_type: str,
diff: deepdiff.DeepDiff,
) -> Self:
return cls(
obj_type=obj_type,
obj_id=obj_id,
change_type=change_type,
delta=deepdiff.Delta(diff),
)
@classmethod
[docs]
def _direct_lci_node_difference(cls, old: dict, new: dict, change_type: str) -> Self:
return cls.from_difference(
"lci_node",
old["id"],
change_type,
deepdiff.DeepDiff(
old,
new,
verbose_level=2,
),
)
@classmethod
[docs]
def activity_code_change(cls, old: dict, new: dict) -> Self:
"""Special handling to change the `database` attribute of an activity node."""
return cls._direct_lci_node_difference(old, new, "activity_code_change")
@classmethod
[docs]
def activity_database_change(cls, old: dict, new: dict) -> Self:
"""Special handling to change the `database` attribute of an activity node."""
return cls._direct_lci_node_difference(old, new, "activity_database_change")
@classmethod
@classmethod
[docs]
def generate(
cls,
old: Optional[SignaledDataset],
new: Optional[SignaledDataset],
operation: Optional[str] = None,
) -> Optional[Self]:
"""
Generates a patch object from one version of an object to another.
Both `old` and `new` should be instances of `bw2data.backends.schema.SignaledDataset`.
`old` can be `None` if an object is being created.
`new` can be `None` is an object is being deleted.
Raises `IncompatibleClasses` is `old` and `new` have different classes.
"""
if operation is not None:
return getattr(cls, operation)(old, new)
if old is not None:
obj_id = old.id
obj_type = old.__class__
elif new is not None:
obj_id = new.id
obj_type = new.__class__
else:
raise ValueError("Both `new` and `old` are `None`")
if old is not None and new is not None:
if old.__class__ != new.__class__:
raise IncompatibleClasses(f"Can't diff {old.__class__} and {new.__class__}")
if old.id != new.id:
raise DifferentObjects(f"Can't diff different objects (ids {old.id} & {new.id})")
if old is not None and new is not None:
change_type = "update"
elif old is None and new is not None:
change_type = "create"
elif old is not None and new is None:
change_type = "delete"
label = SIGNALLEDOBJECT_TO_LABEL[obj_type]
handler = REVISIONED_LABEL_AS_OBJECT[label]
diff = deepdiff.DeepDiff(
handler.current_state_as_dict(old) if old else None,
handler.current_state_as_dict(new) if new else None,
verbose_level=2,
)
if not diff:
return None
return cls.from_difference(label, obj_id, change_type, diff)
[docs]
class JSONEncoder(json.JSONEncoder):
[docs]
def default(self, obj):
if not isinstance(obj, Delta):
return super().default(obj)
# XXX
obj.delta.serializer = deepdiff.serialization.json_dumps
return json.loads(obj.delta.dumps())
[docs]
def generate_revision(metadata: dict, delta: Sequence[Delta]) -> dict:
return {
"metadata": metadata,
"data": [
{"type": d.type, "id": d.id, "change_type": d.change_type, "delta": d} for d in delta
],
}
[docs]
generate_delta = Delta.generate
[docs]
class RevisionedORMProxy:
"""
Class that encapsulates logic around applying revisions. Used for `Activity` and `Exchange`.
We need a separate class because we apply the changes to `ActivityDataset`, but need to save
`Node` (and similar for edges).
"""
[docs]
ORM_CLASS: type["peewee.Model"]
[docs]
PROXY_CLASS: type["peewee.Model"]
[docs]
orm_as_dict: Callable[["peewee.Model"], dict]
@classmethod
[docs]
def handle(cls, revision_data: dict) -> None:
getattr(cls, revision_data["change_type"])(revision_data)
@classmethod
[docs]
def previous_state_as_dict(cls, revision_data: dict) -> dict:
orm_object = cls.ORM_CLASS.get_by_id(revision_data["id"])
return cls.orm_as_dict(orm_object)
@classmethod
[docs]
def current_state_as_dict(cls, obj: SignaledDataset) -> dict:
return cls.orm_as_dict(obj)
@classmethod
[docs]
def update(cls, revision_data: dict) -> None:
previous = cls.previous_state_as_dict(revision_data)
updated_data = Delta.from_dict(revision_data["delta"]).apply(previous)
updated_orm_object = cls.ORM_CLASS(**cls.prepare_data_dict_for_orm_class(updated_data))
updated_orm_object.id = revision_data["id"]
cls.PROXY_CLASS(document=updated_orm_object).save(signal=False, data_already_set=True)
@classmethod
[docs]
def delete(cls, revision_data: dict) -> None:
cls.PROXY_CLASS(cls.ORM_CLASS.get_by_id(revision_data["id"])).delete(signal=False)
@classmethod
[docs]
def prepare_data_dict_for_orm_class(cls, data: dict) -> dict:
return data
@classmethod
[docs]
def create(cls, revision_data: dict) -> None:
data = Delta.from_dict(revision_data["delta"]).apply({})
orm_object = cls.ORM_CLASS(**cls.prepare_data_dict_for_orm_class(data))
orm_object.id = revision_data["id"]
# Force insert because we specify the primary key already but object not in database
cls.PROXY_CLASS(document=orm_object).save(
signal=False, data_already_set=True, force_insert=True
)
[docs]
class RevisionedParameter(RevisionedORMProxy):
@classmethod
[docs]
def _state_as_dict(cls, obj: ParameterBase) -> dict:
return {key: getattr(obj, key) for key in cls.KEYS}
@classmethod
[docs]
def current_state_as_dict(cls, obj: ParameterBase) -> dict:
return cls._state_as_dict(obj)
@classmethod
[docs]
def previous_state_as_dict(cls, revision_data: dict) -> dict:
orm_object = cls.ORM_CLASS.get_by_id(revision_data["id"])
return cls._state_as_dict(orm_object)
@classmethod
[docs]
def update(cls, revision_data: dict) -> None:
previous = cls.previous_state_as_dict(revision_data)
updated_data = Delta.from_dict(revision_data["delta"]).apply(previous)
updated_orm_object = cls.ORM_CLASS(**updated_data)
updated_orm_object.id = revision_data["id"]
updated_orm_object.save(signal=False)
@classmethod
[docs]
def delete(cls, revision_data: dict) -> None:
cls.ORM_CLASS.get_by_id(revision_data["id"]).delete_instance(signal=False)
@classmethod
[docs]
def create(cls, revision_data: dict) -> None:
data = Delta.from_dict(revision_data["delta"]).apply({})
orm_object = cls.ORM_CLASS(**data)
orm_object.id = revision_data["id"]
# Force insert because we specify the primary key already but object not in database
orm_object.save(signal=False, force_insert=True)
@classmethod
[docs]
def _unwrap_diff_dict(cls, data: dict) -> dict:
return {
"old": data["delta"]["dictionary_item_removed"]["root['old']"],
"new": data["delta"]["dictionary_item_added"]["root['new']"],
}
[docs]
class RevisionedGroup(RevisionedParameter):
[docs]
KEYS = ("id", "name", "order")
# Implicitly skips `fresh` and `updated` fields because they are in `KEYS`.
[docs]
class RevisionedParameterizedExchange(RevisionedParameter):
[docs]
KEYS = ("id", "group", "formula", "exchange")
[docs]
ORM_CLASS = ParameterizedExchange
[docs]
class RevisionedProjectParameter(RevisionedParameter):
[docs]
KEYS = ("id", "name", "formula", "amount", "data")
[docs]
ORM_CLASS = ProjectParameter
@classmethod
[docs]
def project_parameter_recalculate(cls, revision_data: dict) -> None:
cls.ORM_CLASS.recalculate(signal=False)
@classmethod
[docs]
class RevisionedDatabaseParameter(RevisionedParameter):
[docs]
KEYS = ("id", "database", "name", "formula", "amount", "data")
[docs]
ORM_CLASS = DatabaseParameter
@classmethod
[docs]
def database_parameter_recalculate(cls, revision_data: dict) -> None:
cls.ORM_CLASS.recalculate(database=revision_data["id"], signal=False)
@classmethod
@classmethod
[docs]
class RevisionedActivityParameter(RevisionedParameter):
[docs]
KEYS = ("id", "group", "database", "code", "name", "formula", "amount", "data")
[docs]
ORM_CLASS = ActivityParameter
@classmethod
[docs]
def activity_parameter_recalculate(cls, revision_data: dict) -> None:
cls.ORM_CLASS.recalculate(group=revision_data["id"], signal=False)
@classmethod
[docs]
def activity_parameter_recalculate_exchanges(cls, revision_data: dict) -> None:
cls.ORM_CLASS.recalculate_exchanges(group=revision_data["id"], signal=False)
@classmethod
@classmethod
@classmethod
[docs]
class RevisionedNode(RevisionedORMProxy):
[docs]
ORM_CLASS: TypeAlias = Activity.ORMDataset
@classmethod
[docs]
def orm_as_dict(cls, orm_object: ORM_CLASS) -> dict:
return orm_object.data
@classmethod
[docs]
def prepare_data_dict_for_orm_class(cls, data: dict) -> dict:
return dict_as_activitydataset(data)
@classmethod
[docs]
def activity_database_change(cls, revision_data: dict) -> None:
"""Special handling for changing activity `database` attributes"""
node = get_node(id=revision_data["id"])
node._change_database(
new_database=revision_data["delta"]["values_changed"]["root['database']"]["new_value"],
signal=False,
)
@classmethod
[docs]
def activity_code_change(cls, revision_data: dict) -> None:
"""Special handling for changing activity `code` attributes"""
node = get_node(id=revision_data["id"])
node._change_code(
new_code=revision_data["delta"]["values_changed"]["root['code']"]["new_value"],
signal=False,
)
[docs]
class RevisionedEdge(RevisionedORMProxy):
[docs]
ORM_CLASS: TypeAlias = Exchange.ORMDataset
@classmethod
[docs]
def orm_as_dict(cls, orm_object: ORM_CLASS) -> dict:
return dict_as_exchangedataset(orm_object.data)
[docs]
class RevisionedDatabase:
@classmethod
[docs]
def handle(cls, revision_data: dict) -> None:
if revision_data["change_type"] == "database_metadata_change":
new_data = Delta.from_dict(revision_data["delta"]).apply(databases.data)
for name, value in new_data.items():
# Need to call this method to create search index database file
if value.get("searchable") and not databases.get(name, {}).get("searchable"):
DatabaseChooser(name).make_searchable(reset=False, signal=False)
elif not value.get("searchable") and databases.get(name, {}).get("searchable"):
DatabaseChooser(name).make_unsearchable(reset=False, signal=False)
databases.data = new_data
databases.flush(signal=False)
if revision_data["change_type"] == "database_reset":
DatabaseChooser(revision_data["id"]).delete(warn=False, signal=False)
if revision_data["change_type"] == "database_delete":
databases.__delitem__(revision_data["id"], signal=False)
[docs]
SIGNALLEDOBJECT_TO_LABEL = {
ActivityDataset: "lci_node",
ExchangeDataset: "lci_edge",
ProjectParameter: "project_parameter",
DatabaseParameter: "database_parameter",
ActivityParameter: "activity_parameter",
ParameterizedExchange: "parameterized_exchange",
Group: "group",
}
[docs]
REVISIONED_LABEL_AS_OBJECT: dict[str, type[RevisionedORMProxy]] = {
"lci_node": RevisionedNode,
"lci_edge": RevisionedEdge,
# TODO separate uses
"lci_database": RevisionedDatabase,
"project_parameter": RevisionedProjectParameter,
"database_parameter": RevisionedDatabaseParameter,
"activity_parameter": RevisionedActivityParameter,
"parameterized_exchange": RevisionedParameterizedExchange,
"group": RevisionedGroup,
}
[docs]
REVISIONS_OBJECT_AS_LABEL = {v: k for k, v in REVISIONED_LABEL_AS_OBJECT.items()}