# Public names of this module. ``parameters`` is the module-level object whose
# ``.db`` attribute is used for transactions throughout this file.
__all__ = (
    "ActivityParameter",
    "DatabaseParameter",
    "Group",
    "GroupDependency",
    "ParameterizedExchange",
    "parameters",
    "ProjectParameter",
)
import datetime
import itertools
import re
import uuid
from typing import Optional
import asteval
from asteval import Interpreter
from bw2parameters import ParameterSet
from bw2parameters.errors import MissingName
from peewee import BooleanField, Check, DateTimeField, FloatField, IntegerField, Model, TextField
from bw2data import config, databases, get_activity, projects
from bw2data.backends.schema import ExchangeDataset
from bw2data.signals import (
on_activity_parameter_recalculate,
on_activity_parameter_recalculate_exchanges,
on_activity_parameter_update_formula_activity_parameter_name,
on_activity_parameter_update_formula_database_parameter_name,
on_activity_parameter_update_formula_project_parameter_name,
on_database_parameter_recalculate,
on_database_parameter_update_formula_database_parameter_name,
on_database_parameter_update_formula_project_parameter_name,
on_project_parameter_recalculate,
on_project_parameter_update_formula_parameter_name,
)
from bw2data.snowflake_ids import SnowflakeIDBaseClass
from bw2data.sqlite import PickleField, SubstitutableDatabase
# https://stackoverflow.com/questions/34544784/arbitrary-string-to-valid-python-name
def clean(x):
    """Turn an arbitrary string into a valid Python identifier.

    Every non-word character becomes ``_`` and a leading digit gets a ``_``
    prefix.
    """
    return re.sub(r"\W|^(?=\d)", "_", x)


def nonempty(dct):
    """Return a copy of ``dct`` without the items whose value is ``None``."""
    return {k: v for k, v in dct.items() if v is not None}


# Autoupdate the ``updated`` field in Group when parameters change.
# ``{table}`` and ``{action}`` are filled in per model; ``{name}`` is an SQL
# expression for the owning group name (a quoted literal or an OLD./NEW. column).
AUTOUPDATE_TRIGGER = """CREATE TRIGGER IF NOT EXISTS {table}_{action}_trigger AFTER {action} ON {table} BEGIN
UPDATE group_table SET updated = datetime('now') WHERE name = {name};
END;"""

# Activity parameter groups can't cross databases: once a group has rows, new
# or updated rows must use a database already present in that group.
_CROSSDATABASE_TEMPLATE = """CREATE TRIGGER IF NOT EXISTS ap_crossdatabase_{action} BEFORE {action} ON activityparameter BEGIN
SELECT CASE WHEN
((SELECT COUNT(*) FROM activityparameter WHERE "group" = NEW."group") > 0)
AND (NEW.database NOT IN (SELECT DISTINCT "database" FROM activityparameter where "group" = NEW."group"))
THEN RAISE(ABORT,'Cross database group')
END;
END;"""
# NOTE: the "DATASE" spelling is kept because other code refers to these names.
CROSSDATASE_INSERT_TRIGGER = _CROSSDATABASE_TEMPLATE.format(action="INSERT")
CROSSDATASE_UPDATE_TRIGGER = _CROSSDATABASE_TEMPLATE.format(action="UPDATE")

# Activities can't be in multiple activity parameter groups: reject a row whose
# (database, code) already has a parameter in a different group.
_CROSSGROUP_TEMPLATE = """CREATE TRIGGER IF NOT EXISTS ap_crossgroup_{action} BEFORE {action} ON activityparameter BEGIN
SELECT CASE WHEN EXISTS (SELECT * FROM activityparameter AS a WHERE
a.database = NEW.database AND
a.code = NEW.code AND
a."group" != NEW."group")
THEN RAISE(ABORT,'Cross group activity')
END;
END;"""
CROSSGROUP_INSERT_TRIGGER = _CROSSGROUP_TEMPLATE.format(action="INSERT")
CROSSGROUP_UPDATE_TRIGGER = _CROSSGROUP_TEMPLATE.format(action="UPDATE")

# No circular dependencies in activity parameter group dependencies.
# Limitation: only direct two-group cycles (A -> B while B -> A) are rejected;
# longer cycles such as A -> B -> C -> A are not detected by this trigger.
_CLOSURE_TEMPLATE = """CREATE TRIGGER IF NOT EXISTS gd_circular_{action} BEFORE {action} ON groupdependency BEGIN
SELECT CASE WHEN EXISTS (SELECT * FROM groupdependency AS g WHERE g."group" = NEW.depends AND g.depends = NEW."group")
THEN RAISE(ABORT,'Circular dependency')
END;
END;
"""
GD_INSERT_TRIGGER = _CLOSURE_TEMPLATE.format(action="INSERT")
GD_UPDATE_TRIGGER = _CLOSURE_TEMPLATE.format(action="UPDATE")

# Parameterized exchange groups must already exist in the activityparameter table.
_PE_GROUP_TEMPLATE = """CREATE TRIGGER IF NOT EXISTS pe_group_{action} BEFORE {action} ON parameterizedexchange BEGIN
SELECT CASE WHEN
((SELECT COUNT(*) FROM activityparameter WHERE "group" = NEW."group") < 1)
THEN RAISE(ABORT,'Missing activity parameter group')
END;
END;
"""
PE_INSERT_TRIGGER = _PE_GROUP_TEMPLATE.format(action="INSERT")
PE_UPDATE_TRIGGER = _PE_GROUP_TEMPLATE.format(action="UPDATE")
class ParameterBase(SnowflakeIDBaseClass):
    """Shared behaviour of the project, database and activity parameter models.

    Subclasses define ``name`` plus three class attributes used to build the
    autoupdate triggers: ``_db_table`` (SQL table name), and ``_new_name`` /
    ``_old_name`` (SQL expressions giving the owning group name for ``INSERT``
    and for ``UPDATE``/``DELETE`` triggers respectively).
    """

    def __repr__(self):
        return str(self)

    def __lt__(self, other):
        """Order parameters of the same model case-insensitively by name."""
        if type(self) is not type(other):
            # Defer to Python's comparison protocol, which raises TypeError.
            return NotImplemented
        return self.name.lower() < other.name.lower()

    @classmethod
    def create_table(cls, *args, **kwargs):
        """Create the table and the triggers that touch ``Group.updated``.

        Any arguments (e.g. peewee's ``safe``) are forwarded to the parent
        ``create_table`` instead of being rejected.
        """
        super(ParameterBase, cls).create_table(*args, **kwargs)
        database = cls._meta.database
        database.execute_sql(
            AUTOUPDATE_TRIGGER.format(action="INSERT", name=cls._new_name, table=cls._db_table)
        )
        # UPDATE and DELETE triggers identify the group from the pre-change row.
        for action in ("UPDATE", "DELETE"):
            database.execute_sql(
                AUTOUPDATE_TRIGGER.format(action=action, name=cls._old_name, table=cls._db_table)
            )

    @staticmethod
    def expire_downstream(group):
        """Mark every group that depends on ``group`` as not fresh."""
        Group.update(fresh=False).where(
            Group.name
            << GroupDependency.select(GroupDependency.group).where(GroupDependency.depends == group)
        ).execute()
