Source code for bw2data.backends.iotable

# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
from eight import *

from ... import mapping, geomapping, config, databases
from ..peewee import SQLiteBackend
from ..peewee.schema import ActivityDataset, ExchangeDataset
from ..peewee.utils import dict_as_activitydataset
from ...utils import MAX_INT_32, TYPE_DICTIONARY
from ...errors import UnknownObject
import datetime
import itertools
import numpy as np


[docs] class IOTableBackend(SQLiteBackend): """IO tables have too much data to store each value in a database; instead, we only store the processed data in NumPy arrays. Activities will not seem to have any activities."""
[docs] backend = "iotable"
[docs] def write(self, products, exchanges, includes_production=False, **kwargs): """ Write IO data to disk in two different formats. Product data is stored in SQLite as normal activities. Exchange data is written directly to NumPy structured arrays. ``products`` is a dictionary of product datasets in the normal format. ``exchanges`` is a list of exchanges with the format ``(input code, output code, type, value)``. """ super(IOTableBackend, self).write(products, process=False) print("Starting IO table write") num_products = len(products) # Create geomapping array arr = np.zeros( (num_products, ), dtype=self.dtype_fields_geomapping + self.base_uncertainty_fields ) print("Writing geomapping") for index, row in enumerate(sorted(products.values(), key=lambda x: x.get('key'))): arr[index] = ( mapping[row['key']], geomapping[row['location'] or config.global_location], MAX_INT_32, MAX_INT_32, 0, 1, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, False ) np.save(self.filepath_geomapping(), arr, allow_pickle=False) dependents = set() print("Creating array - this will take a while...") step = 5000000 arr = np.zeros((step,), dtype=self.dtype) for index, row in enumerate(exchanges): if isinstance(row, dict): inpt = row['input'] outpt = row['output'] row_type = row['type'] unc_type = row.get('uncertainty type', 0) amount = row['amount'] loc = row.get('loc', row['amount']) scale = row.get('scale', np.NaN) shape = row.get('shape', np.NaN) minimum = row.get('minimum', np.NaN) maximum = row.get('maximum', np.NaN) else: inpt, outpt, row_type, amount = row loc, unc_type = amount, 0 scale = shape = minimum = maximum = np.NaN if index and not index % 1000000: print("On exchange number {}".format(index)) if index and not index % step: # Add another `step` rows arr = np.hstack(( arr, np.zeros((step,), dtype=self.dtype) )) dependents.add(inpt[0]) try: arr[index] = ( mapping[inpt], mapping[outpt], MAX_INT_32, MAX_INT_32, TYPE_DICTIONARY[row_type], unc_type, amount, loc, scale, shape, minimum, maximum, amount < 0 ) except KeyError: raise UnknownObject(("Exchange between {} and {} is invalid " "- one of these objects is unknown (i.e. doesn't exist " "as a process dataset)" ).format(inpt, outpt) ) if not includes_production: for index2, obj in enumerate(products): if index and not index % step: arr = np.hstack(( arr, np.zeros((step,), dtype=self.dtype) )) arr[index + index2] = ( mapping[obj], mapping[obj], MAX_INT_32, MAX_INT_32, TYPE_DICTIONARY['production'], 0, 1, 1, np.NaN, np.NaN, np.NaN, np.NaN, False ) databases[self.name]['depends'] = sorted(dependents.difference({self.name})) databases[self.name]['processed'] = datetime.datetime.now().isoformat() databases.flush() # Trim arr print("Trimming array") arr = arr[np.where(arr["row"] == MAX_INT_32)] print("Writing array") np.save(self.filepath_processed(), arr, allow_pickle=False)
[docs] def process(self): """No-op; no intermediate data to process""" return