# Source code for bw2io.importers.base

import functools
import warnings
from datetime import datetime
from time import time

from ..errors import StrategyError
from ..migrations import migrations
from ..strategies import migrate_datasets, migrate_exchanges
from ..unlinked_data import UnlinkedData, unlinked_data
from ..utils import activity_hash


class ImportBase(object):
    """
    Base class for format-specific importers.

    Defines the workflow for applying strategies to ``self.data``. The methods
    here read ``self.data`` and (in ``apply_strategies``) ``self.strategies``;
    this class does not set them, so subclasses are expected to.
    """

    def __init__(self, *args, **kwargs):
        """
        Refuse direct instantiation; subclasses must provide their own ``__init__``.

        Parameters
        ----------
        *args :
            Ignored.
        **kwargs :
            Ignored.

        Raises
        ------
        NotImplementedError
            Always.
        """
        # ``NotImplemented`` is the binary-operator sentinel and is not
        # callable; ``NotImplementedError`` is the exception meant here.
        raise NotImplementedError("This class should be subclassed")

    def __iter__(self):
        """
        Iterate over the importer's data.

        Yields
        ------
        ds :
            Each element of ``self.data``, in order.
        """
        for ds in self.data:
            yield ds

    def apply_strategy(self, strategy, verbose=True):
        """
        Apply a single strategy transform to the importer's data.

        On success, ``self.data`` is replaced by ``strategy(self.data)`` and
        the strategy's name is appended to ``self.applied_strategies`` (created
        on first use). If the strategy raises ``StrategyError``, the message is
        printed, the error is swallowed, and neither attribute is updated.

        Parameters
        ----------
        strategy : callable
            Called with ``self.data``; its return value becomes the new
            ``self.data``.
        verbose : bool, optional
            If True, print which strategy is being applied. Defaults to True.

        Returns
        -------
        None

        Notes
        -----
        Strategies should not partially modify data before raising a
        ``StrategyError``, because the error is not propagated.
        """
        if not hasattr(self, "applied_strategies"):
            self.applied_strategies = []

        try:
            func_name = strategy.__name__
        except AttributeError:
            # Curried function (e.g. ``functools.partial``): use the name of
            # the wrapped callable exposed as ``.func``.
            func_name = strategy.func.__name__

        if verbose:
            print("Applying strategy: {}".format(func_name))

        try:
            self.data = strategy(self.data)
            self.applied_strategies.append(func_name)
        except StrategyError as err:
            print("Couldn't apply strategy {}:\n\t{}".format(func_name, err))

    def apply_strategies(self, strategies=None, verbose=True):
        """
        Apply a sequence of strategies to the importer's data, in order.

        Each strategy is passed to ``apply_strategy``. After each one, if the
        instance has a ``signal`` attribute with an ``emit`` attribute,
        ``self.signal.emit(position, total)`` is called with the 1-based
        position of the strategy just processed.

        Parameters
        ----------
        strategies : list, optional
            Strategies to apply. Defaults to ``self.strategies``.
        verbose : bool, optional
            If True, print each strategy name and a final timing summary.
            Defaults to True.

        Returns
        -------
        None
        """
        start = time()
        func_list = self.strategies if strategies is None else strategies
        total = len(func_list)

        for position, func in enumerate(func_list, start=1):
            self.apply_strategy(func, verbose)
            # Optional progress reporting hook.
            if hasattr(self, "signal") and hasattr(self.signal, "emit"):
                self.signal.emit(position, total)

        if verbose:
            print(
                "Applied {} strategies in {:.2f} seconds".format(
                    total, time() - start
                )
            )

    @property
    def unlinked(self):
        """
        Iterate through unique unlinked exchanges.

        An exchange counts as unlinked when ``exc.get("input")`` is falsy.
        Uniqueness is determined by ``activity_hash``; only the first exchange
        seen for each hash is yielded.

        Yields
        ------
        exc :
            An unlinked exchange taken from a dataset's ``"exchanges"`` list.
        """
        seen = set()
        for ds in self.data:
            for exc in ds.get("exchanges", []):
                if exc.get("input"):
                    continue
                ah = activity_hash(exc)
                if ah in seen:
                    continue
                seen.add(ah)
                yield exc

    def write_unlinked(self, name):
        """
        Write all data to an ``UnlinkedData`` data store.

        The store is named ``"<name> <ClassName>"``. It is registered if its
        name is not already in ``unlinked_data``. Its ``unlinked_data`` entry
        is then overwritten with the applied strategies, a modification
        timestamp and kind ``"database"``, and ``self.data`` is written to it.

        Parameters
        ----------
        name : str
            Prefix for the name of the ``UnlinkedData`` data store.
        """
        # Warnings raised while constructing the store are suppressed.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            udb = UnlinkedData(name + " " + self.__class__.__name__)

        if udb.name not in unlinked_data:
            udb.register()

        unlinked_data[udb.name] = {
            "strategies": getattr(self, "applied_strategies", []),
            "modified": datetime.now().isoformat(),
            "kind": "database",
        }
        unlinked_data.flush()
        udb.write(self.data)
        print("Saved unlinked data: {}".format(udb.name))

    def _migrate_datasets(self, migration_name):
        """
        Apply the named migration to the importer's datasets.

        Runs ``migrate_datasets`` with ``migration=migration_name`` through
        ``apply_strategy``.

        Parameters
        ----------
        migration_name : str
            Name of the migration; must be present in ``migrations``.

        Returns
        -------
        None

        Raises
        ------
        AssertionError
            If ``migration_name`` is not in ``migrations``.
        """
        # Raised explicitly rather than via ``assert`` so that the check is
        # not stripped when Python runs with ``-O``.
        if migration_name not in migrations:
            raise AssertionError("Can't find migration {}".format(migration_name))
        self.apply_strategy(
            functools.partial(migrate_datasets, migration=migration_name)
        )

    def _migrate_exchanges(self, migration_name):
        """
        Apply the named migration to the importer's exchanges.

        Runs ``migrate_exchanges`` with ``migration=migration_name`` through
        ``apply_strategy``.

        Parameters
        ----------
        migration_name : str
            Name of the migration; must be present in ``migrations``.

        Returns
        -------
        None

        Raises
        ------
        AssertionError
            If ``migration_name`` is not in ``migrations``.
        """
        # Raised explicitly rather than via ``assert`` so that the check is
        # not stripped when Python runs with ``-O``.
        if migration_name not in migrations:
            raise AssertionError("Can't find migration {}".format(migration_name))
        self.apply_strategy(
            functools.partial(migrate_exchanges, migration=migration_name)
        )