class ProjectParameter(ParameterBase):
    """Parameter set for a project. Group name is 'project'.

    Columns:

    - name: str, unique
    - formula: str, optional
    - amount: float, optional
    - data: object, optional. Used for any other metadata.

    Note that there is no magic for reading and writing to ``data`` (unlike ``Activity`` objects) - it must be used directly.
    """

    name = TextField(index=True, unique=True)
    # Read by ``dict`` and written by ``ParameterManager.new_project_parameters``.
    formula = TextField(null=True)
    amount = FloatField(null=True)
    # Callable default: each row gets its own dict. A literal ``{}`` would be a
    # single object shared by every instance relying on the default.
    data = PickleField(default=dict)

    # SQL expressions for the group name, used by the autoupdate triggers.
    _old_name = "'project'"
    _new_name = "'project'"
    _db_table = "projectparameter"

    def __str__(self):
        return "Project parameter: {}".format(self.name)

    def save(self, *args, **kwargs):
        """Save this model instance after expiring the ``project`` group."""
        Group.get_or_create(name="project")[0].expire()
        super(ProjectParameter, self).save(*args, **kwargs)

    @staticmethod
    def load(group=None):
        """Return dictionary of parameter data with names as keys and ``.dict()`` as values.

        ``group`` is not used.
        """

        def reformat(o):
            o = o.dict
            return (o.pop("name"), o)

        return dict([reformat(o) for o in ProjectParameter.select()])

    @staticmethod
    def static(ignored="project", only=None):
        """Get dictionary of ``{name: amount}`` for all project parameters.

        ``only`` restricts returned names to ones found in ``only``. ``ignored`` included for API compatibility with other ``recalculate`` methods.
        """
        result = dict(
            ProjectParameter.select(ProjectParameter.name, ProjectParameter.amount).tuples()
        )
        if only is not None:
            result = {k: v for k, v in result.items() if k in only}
        return result

    @staticmethod
    def expired():
        """Return boolean - is this group expired? ``False`` if the group doesn't exist."""
        try:
            return not Group.get(name="project").fresh
        except Group.DoesNotExist:
            return False

    @staticmethod
    def recalculate(ignored: Optional[str] = None, signal: bool = True):
        """Recalculate all parameters, if the ``project`` group is expired.

        ``ignored`` included for API compatibility with other ``recalculate`` methods - it will really be ignored
        (``ActivityParameter.recalculate`` passes the group name ``"project"`` here).

        After a recalculation, sends ``on_project_parameter_recalculate`` if ``signal`` is true.
        """
        if not ProjectParameter.expired():
            return
        data = ProjectParameter.load()
        if not data:
            return
        ParameterSet(data).evaluate_and_set_amount_field()
        with parameters.db.atomic():
            for key, value in data.items():
                ProjectParameter.update(
                    amount=value["amount"],
                ).where(ProjectParameter.name == key).execute()
            Group.get_or_create(name="project")[0].freshen()
            ProjectParameter.expire_downstream("project")
        if signal:
            on_project_parameter_recalculate.send()

    @staticmethod
    def dependency_chain():
        """Determine if ``ProjectParameter`` parameters have dependencies
        within the group.

        Raises ``MissingName`` if a formula uses a name that is not a project parameter.

        Returns:

        .. code-block:: python

            [
                {
                    'kind': 'project',
                    'group': 'project',
                    'names': set of variables names
                }
            ]

        """
        data = ProjectParameter.load()
        if not data:
            return []

        # Parse all formulas, find missing variables
        needed = get_new_symbols(data.values())
        if not needed:
            return []

        missing = needed.difference(data)
        if missing:
            raise MissingName(
                "The following variables aren't defined:\n{}".format("|".join(missing))
            )

        return [{"kind": "project", "group": "project", "names": needed}]

    @staticmethod
    def is_dependency_within_group(name):
        """Return ``True`` if another project parameter's formula uses ``name``."""
        own_group = next(iter(ProjectParameter.dependency_chain()), {})
        return name in own_group.get("names", set())

    def is_deletable(self):
        """Perform a test to see if the current parameter can be deleted."""
        if ProjectParameter.is_dependency_within_group(self.name):
            return False
        # Test the database parameters
        if DatabaseParameter.is_dependent_on(self.name):
            return False
        # Test activity parameters
        if ActivityParameter.is_dependent_on(self.name, "project"):
            return False
        return True

    # Instance property (was wrongly stacked under ``@classmethod``, which made
    # ``self`` the class and broke ``.dict`` on instances).
    @property
    def dict(self):
        """Parameter data as a standardized dictionary"""
        obj = nonempty(
            {
                "name": self.name,
                "formula": self.formula,
                "amount": self.amount,
            }
        )
        obj.update(self.data)
        return obj
