# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
from eight import *
from . import sqlite3_lci_db
from ... import databases, mapping, geomapping, config
from ...errors import ValidityError
from ...project import writable_project
from ...proxies import ActivityProxyBase, ExchangeProxyBase
from ...search import IndexManager
from ...utils import get_activity
from .schema import ActivityDataset, ExchangeDataset
from .utils import dict_as_activitydataset, dict_as_exchangedataset
from future.utils import implements_iterator
import copy
import warnings
import uuid
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
[docs]
class Exchanges(Iterable):
"""Iterator for exchanges with some additional methods.
This is not a generator; ``next()`` is not supported. Everything time you start to iterate over the object you get a new list starting from the beginning. However, to get a single item you can do ``next(iter(foo))``.
Ordering is by database row id.
Supports the following:
.. code-block:: python
exchanges = activity.exchanges()
# Iterate
for exc in exchanges:
pass
# Length
len(exchanges)
# Delete all
exchanges.delete()
"""
def __init__(self, key, kinds=None, reverse=False):
self._key, self._kinds = key, kinds
if reverse:
self._args = [
ExchangeDataset.input_database == self._key[0],
ExchangeDataset.input_code == self._key[1],
# No production exchanges - these two clauses have to be together,
# not individually
ExchangeDataset.output_database != self._key[0] &
ExchangeDataset.output_code != self._key[1],
]
else:
self._args = [
ExchangeDataset.output_database == self._key[0],
ExchangeDataset.output_code == self._key[1],
]
if self._kinds:
self._args.append(ExchangeDataset.type << self._kinds)
[docs]
def filter(self, expr):
self._args.append(expr)
@writable_project
[docs]
def delete(self):
databases.set_dirty(self._key[0])
ExchangeDataset.delete().where(*self._args).execute()
[docs]
def _get_queryset(self):
return ExchangeDataset.select().where(*self._args).order_by(ExchangeDataset.id)
def __iter__(self):
for obj in self._get_queryset():
yield Exchange(obj)
def __len__(self):
return self._get_queryset().count()
[docs]
class Activity(ActivityProxyBase):
def __init__(self, document=None, **kwargs):
"""Create an `Activity` proxy object.
If this is a new activity, can pass `kwargs`.
If the activity exists in the database, `document` should be an `ActivityDataset`."""
if document is None:
self._document = ActivityDataset()
self._data = kwargs
else:
self._document = document
self._data = self._document.data
self._data['code'] = self._document.code
self._data['database'] = self._document.database
def __setitem__(self, key, value):
if key == 'code' and 'code' in self._data:
self._change_code(value)
print("Successfully switched activity dataset to new code `{}`".format(value))
elif key == 'database' and 'database' in self._data:
self._change_database(value)
print("Successfully switch activity dataset to database `{}`".format(value))
else:
super(Activity, self).__setitem__(key, value)
def __getitem__(self, key):
if key == 0:
return self["database"]
elif key == 1:
return self["code"]
elif key in self._data:
return self._data[key]
try:
rp = self.rp_exchange()
except ValueError:
raise KeyError
if key in rp.get('classifications', []):
return rp['classifications'][key]
if key in rp.get('properties', []):
return rp['properties'][key]
raise KeyError
@property
[docs]
def key(self):
return (self.get("database"), self.get("code"))
@writable_project
[docs]
def delete(self):
from ... import Database
from ...parameters import ActivityParameter, ParameterizedExchange
try:
ap = ActivityParameter.get(database=self[0], code=self[1])
ParameterizedExchange.delete().where(
ParameterizedExchange.group == ap.group).execute()
ActivityParameter.delete().where(
ActivityParameter.database == self[0],
ActivityParameter.code == self[1]
).execute()
except ActivityParameter.DoesNotExist:
pass
IndexManager(Database(self['database']).filename).delete_dataset(self._data)
self.exchanges().delete()
self._document.delete_instance()
self = None
@writable_project
[docs]
def save(self):
from ... import Database
if not self.valid():
raise ValidityError("This activity can't be saved for the "
"following reasons\n\t* " + \
"\n\t* ".join(self.valid(why=True)[1])
)
databases.set_dirty(self['database'])
for key, value in dict_as_activitydataset(self._data).items():
setattr(self._document, key, value)
self._document.save()
if self.key not in mapping:
mapping.add([self.key])
if self.get('location') and self['location'] not in geomapping:
geomapping.add([self['location']])
if databases[self['database']].get('searchable', True):
IndexManager(Database(self['database']).filename).update_dataset(self._data)
[docs]
def _change_code(self, new_code):
if self['code'] == new_code:
return
if ActivityDataset.select().where(
ActivityDataset.database == self['database'],
ActivityDataset.code == new_code
).count():
raise ValueError(
"Activity database with code `{}` already exists".format(new_code)
)
with sqlite3_lci_db.atomic() as txn:
ActivityDataset.update(code=new_code).where(
ActivityDataset.database == self['database'],
ActivityDataset.code == self['code']
).execute()
ExchangeDataset.update(output_code=new_code).where(
ExchangeDataset.output_database == self['database'],
ExchangeDataset.output_code == self['code'],
).execute()
ExchangeDataset.update(input_code=new_code).where(
ExchangeDataset.input_database == self['database'],
ExchangeDataset.input_code == self['code'],
).execute()
if databases[self['database']].get('searchable'):
from ... import Database
IndexManager(Database(self['database']).filename).delete_dataset(self)
self._data['code'] = new_code
IndexManager(Database(self['database']).filename).add_datasets([self])
else:
self._data['code'] = new_code
[docs]
def _change_database(self, new_database):
if self['database'] == new_database:
return
if new_database not in databases:
raise ValueError("Database {} does not exist".format(new_database))
with sqlite3_lci_db.atomic() as txn:
ActivityDataset.update(database=new_database).where(
ActivityDataset.database == self['database'],
ActivityDataset.code == self['code']
).execute()
ExchangeDataset.update(output_database=new_database).where(
ExchangeDataset.output_database == self['database'],
ExchangeDataset.output_code == self['code'],
).execute()
ExchangeDataset.update(input_database=new_database).where(
ExchangeDataset.input_database == self['database'],
ExchangeDataset.input_code == self['code'],
).execute()
if databases[self['database']].get('searchable'):
from ... import Database
IndexManager(Database(self['database']).filename).delete_dataset(self)
self._data['database'] = new_database
IndexManager(Database(self['database']).filename).add_datasets([self])
else:
self._data['database'] = new_database
[docs]
def exchanges(self):
return Exchanges(self.key)
[docs]
def technosphere(self, include_substitution=True):
return Exchanges(
self.key,
kinds=(("technosphere", "substitution")
if include_substitution
else ("technosphere",))
)
[docs]
def biosphere(self):
return Exchanges(
self.key,
kinds=("biosphere",),
)
[docs]
def production(self):
return Exchanges(
self.key,
kinds=("production",),
)
[docs]
def substitution(self):
return Exchanges(
self.key,
kinds=("substitution",),
)
[docs]
def upstream(self, kinds=("technosphere",)):
return Exchanges(
self.key,
kinds=kinds,
reverse=True
)
[docs]
def rp_exchange(self):
"""Return an ``Exchange`` object corresponding to the reference production. Uses the following in order:
* The ``production`` exchange, if only one is present
* The ``production`` exchange with the same name as the activity ``reference product``.
Raises ``ValueError`` if no suitable exchange is found."""
candidates = list(self.production())
if len(candidates) == 1:
return candidates[0]
candidates2 = [exc for exc in candidates if exc.input._data.get('name') == self._data.get('reference product')]
if len(candidates2) == 1:
return candidates2[0]
else:
raise ValueError("Can't find a single reference product exchange (found {} candidates)".format(len(candidates)))
[docs]
def new_exchange(self, **kwargs):
"""Create a new exchange linked to this activity"""
exc = Exchange()
exc.output = self.key
for key in kwargs:
exc[key] = kwargs[key]
return exc
@writable_project
[docs]
def copy(self, code=None, **kwargs):
"""Copy the activity. Returns a new `Activity`.
`code` is the new activity code; if not given, a UUID is used.
`kwargs` are additional new fields and field values, e.g. name='foo'
"""
activity = Activity()
for key, value in self.items():
activity[key] = value
for k, v in kwargs.items():
activity._data[k] = v
activity._data[u'code'] = str(code or uuid.uuid4().hex)
activity.save()
for exc in self.exchanges():
data = copy.deepcopy(exc._data)
data['output'] = activity.key
# Change `input` for production exchanges
if exc['input'] == exc['output']:
data['input'] = activity.key
ExchangeDataset.create(**dict_as_exchangedataset(data))
return activity
[docs]
class Exchange(ExchangeProxyBase):
def __init__(self, document=None, **kwargs):
"""Create an `Exchange` proxy object.
If this is a new exchange, can pass `kwargs`.
If the exchange exists in the database, `document` should be an `ExchangeDataset`."""
if document is None:
self._document = ExchangeDataset()
self._data = kwargs
else:
self._document = document
self._data = self._document.data
self._data['input'] = (self._document.input_database, self._document.input_code)
self._data['output'] = (self._document.output_database, self._document.output_code)
@writable_project
[docs]
def save(self):
if not self.valid():
raise ValidityError("This exchange can't be saved for the "
"following reasons\n\t* " + \
"\n\t* ".join(self.valid(why=True)[1])
)
databases.set_dirty(self['output'][0])
for key, value in dict_as_exchangedataset(self._data).items():
setattr(self._document, key, value)
self._document.save()
@writable_project
[docs]
def delete(self):
from ...parameters import ParameterizedExchange
ParameterizedExchange.delete().where(
ParameterizedExchange.exchange == self._document.id).execute()
self._document.delete_instance()
databases.set_dirty(self['output'][0])
self = None