Source code for bw2io.importers.base
# -*- coding: utf-8 -*-
from ..errors import StrategyError
from ..migrations import migrations
from ..utils import activity_hash
from ..strategies import migrate_datasets, migrate_exchanges
from ..unlinked_data import UnlinkedData, unlinked_data
from datetime import datetime
from time import time
import functools
import warnings
[docs]
class ImportBase(object):
"""Base class for format-specific importers.
Defines workflow for applying strategies."""
def __init__(self, *args, **kwargs):
raise NotImplemented("This class should be subclassed")
def __iter__(self):
for ds in self.data:
yield ds
[docs]
def apply_strategy(self, strategy, verbose=True):
"""Apply ``strategy`` transform to ``self.data``.
Adds strategy name to ``self.applied_strategies``. If ``StrategyError`` is raised, print error message, but don't raise error.
.. note:: Strategies should not partially modify data before raising ``StrategyError``.
Args:
*strategy* (callable)
Returns:
Nothing, but modifies ``self.data``, and strategy to ``self.applied_strategies``.
"""
if not hasattr(self, "applied_strategies"):
self.applied_strategies = []
try:
func_name = strategy.__name__
except AttributeError: # Curried function
func_name = strategy.func.__name__
if verbose:
print("Applying strategy: {}".format(func_name))
try:
self.data = strategy(self.data)
self.applied_strategies.append(func_name)
except StrategyError as err:
print("Couldn't apply strategy {}:\n\t{}".format(func_name, err))
[docs]
def apply_strategies(self, strategies=None, verbose=True):
"""Apply a list of strategies.
Uses the default list ``self.strategies`` if ``strategies`` is ``None``.
Args:
*strategies* (list, optional): List of strategies to apply. Defaults to ``self.strategies``.
Returns:
Nothings, but modifies ``self.data``, and adds each strategy to ``self.applied_strategies``.
"""
start = time()
func_list = self.strategies if strategies is None else strategies
total = len(func_list)
for i, func in enumerate(func_list):
self.apply_strategy(func, verbose)
if hasattr(self, "signal") and hasattr(self.signal, "emit"):
self.signal.emit(i + 1, total)
if verbose:
print(
"Applied {} strategies in {:.2f} seconds".format(
len(func_list), time() - start
)
)
@property
[docs]
def unlinked(self):
"""Iterate through unique unlinked exchanges.
Uniqueness is determined by ``activity_hash``."""
seen = set()
for ds in self.data:
for exc in ds.get("exchanges", []):
if not exc.get("input"):
ah = activity_hash(exc)
if ah in seen:
continue
else:
seen.add(ah)
yield exc
[docs]
def write_unlinked(self, name):
"""Write all data to an ``UnlikedData`` data store (not a ``Database``!)"""
with warnings.catch_warnings():
warnings.simplefilter("ignore")
udb = UnlinkedData(name + " " + self.__class__.__name__)
if udb.name not in unlinked_data:
udb.register()
unlinked_data[udb.name] = {
"strategies": getattr(self, "applied_strategies", []),
"modified": datetime.now().isoformat(),
"kind": "database",
}
unlinked_data.flush()
udb.write(self.data)
print("Saved unlinked data: {}".format(udb.name))
[docs]
def _migrate_datasets(self, migration_name):
assert migration_name in migrations, "Can't find migration {}".format(
migration_name
)
self.apply_strategy(
functools.partial(migrate_datasets, migration=migration_name)
)
[docs]
def _migrate_exchanges(self, migration_name):
assert migration_name in migrations, "Can't find migration {}".format(
migration_name
)
self.apply_strategy(
functools.partial(migrate_exchanges, migration=migration_name)
)