class DatabaseParameter(ParameterBase):
    """Parameter set for a database. Group name is the name of the database.

    Columns:

    * database: str
    * name: str, unique within a database
    * formula: str, optional
    * amount: float, optional
    * data: object, optional. Used for any other metadata.

    Note that there is no magic for reading and writing to ``data`` (unlike ``Activity`` objects) - it must be used directly.
    """

    database = TextField(index=True)
    name = TextField(index=True)
    # Read by ``dict`` and written by ``ParameterManager.new_database_parameters``.
    formula = TextField(null=True)
    amount = FloatField(null=True)
    # Callable default so instances don't share one dict object.
    data = PickleField(default=dict)

    # SQL expressions for the group name, used by the autoupdate triggers.
    _old_name = "OLD.database"
    _new_name = "NEW.database"
    _db_table = "databaseparameter"

    class Meta:
        # ``name`` is unique within a database.
        indexes = ((("database", "name"), True),)

    def __str__(self):
        return "Database parameter: {}:{}".format(self.database, self.name)

    @staticmethod
    def load(database):
        """Return dictionary of parameter data with names as keys and ``.dict()`` as values."""

        def reformat(o):
            o = o.dict
            return (o.pop("name"), o)

        return dict(
            [
                reformat(o)
                for o in DatabaseParameter.select().where(DatabaseParameter.database == database)
            ]
        )

    @staticmethod
    def expired(database):
        """Return boolean - is this group expired? ``False`` if the group doesn't exist."""
        try:
            return not Group.get(name=database).fresh
        except Group.DoesNotExist:
            return False

    @staticmethod
    def static(database, only=None):
        """Return dictionary of {name: amount} for database group.

        ``only`` restricts returned names to ones found in ``only``.
        """
        result = dict(
            DatabaseParameter.select(DatabaseParameter.name, DatabaseParameter.amount)
            .where(DatabaseParameter.database == database)
            .tuples()
        )
        if only is not None:
            result = {k: v for k, v in result.items() if k in only}
        return result

    @staticmethod
    def recalculate(database: str, signal: bool = True):
        """Recalculate all database parameters for ``database``, if expired.

        Expired project parameters are recalculated first. Raises ``MissingName``
        if a formula uses a name defined neither in this database group nor as a
        project parameter. After a recalculation, sends
        ``on_database_parameter_recalculate`` if ``signal`` is true.
        """
        if ProjectParameter.expired():
            ProjectParameter.recalculate()

        # Can we avoid doing anything?
        if not DatabaseParameter.expired(database):
            return
        data = DatabaseParameter.load(database)
        if not data:
            return

        # Parse all formulas, find missing variables
        new_symbols = get_new_symbols(data.values(), set(data))
        found_symbols = {x[0] for x in ProjectParameter.select(ProjectParameter.name).tuples()}
        missing = new_symbols.difference(found_symbols)
        if missing:
            raise MissingName(
                "The following variables aren't defined:\n{}".format("|".join(missing))
            )

        # Add or delete `project` dependency if needed
        if new_symbols:
            GroupDependency.get_or_create(group=database, depends="project")
            # Load needed project variables as {'foo': 42} dict
            glo = ProjectParameter.static(only=new_symbols)
        else:
            GroupDependency.delete().where(
                GroupDependency.group == database, GroupDependency.depends == "project"
            ).execute()
            glo = None

        # Update database parameter values
        ParameterSet(data, glo).evaluate_and_set_amount_field()
        with parameters.db.atomic():
            for key, value in data.items():
                DatabaseParameter.update(
                    amount=value["amount"],
                ).where(
                    DatabaseParameter.name == key,
                    DatabaseParameter.database == database,
                ).execute()
            Group.get(name=database).freshen()
            DatabaseParameter.expire_downstream(database)
        if signal:
            on_database_parameter_recalculate.send(name=database)

    @staticmethod
    def dependency_chain(group, include_self=False):
        """Find where each missing variable is defined in dependency chain.

        If ``include_self`` is True will include parameters within the group as possible dependencies.
        Raises ``MissingName`` for names found nowhere.

        Returns:

        .. code-block:: python

            [
                {
                    'kind': one of 'project', 'database', 'activity',
                    'group': group name,
                    'names': set of variables names
                }
            ]

        """
        data = DatabaseParameter.load(group)
        if not data:
            return []

        # Parse all formulas, find missing variables
        context = set(data) if not include_self else set()
        needed = get_new_symbols(data.values(), context=context)
        if not needed:
            return []

        names, chain = set(), []
        if include_self:
            included = needed.intersection(data)
            for name in included:
                names.add(name)
                needed.remove(name)
            if names:
                chain.append({"kind": "database", "group": group, "names": names})

        if needed:
            names = set()
            for name in ProjectParameter.static(only=needed):
                names.add(name)
                needed.remove(name)
            if names:
                chain.insert(0, {"kind": "project", "group": "project", "names": names})

        if needed:
            raise MissingName(
                "The following variables aren't defined:\n{}".format("|".join(needed))
            )

        return chain

    @staticmethod
    def is_dependency_within_group(name, database):
        """Return ``True`` if another parameter of ``database`` uses ``name``."""
        own_group = next(
            (
                x
                for x in DatabaseParameter.dependency_chain(database, include_self=True)
                if x.get("group") == database
            ),
            {},
        )
        return name in own_group.get("names", set())

    def save(self, *args, **kwargs):
        """Save this model instance after expiring its database group."""
        Group.get_or_create(name=self.database)[0].expire()
        super(DatabaseParameter, self).save(*args, **kwargs)

    def is_deletable(self):
        """Perform a test to see if the current parameter can be deleted."""
        # Test if the current parameter is used by other database parameters
        if DatabaseParameter.is_dependency_within_group(self.name, self.database):
            return False
        # Then test all relevant activity parameters
        if ActivityParameter.is_dependent_on(self.name, self.database):
            return False
        return True

    @staticmethod
    def is_dependent_on(name):
        """Test if any database parameters are dependent on the given
        project parameter name.
        """
        query = (
            GroupDependency.select(GroupDependency.group)
            .where(GroupDependency.depends == "project")
            .distinct()
        )

        for row in query.execute():
            chain = DatabaseParameter.dependency_chain(row.group)
            own_group = next((x for x in chain if x.get("group") == "project"), {})
            if name in own_group.get("names", set()):
                return True

        return False

    # Instance property; the previous ``@classmethod`` stacking made ``self``
    # the class and broke ``.dict`` on instances.
    @property
    def dict(self):
        """Parameter data as a standardized dictionary"""
        obj = nonempty(
            {
                "database": self.database,
                "name": self.name,
                "formula": self.formula,
                "amount": self.amount,
            }
        )
        obj.update(self.data)
        return obj
class ActivityParameter(ParameterBase):
    """Parameter set for a group of activities.

    Columns:

    - group: str
    - database: str
    - code: str. Code and database define the linked activity for this parameter.
    - name: str, unique within a group
    - formula: str, optional
    - amount: float, optional
    - data: object, optional. Used for any other metadata.

    Activities can only have parameters in one group. Group names cannot be 'project' or the name of any existing database.

    Activity parameter groups can depend on other activity parameter groups, so that a formula in group "a" can depend on a variable in group "b". This dependency information is stored in ``Group.order`` - in our small example, we could define the following:

    .. code-block:: python

        a = Group.get(name="a")
        a.order = ["b", "c"]
        a.save()

    In this case, a variable not found in "a" would be searched for in "b" and then "c", in that order. Database and then project parameters are also implicitly included at the end of ``Group.order``.

    Note that there is no magic for reading and writing to ``data`` (unlike ``Activity`` objects) - it must be used directly.
    """

    group = TextField()
    database = TextField()
    code = TextField()
    name = TextField()
    formula = TextField(null=True)
    amount = FloatField(null=True)
    # Callable default so instances don't share one dict object.
    data = PickleField(default=dict)

    # SQL expressions for the group name, used by the autoupdate triggers.
    _old_name = 'OLD."group"'
    _new_name = 'NEW."group"'
    _db_table = "activityparameter"

    class Meta:
        # ``name`` is unique within a group and 'project' is reserved for project
        # parameters. (Database names can't be excluded by a static CHECK.)
        indexes = ((("group", "name"), True),)
        constraints = [Check("\"group\" != 'project'")]

    def __str__(self):
        return "Activity parameter: {}:{}".format(self.group, self.name)

    @staticmethod
    def load(group):
        """Return dictionary of parameter data with names as keys and ``.dict()`` as values."""

        def reformat(o):
            o = o.dict
            return (o.pop("name"), o)

        return dict(
            [
                reformat(o)
                for o in ActivityParameter.select().where(ActivityParameter.group == group)
            ]
        )

    @staticmethod
    def static(group, only=None, full=False):
        """Get dictionary of ``{name: amount}`` for parameters defined in ``group``.

        ``only`` restricts returned names to ones found in ``only``. ``full`` returns all names, including those found in the dependency chain.
        """
        result = dict(
            ActivityParameter.select(ActivityParameter.name, ActivityParameter.amount)
            .where(ActivityParameter.group == group)
            .tuples()
        )
        if full:
            # Local names take precedence over names from the dependency chain.
            temp = ActivityParameter._static_dependencies(group)
            temp.update(result)
            result = temp
        if only is not None:
            result = {k: v for k, v in result.items() if k in only}
        return result

    @staticmethod
    def _static_dependencies(group):
        """Get dictionary of ``{name: amount}`` for all variables defined in dependency chain.

        Be careful! This could have variables which overlap with local variable names. Designed for internal use.
        """
        database = ActivityParameter.get(group=group).database
        # Applied from lowest to highest precedence: project, database, then
        # ``Group.order`` reversed so that earlier groups in the order win.
        chain = [ProjectParameter.static(), DatabaseParameter.static(database)] + [
            ActivityParameter.static(g) for g in Group.get(name=group).order[::-1]
        ]
        result = {}
        for dct in chain:
            result.update(dct)
        return result

    @staticmethod
    def insert_dummy(group: str, activity: tuple, signal: bool = True):
        """Insert a placeholder parameter for ``activity`` into ``group``.

        ``activity`` is a ``(database, code)`` pair. Nothing is inserted if the
        activity already has a parameter in ``group``; otherwise a parameter with
        a random ``__dummy_<hex>__`` name and amount 0 is saved.
        """
        code, database = activity[1], activity[0]
        if (
            not ActivityParameter.select()
            .where(
                ActivityParameter.group == group,
                ActivityParameter.code == code,
                ActivityParameter.database == database,
            )
            .count()
        ):
            ActivityParameter(
                group=group,
                name="__dummy_{}__".format(uuid.uuid4().hex),
                code=code,
                database=database,
                amount=0,
            ).save(signal=signal)

    @staticmethod
    def expired(group):
        """Return boolean - is this group expired? ``False`` if the group doesn't exist."""
        try:
            return not Group.get(name=group).fresh
        except Group.DoesNotExist:
            return False

    @staticmethod
    def dependency_chain(group, include_self=False):
        """Find where each missing variable is defined in dependency chain.

        Will also load in all parameters needed to resolve the ``ParameterizedExchanges`` for this group.

        If ``include_self`` is True will include parameters within the group as possible dependencies.
        Raises ``MissingName`` for names found nowhere.

        Returns:

        .. code-block:: python

            [
                {
                    'kind': one of 'project', 'database', 'activity',
                    'group': group name,
                    'names': set of variables names
                }
            ]

        """
        data = ActivityParameter.load(group)
        if not data:
            return []

        # Parse all formulas, find missing variables
        context = set(data) if not include_self else None
        activity_needed = get_new_symbols(data.values(), context=context)
        exchanges_needed = get_new_symbols(
            ParameterizedExchange.load(group).values(), context=context
        )
        needed = activity_needed.union(exchanges_needed)

        if not needed:
            return []

        chain = []

        # Iteratively search through other activity params,
        # then db params, then project params
        for new_group in Group.get(name=group).order:
            names = set()
            for name in ActivityParameter.static(new_group, only=needed):
                names.add(name)
                needed.remove(name)
            if names:
                chain.append({"kind": "activity", "group": new_group, "names": names})

        if needed and include_self:
            names = set()
            included = needed.intersection(data)
            for name in included:
                names.add(name)
                needed.remove(name)
            if names:
                chain.append({"kind": "activity", "group": group, "names": names})

        if needed:
            database = ActivityParameter.get(group=group).database
            names = set()
            for name in DatabaseParameter.static(database, only=needed):
                names.add(name)
                needed.remove(name)
            if names:
                chain.append({"kind": "database", "group": database, "names": names})

        if needed:
            names = set()
            for name in ProjectParameter.static(only=needed):
                names.add(name)
                needed.remove(name)
            if names:
                chain.append({"kind": "project", "group": "project", "names": names})

        if needed:
            raise MissingName(
                "The following variables aren't defined:\n{}".format("|".join(needed))
            )

        return chain

    @staticmethod
    def is_dependency_within_group(name, group, include_order=False):
        """Determine if the given parameter `name` is a dependency within
        the given activity `group`.

        The optional ``include_order`` parameter will include dependencies
        from groups found in the ``Group.order`` field.
        """
        chain = ActivityParameter.dependency_chain(group, include_self=True)
        own_group = next((x for x in chain if x.get("group") == group), {})
        names = own_group.get("names", set())
        if include_order:
            for new_group in Group.get(name=group).order:
                order_group = next((x for x in chain if x.get("group") == new_group), {})
                names = names.union(order_group.get("names", set()))
        return name in names

    @staticmethod
    def recalculate(group: str, signal: bool = True):
        """Recalculate all values for activity parameters in this group, and update their underlying `Activity` and `Exchange` values.

        Does nothing if the group is not expired. Expired upstream groups in
        the dependency chain are recalculated first. Sends
        ``on_activity_parameter_recalculate`` afterwards if ``signal`` is true.
        """
        # Start by traversing and updating the list of dependencies
        if not ActivityParameter.expired(group):
            return

        chain = ActivityParameter.dependency_chain(group)

        # Reset dependencies. Old rows are always removed: a group that no
        # longer depends on anything must not keep stale rows, which would
        # needlessly expire it via ``expire_downstream`` and make the circular
        # dependency trigger reject a legitimate reverse dependency.
        GroupDependency.delete().where(GroupDependency.group == group).execute()
        if chain:
            obj = Group.get(name=group)
            obj.order = [o["group"] for o in chain if o["kind"] == "activity"]
            obj.save()
            GroupDependency.insert_many(
                [{"group": group, "depends": o["group"]} for o in chain]
            ).execute()

        # Update all upstream groups
        mapping = {
            "project": ProjectParameter,
            "database": DatabaseParameter,
            "activity": ActivityParameter,
        }

        # Not guaranteed to be the most efficient,
        # but definitely simplest for now.
        # Could be smarter here in the future
        # Shouldn't be any race conditions because check for
        # circular dependencies
        for row in chain[::-1]:
            mapping[row["kind"]].recalculate(row["group"])

        # Update activity parameter values
        data = ActivityParameter.load(group)
        static = {
            k: v for k, v in ActivityParameter._static_dependencies(group).items() if k not in data
        }
        ParameterSet(data, static).evaluate_and_set_amount_field()
        with parameters.db.atomic():
            for key, value in data.items():
                ActivityParameter.update(
                    amount=value["amount"],
                ).where(
                    ActivityParameter.name == key,
                    ActivityParameter.group == group,
                ).execute()
            Group.get(name=group).freshen()
            ActivityParameter.expire_downstream(group)

        ActivityParameter.recalculate_exchanges(group, signal=False)
        if signal:
            on_activity_parameter_recalculate.send(name=group)

    @staticmethod
    def recalculate_exchanges(group: str, signal: bool = True):
        """Recalculate formulas for all parameterized exchanges in group ``group``.

        If the group is expired, a full ``recalculate`` is done instead, with the
        same ``signal`` setting.
        """
        if ActivityParameter.expired(group):
            return ActivityParameter.recalculate(group, signal=signal)

        interpreter = Interpreter()
        for k, v in ActivityParameter.static(group, full=True).items():
            interpreter.symtable[k] = v
        # TODO: Remove uncertainty from exchanges?
        for obj in ParameterizedExchange.select().where(ParameterizedExchange.group == group):
            exc = ExchangeDataset.get(id=obj.exchange)
            exc.data["amount"] = interpreter(obj.formula)
            exc.save(signal=False)

        databases.set_dirty(ActivityParameter.get(group=group).database)
        if signal:
            on_activity_parameter_recalculate_exchanges.send(name=group)

    def save(self, *args, **kwargs):
        """Save this model instance after expiring its group."""
        Group.get_or_create(name=self.group)[0].expire()
        super().save(*args, **kwargs)

    def is_deletable(self):
        """Perform a test to see if the current parameter can be deleted."""
        # First check own group
        if ActivityParameter.is_dependency_within_group(self.name, self.group):
            return False
        # Then test other relevant activity groups.
        if ActivityParameter.is_dependent_on(self.name, self.group):
            return False
        return True

    @staticmethod
    def is_dependent_on(name, group):
        """Test if any activity parameters are dependent on the given
        parameter name from the given group.
        """
        query = (
            GroupDependency.select(GroupDependency.group)
            .where(GroupDependency.depends == group)
            .distinct()
        )

        for row in query.execute():
            chain = ActivityParameter.dependency_chain(row.group)
            own_group = next((x for x in chain if x.get("group") == group), {})
            if name in own_group.get("names", set()):
                return True

        return False

    @classmethod
    def create_table(cls, *args, **kwargs):
        """Create the table plus the cross-database and cross-group triggers.

        Arguments (e.g. peewee's ``safe``) are forwarded to the parent.
        """
        super(ActivityParameter, cls).create_table(*args, **kwargs)
        for trigger in (
            CROSSDATASE_UPDATE_TRIGGER,
            CROSSDATASE_INSERT_TRIGGER,
            CROSSGROUP_UPDATE_TRIGGER,
            CROSSGROUP_INSERT_TRIGGER,
        ):
            cls._meta.database.execute_sql(trigger)

    @property
    def dict(self):
        """Parameter data as a standardized dictionary"""
        obj = nonempty(
            {
                "database": self.database,
                "code": self.code,
                "name": self.name,
                "formula": self.formula,
                "amount": self.amount,
            }
        )
        obj.update(self.data)
        return obj
class ParameterizedExchange(SnowflakeIDBaseClass):
    """Formula that computes the amount of one exchange.

    Columns:

    - group: str. Activity parameter group whose variables the formula can use.
    - exchange: int, unique. ``id`` of the ``ExchangeDataset`` row.
    - formula: str
    """

    group = TextField()
    exchange = IntegerField(unique=True)
    formula = TextField()

    @classmethod
    def create_table(cls, *args, **kwargs):
        """Create the table plus triggers requiring an existing activity parameter group.

        Arguments (e.g. peewee's ``safe``) are forwarded to the parent.
        """
        super(ParameterizedExchange, cls).create_table(*args, **kwargs)
        cls._meta.database.execute_sql(PE_UPDATE_TRIGGER)
        cls._meta.database.execute_sql(PE_INSERT_TRIGGER)

    def save(self, *args, **kwargs):
        """Save after expiring the group, then copy ``formula`` onto the exchange."""
        Group.get_or_create(name=self.group)[0].expire()
        super().save(*args, **kwargs)
        # Push the changed formula to the Exchange.
        exc = ExchangeDataset.get_or_none(id=self.exchange)
        if exc and exc.data.get("formula") != self.formula:
            exc.data["formula"] = self.formula
            exc.save(signal=False)

    @staticmethod
    def load(group):
        """Return dictionary of ``{exchange id: formula}`` for all parameterized exchanges in ``group``."""
        return {
            o.exchange: o.formula
            for o in ParameterizedExchange.select().where(ParameterizedExchange.group == group)
        }

    @staticmethod
    def recalculate(group):
        """Shortcut for ``ActivityParameter.recalculate_exchanges``."""
        return ActivityParameter.recalculate_exchanges(group)
class Group(SnowflakeIDBaseClass):
    """A named parameter group and its recalculation state.

    Columns:

    - name: str, unique. ``'project'``, a database name, or an activity parameter group.
    - fresh: bool. Set to ``False`` by ``expire`` when values need recalculating.
    - updated: datetime. Also refreshed by SQL triggers when member parameters change.
    - order: list of activity group names searched, in order, for variables
      not defined locally.
    """

    name = TextField(unique=True)
    fresh = BooleanField(default=True)
    updated = DateTimeField(default=datetime.datetime.now)
    # Callable default so instances don't share (and mutate) one list object.
    order = PickleField(default=list)

    class Meta:
        # ``AUTOUPDATE_TRIGGER`` updates this table by name.
        table_name = "group_table"

    def expire(self):
        """Set ``fresh`` to ``False``"""
        self.fresh = False
        self.save()

    def freshen(self):
        """Set ``fresh`` to ``True``"""
        self.fresh = True
        self.save()

    def save(self, *args, **kwargs):
        """Save this model instance. Will remove 'project' and database names from ``order``."""
        self.purge_order()
        super(Group, self).save(*args, **kwargs)

    def purge_order(self):
        """Drop ``'project'`` and database names from ``order``.

        Those groups are always searched implicitly after ``order``.
        """
        reserved = {"project", *databases}
        self.order = [x for x in self.order if x not in reserved]
class GroupDependency(Model):
    """Records that parameter group ``group`` uses values from group ``depends``.

    Columns:

    - group: str
    - depends: str
    """

    group = TextField()
    depends = TextField()

    class Meta:
        # Each dependency is recorded once, and a group cannot depend on itself.
        indexes = ((("group", "depends"), True),)
        constraints = [Check('"group" != depends')]

    def save(self, *args, **kwargs):
        """Validate and save.

        Raises ``ValueError`` if ``group`` is ``'project'``, or if a database
        group depends on anything other than ``'project'``.
        """
        if self.group == "project":
            raise ValueError("`project` group can't have dependencies")
        elif self.group in databases and self.depends != "project":
            raise ValueError("Database groups can only depend on `project`")
        super(GroupDependency, self).save(*args, **kwargs)

    @classmethod
    def create_table(cls, *args, **kwargs):
        """Create the table plus the direct-cycle triggers.

        Arguments (e.g. peewee's ``safe``) are forwarded to the parent.
        """
        super(GroupDependency, cls).create_table(*args, **kwargs)
        cls._meta.database.execute_sql(GD_UPDATE_TRIGGER)
        cls._meta.database.execute_sql(GD_INSERT_TRIGGER)
class ParameterManager:
def __init__(self):
    """Open ``parameters.db`` in the current project directory.

    The six parameter models are handed to ``SubstitutableDatabase``, and the
    resulting database object is registered in ``config.sqlite3_databases``
    under the name ``parameters.db``.
    """
    filename = "parameters.db"
    models = [
        DatabaseParameter,
        ProjectParameter,
        ActivityParameter,
        ParameterizedExchange,
        Group,
        GroupDependency,
    ]
    self.db = SubstitutableDatabase(projects.dir / filename, models)
    config.sqlite3_databases.append((filename, self.db))
def add_to_group(self, group, activity):
    """Add `activity` to group.

    Creates ``group`` if needed.

    Will delete any existing ``ActivityParameter`` for this activity.

    Deletes `parameters` key from `Activity`.

    The activity's ``parameters`` may be a list of dicts that each carry a
    ``name`` key, or a dict mapping parameter names to dicts. The latter is the
    form ``remove_from_group`` writes back, so a removed activity can be
    added again.

    Returns the number of ``ActivityParameter`` rows linked to the activity
    afterwards, or ``None`` if the activity has no ``parameters`` key (only
    the group is created in that case).
    """
    Group.get_or_create(name=group)

    activity = get_activity((activity[0], activity[1]))
    if "parameters" not in activity:
        return

    def as_records(params):
        # Normalise ``{name: {...}}`` into ``[{"name": name, ...}]``.
        if isinstance(params, dict):
            return [{**values, "name": name} for name, values in params.items()]
        return params

    def reformat(o):
        skipped = ("name", "amount", "formula")
        return [
            nonempty(
                {
                    "group": group,
                    "database": o["database"],
                    "code": o["code"],
                    "name": p["name"],
                    "formula": p.get("formula"),
                    "amount": p.get("amount", 0),
                    "data": {k: v for k, v in p.items() if k not in skipped},
                }
            )
            for p in as_records(o.get("parameters", []))
        ]

    # Build rows first so malformed input fails before anything is deleted.
    rows = reformat(activity)

    with self.db.atomic():
        # Avoid duplicates by deleting existing parameters; doing it in the same
        # transaction as the inserts means a failure rolls back both.
        # Call in loop to get event handling
        for ap in ActivityParameter.select().where(
            ActivityParameter.database == activity["database"],
            ActivityParameter.code == activity["code"],
        ):
            ap.delete_instance()
        for row in rows:
            ActivityParameter.create(**row)

    # Parameters are now "active", remove from `Activity`
    del activity["parameters"]
    activity.save()

    self.add_exchanges_to_group(group, activity)

    return (
        ActivityParameter.select()
        .where(
            ActivityParameter.database == activity["database"],
            ActivityParameter.code == activity["code"],
        )
        .count()
    )
def remove_from_group(self, group, activity, restore_amounts=True):
    """Detach `activity` from `group`.

    The activity's ``ActivityParameter`` rows are copied back into its
    ``parameters`` key as ``{name: {other fields}}`` (without ``database``
    and ``code``) and then deleted. Parameterized exchanges are handled by
    ``remove_exchanges_from_group``.

    When ``restore_amounts`` is true (the default), each exchange that has an
    ``original_amount`` gets that value back as its ``amount``.
    """
    activity = get_activity((activity[0], activity[1]))

    def own_parameters():
        return ActivityParameter.select().where(
            ActivityParameter.database == activity[0],
            ActivityParameter.code == activity[1],
        )

    restored = {}
    for param in own_parameters():
        fields = {k: v for k, v in param.dict.items() if k not in ("database", "code")}
        restored[fields.pop("name")] = fields
    activity["parameters"] = restored

    with self.db.atomic():
        self.remove_exchanges_from_group(group, activity, restore_amounts)
        # Delete one at a time so per-row event handling runs.
        for param in own_parameters():
            param.delete_instance()

    activity.save()
def add_exchanges_to_group(self, group, activity):
    """Register every exchange of ``activity`` that has a ``formula`` with ``group``.

    If the activity has no ``ActivityParameter`` at all, a dummy parameter is
    inserted into ``group`` first. Each formula exchange gets (or updates) a
    ``ParameterizedExchange`` row. If the exchange has no ``original_amount``
    yet, its current ``amount`` is stored there, so ``remove_from_group`` can
    restore it later.

    Returns the number of formula exchanges processed.
    """
    has_parameters = (
        ActivityParameter.select()
        .where(
            ActivityParameter.database == activity[0],
            ActivityParameter.code == activity[1],
        )
        .count()
    )
    if not has_parameters:
        ActivityParameter.insert_dummy(group, activity)

    processed = 0
    for exc in get_activity((activity[0], activity[1])).exchanges():
        if "formula" not in exc:
            continue
        exchange_id = exc._document.id
        link = ParameterizedExchange.get_or_none(exchange=exchange_id)
        if link is None:
            link = ParameterizedExchange(exchange=exchange_id)
        link.group = group
        link.formula = exc["formula"]
        link.save()
        if "original_amount" not in exc:
            exc["original_amount"] = exc["amount"]
            exc.save()
        processed += 1
    return processed
def remove_exchanges_from_group(self, group, activity, restore_original=True):
    """Remove the ``ParameterizedExchange`` objects of ``activity``'s exchanges from ``group``.

    Only exchanges that belong to ``activity`` are affected. Parameterized
    exchanges of other activities in the same group are kept, so detaching
    one activity does not strip the formulas of its siblings.

    The ``restore_original`` parameter determines if the original amount
    values will be restored to those exchanges where a formula was used
    to alter the amount (the ``original_amount`` key is then removed).
    """
    exchanges = list(activity.exchanges())

    if restore_original:
        for exc in exchanges:
            if "original_amount" in exc:
                exc["amount"] = exc["original_amount"]
                del exc["original_amount"]
                exc.save()

    exchange_ids = [exc._document.id for exc in exchanges]
    if not exchange_ids:
        return
    # Call in loop to get event handling
    for pe in ParameterizedExchange.select().where(
        ParameterizedExchange.group == group,
        ParameterizedExchange.exchange << exchange_ids,
    ):
        pe.delete_instance()
def new_project_parameters(self, data, overwrite=True):
    """Correctly enter multiple parameters.

    Will overwrite existing project parameters with the same name, unless ``overwrite`` is false, in which case a ``ValueError`` is raised.

    ``data`` should be a list of dictionaries; they are not modified:

    .. code-block:: python

        [{
            'name': name of variable (unique),
            'amount': numeric value of variable (optional),
            'formula': formula in Python as string (optional),
            optional keys like uncertainty, etc. (no limitations)
        }]

    """
    potentially_non_unique_names = [ds["name"] for ds in data]
    unique_names = list(set(potentially_non_unique_names))
    assert len(unique_names) == len(potentially_non_unique_names), "Nonunique names: {}".format(
        [p for p in unique_names if potentially_non_unique_names.count(p) > 1]
    )

    def reformat(ds):
        # Work on a copy: popping keys from the caller's dicts would destroy
        # their input (a second call with the same list would raise KeyError).
        ds = dict(ds)
        return {
            "name": ds.pop("name"),
            "amount": ds.pop("amount", 0),
            "formula": ds.pop("formula", None),
            "data": ds,
        }

    data = [reformat(ds) for ds in data]
    new = {o["name"] for o in data}
    existing = {o[0] for o in ProjectParameter.select(ProjectParameter.name).tuples()}

    if new.intersection(existing) and not overwrite:
        raise ValueError(
            "The following parameters already exist:\n{}".format(
                "|".join(new.intersection(existing))
            )
        )

    with self.db.atomic():
        # Remove existing values
        # Call in loop to get event handling
        for pp in ProjectParameter.select().where(ProjectParameter.name << tuple(new)):
            pp.delete_instance()
        for dataset in data:
            ProjectParameter.create(**dataset)
        Group.get_or_create(name="project")[0].expire()
        ProjectParameter.recalculate()
def new_database_parameters(self, data, database, overwrite=True):
    """Correctly enter multiple database parameters for ``database``.

    Existing parameters of this database whose names appear in ``data`` are
    deleted and re-created; other parameters of the database are left alone.
    If ``overwrite`` is false and any of these names already exists for this
    database, a ``ValueError`` is raised before anything is written.

    ``database`` should be an existing database. ``data`` should be a list of dictionaries:

    .. code-block:: python

        [{
            'name': name of variable (unique),
            'amount': numeric value of variable (optional),
            'formula': formula in Python as string (optional),
            optional keys like uncertainty, etc. (no limitations)
        }]

    The dictionaries in ``data`` are not modified; keys other than ``name``,
    ``amount`` and ``formula`` are stored in the parameter's ``data`` field.

    Raises:
        AssertionError: if ``database`` is unknown or ``data`` has duplicate names.
        ValueError: if a name already exists and ``overwrite`` is false.
    """
    assert database in databases, "Unknown database"
    names = [ds["name"] for ds in data]
    unique_names = set(names)
    assert len(unique_names) == len(names), "Nonunique names: {}".format(
        [p for p in unique_names if names.count(p) > 1]
    )

    def reformat(ds):
        # Shallow copy so popping keys does not mutate the caller's dictionaries
        remaining = dict(ds)
        return {
            "database": database,
            "name": remaining.pop("name"),
            "amount": remaining.pop("amount", 0),
            "formula": remaining.pop("formula", None),
            "data": remaining,
        }

    rows = [reformat(ds) for ds in data]
    new = {row["name"] for row in rows}
    existing = {
        o[0]
        for o in DatabaseParameter.select(DatabaseParameter.name)
        .where(DatabaseParameter.database == database)
        .tuples()
    }
    clashes = new.intersection(existing)
    if clashes and not overwrite:
        raise ValueError(
            "The following parameters already exist:\n{}".format("|".join(sorted(clashes)))
        )
    with self.db.atomic():
        # Remove existing values with the same names (one by one, so that
        # per-instance delete handling runs)
        for dp in DatabaseParameter.select().where(
            DatabaseParameter.database == database,
            DatabaseParameter.name << tuple(new),
        ):
            dp.delete_instance()
        for row in rows:
            DatabaseParameter.create(**row)
        Group.get_or_create(name=database)[0].expire()
        DatabaseParameter.recalculate(database)
def new_activity_parameters(self, data, group, overwrite=True):
    """Correctly enter multiple activity parameters into ``group``.

    Existing parameters of this group whose names appear in ``data`` are
    deleted and re-created; other parameters of the group are left alone.
    If ``overwrite`` is false and any of these names already exists in the
    group, a ``ValueError`` is raised before anything is written.

    Input parameters must refer to a single, existing database.

    ``group`` is the group name; will be autocreated if necessary. ``data`` should be a list of dictionaries:

    .. code-block:: python

        [{
            'name': name of variable (unique),
            'database': activity database,
            'code': activity code,
            'amount': numeric value of variable (optional),
            'formula': formula in Python as string (optional),
            optional keys like uncertainty, etc. (no limitations)
        }]

    The dictionaries in ``data`` are not modified; keys other than the ones
    listed above are stored in the parameter's ``data`` field.

    Raises:
        AssertionError: if ``data`` is empty, spans several databases, refers
            to an unknown database, or contains duplicate names.
        ValueError: if a name already exists and ``overwrite`` is false.
    """
    databases_in_data = {ds["database"] for ds in data}
    assert len(databases_in_data) == 1, (
        "Multiple databases" if databases_in_data else "No activity parameters given"
    )
    assert databases_in_data.pop() in databases, "Unknown database"
    names = [ds["name"] for ds in data]
    unique_names = set(names)
    assert len(unique_names) == len(names), "Nonunique names: {}".format(
        [p for p in unique_names if names.count(p) > 1]
    )

    def reformat(ds):
        # Shallow copy so popping keys does not mutate the caller's dictionaries
        remaining = dict(ds)
        return {
            "group": group,
            "database": remaining.pop("database"),
            "code": remaining.pop("code"),
            "name": remaining.pop("name"),
            "formula": remaining.pop("formula", None),
            "amount": remaining.pop("amount", 0),
            "data": remaining,
        }

    rows = [reformat(ds) for ds in data]
    new = {row["name"] for row in rows}
    existing = {
        o[0]
        for o in ActivityParameter.select(ActivityParameter.name)
        .where(ActivityParameter.group == group)
        .tuples()
    }
    clashes = new.intersection(existing)
    if clashes and not overwrite:
        raise ValueError(
            "The following parameters already exist:\n{}".format("|".join(sorted(clashes)))
        )
    with self.db.atomic():
        # Create the group inside the transaction, so a failed call leaves no
        # empty group behind
        Group.get_or_create(name=group)
        # Remove existing values with the same names (one by one, so that
        # per-instance delete handling runs)
        for ap in ActivityParameter.select().where(
            ActivityParameter.group == group, ActivityParameter.name << tuple(new)
        ):
            ap.delete_instance()
        for row in rows:
            ActivityParameter.create(**row)
        Group.get_or_create(name=group)[0].expire()
        ActivityParameter.recalculate(group)
def rename_project_parameter(self, parameter, new_name, update_dependencies=False):
    """Rename a project parameter, optionally rewriting formulas that use it.

    Raises ``TypeError`` when ``parameter`` is not a ``ProjectParameter``.
    Returns immediately when ``new_name`` equals the current name.
    If project, database, or activity ("project" group lookup) formulas
    depend on the parameter and ``update_dependencies`` is False, a
    ``ValueError`` is raised. Otherwise those formulas are rewritten to use
    ``new_name``, the renamed parameter is saved inside one transaction, and
    ``self.recalculate()`` runs afterwards.
    """
    if not isinstance(parameter, ProjectParameter):
        raise TypeError("Incorrect parameter type for this method.")
    old_name = parameter.name
    if new_name == old_name:
        return

    used_by_project = ProjectParameter.is_dependency_within_group(old_name)
    used_by_database = DatabaseParameter.is_dependent_on(old_name)
    used_by_activity = ActivityParameter.is_dependent_on(old_name, "project")

    has_dependents = any((used_by_project, used_by_database, used_by_activity))
    if has_dependents and not update_dependencies:
        raise ValueError(
            "Parameter '{}' is used in other (downstream) formulas".format(old_name)
        )

    with self.db.atomic():
        rewrites = (
            (used_by_project, ProjectParameter.update_formula_parameter_name),
            (used_by_database, DatabaseParameter.update_formula_project_parameter_name),
            (used_by_activity, ActivityParameter.update_formula_project_parameter_name),
        )
        for needed, rewrite in rewrites:
            if needed:
                rewrite(old_name, new_name)
        parameter.name = new_name
        parameter.save()
    self.recalculate()
def rename_database_parameter(self, parameter, new_name, update_dependencies=False):
    """Rename a database parameter, optionally rewriting formulas that use it.

    Raises ``TypeError`` when ``parameter`` is not a ``DatabaseParameter``.
    Returns immediately when ``new_name`` equals the current name.
    Dependents are looked up among the parameters of the same database and
    among activity parameters, both keyed on ``parameter.database``. If any
    exist and ``update_dependencies`` is False, a ``ValueError`` is raised.
    Otherwise their formulas are rewritten, the renamed parameter is saved
    inside one transaction, and ``self.recalculate()`` runs afterwards.
    """
    if not isinstance(parameter, DatabaseParameter):
        raise TypeError("Incorrect parameter type for this method.")
    old_name = parameter.name
    if new_name == old_name:
        return

    used_by_database = DatabaseParameter.is_dependency_within_group(
        old_name, parameter.database
    )
    used_by_activity = ActivityParameter.is_dependent_on(old_name, parameter.database)

    has_dependents = any((used_by_database, used_by_activity))
    if has_dependents and not update_dependencies:
        raise ValueError(
            "Parameter '{}' is used in other (downstream) formulas".format(old_name)
        )

    with self.db.atomic():
        rewrites = (
            (used_by_database, DatabaseParameter.update_formula_database_parameter_name),
            (used_by_activity, ActivityParameter.update_formula_database_parameter_name),
        )
        for needed, rewrite in rewrites:
            if needed:
                rewrite(old_name, new_name)
        parameter.name = new_name
        parameter.save()
    self.recalculate()
def rename_activity_parameter(self, parameter, new_name, update_dependencies=False):
    """Rename an activity parameter, optionally rewriting formulas that use it.

    Raises ``TypeError`` when ``parameter`` is not an ``ActivityParameter``.
    Returns immediately when ``new_name`` equals the current name.
    Dependents are looked up within ``parameter.group`` (with
    ``include_order=True``) and via ``ActivityParameter.is_dependent_on``.
    If any exist and ``update_dependencies`` is False, a ``ValueError`` is
    raised. Otherwise their formulas are rewritten, the renamed parameter is
    saved inside one transaction, and ``self.recalculate()`` runs afterwards.
    """
    if not isinstance(parameter, ActivityParameter):
        raise TypeError("Incorrect parameter type for this method.")
    old_name = parameter.name
    if new_name == old_name:
        return

    dependency_checks = (
        ActivityParameter.is_dependency_within_group(
            old_name, parameter.group, include_order=True
        ),
        ActivityParameter.is_dependent_on(old_name, parameter.group),
    )
    has_dependents = any(dependency_checks)
    if has_dependents and not update_dependencies:
        raise ValueError(
            "Parameter '{}' is used in other (downstream) formulas".format(old_name)
        )

    with self.db.atomic():
        if has_dependents:
            ActivityParameter.update_formula_activity_parameter_name(
                old_name, new_name, include_order=True
            )
        parameter.name = new_name
        parameter.save()
    self.recalculate()
def recalculate(self):
    """Recalculate all expired project, database, and activity parameters, as well as exchanges.

    Project parameters are handled first and then each expired database.
    Finally, every group whose ``fresh`` flag is false and whose name is
    neither a database nor ``"project"`` is recalculated as an activity
    parameter group.
    """
    if ProjectParameter.expired():
        ProjectParameter.recalculate()

    for db_name in databases:
        if DatabaseParameter.expired(db_name):
            DatabaseParameter.recalculate(db_name)

    # peewee needs ``==`` here to build the SQL expression, so ``is False`` won't work
    for stale_group in Group.select().where(Group.fresh == False):  # noqa: E712
        # Non-activity groups should already be fresh at this point; the
        # original author was unsure how one could remain stale (concurrent access?)
        handled_elsewhere = stale_group.name in databases or stale_group.name == "project"
        if not handled_elsewhere:
            ActivityParameter.recalculate(stale_group.name)
def __len__(self):
    """Return the combined count of database, project, and activity parameters."""
    per_table_counts = (
        DatabaseParameter.select().count(),
        ProjectParameter.select().count(),
        ActivityParameter.select().count(),
    )
    return sum(per_table_counts)
def __repr__(self):
    """Describe the manager by the total number of parameters it holds."""
    total = len(self)
    return f"Parameters manager with {total} objects"
# Module-level manager instance; exported as ``parameters`` via ``__all__``.
parameters = ParameterManager()
def get_new_symbols(data, context=None):
    """Return the names used in formulas that are not already known.

    ``data`` is an iterable whose items are either formula strings or
    mappings that may carry a ``"formula"`` key. Mappings without that key,
    and formulas that are ``None`` or empty, are skipped.

    A name is "known" if it is in the symbol table of a fresh
    ``asteval.Interpreter`` or in ``context`` (any iterable of names).

    Returns:
        set: names found in the formulas that are not known.
    """
    interpreter = asteval.Interpreter()
    known_symbols = set(interpreter.symtable).union(context or ())
    found = set()
    for ds in data:
        if isinstance(ds, str):
            formula = ds
        elif "formula" in ds:
            formula = ds["formula"]
        else:
            continue
        if not formula:
            # A missing (None) or empty formula introduces no symbols and is
            # not valid input for the parser
            continue
        finder = asteval.NameFinder()
        finder.generic_visit(interpreter.parse(formula))
        found.update(finder.names)
    return found.difference(known_symbols)
def alter_parameter_formula(parameter, old, new):
    """Replace whole-word occurrences of ``old`` with ``new`` in ``parameter.formula``.

    ``old`` is matched literally (regex metacharacters are escaped) and only
    at word boundaries, so renaming ``a`` leaves ``ab`` untouched. ``new`` is
    inserted verbatim; backslashes in it are not treated as group references.
    Objects without a ``formula`` attribute, or whose formula is ``None`` or
    empty, are returned unchanged.

    Returns:
        The same ``parameter`` object.
    """
    formula = getattr(parameter, "formula", None)
    if formula:
        pattern = r"\b{}\b".format(re.escape(old))
        # Use a function replacement so ``new`` is not parsed as a template
        parameter.formula = re.sub(pattern, lambda _match: new, formula)
    return parameter