Source code for bw2io.importers.excel

import functools
import warnings
from time import time

from bw2data import Database, config

from ..extractors import CSVExtractor, ExcelExtractor
from ..strategies import (
    assign_only_product_as_production,
    convert_activity_parameters_to_list,
    convert_uncertainty_types_to_integers,
    csv_add_missing_exchanges_section,
    csv_drop_unknown,
    csv_numerize,
    csv_restore_booleans,
    csv_restore_temporal_distributions,
    csv_restore_tuples,
    drop_falsey_uncertainty_fields_but_keep_zeros,
    link_iterable_by_fields,
    link_technosphere_by_activity_hash,
    normalize_units,
    set_code_by_activity_hash,
    strip_biosphere_exc_locations,
)
from .base_lci import LCIImporter

[docs] is_empty_line = lambda line: not line or not any(line)
[docs] remove_empty = lambda dct: {k: v for k, v in dct.items() if (v or v == 0)}
[docs] def valid_first_cell(sheet, data): """Return boolean if first cell in worksheet is not ``skip``.""" try: return hasattr(data[0][0], "lower") and data[0][0].lower() != "skip" except: warnings.warn("Invalid first cell (A1) in worksheet {}".format(sheet)) return False
[docs] class ExcelImporter(LCIImporter): """ Generic Excel importer. Excel spreadsheet should follow the following format. Note that this is an illustrative example, all sections (e.g., parameters) and fields (e.g., exchange columns) are not required. +-------------------------+-------------------------+------------------------------+ | Database | <name of database> | | +-------------------------+-------------------------+------------------------------+ | <database field name> | <database field value> | | +-------------------------+-------------------------+------------------------------+ | Project parameters | | | +-------------------------+-------------------------+------------------------------+ | name | amount | <other parameter field name> | +-------------------------+-------------------------+------------------------------+ | <value> | <value> | <parameter field value> | +-------------------------+-------------------------+------------------------------+ | Database parameters | | | +-------------------------+-------------------------+------------------------------+ | name | amount | <other parameter field name> | +-------------------------+-------------------------+------------------------------+ | <parameter field value> | <parameter field value> | <parameter field value> | +-------------------------+-------------------------+------------------------------+ | Activity | <name of activity> | | +-------------------------+-------------------------+------------------------------+ | <activity field name> | <activity field value> | | +-------------------------+-------------------------+------------------------------+ | Parameters | <parameter group> | | +-------------------------+-------------------------+------------------------------+ | name | amount | <other parameter field name> | +-------------------------+-------------------------+------------------------------+ | <parameter field value> | <parameter field value> | <parameter field value> | +-------------------------+-------------------------+------------------------------+ | Exchanges | | | +-------------------------+-------------------------+------------------------------+ | name | amount | <other exchange field name> | +-------------------------+-------------------------+------------------------------+ | <exchange field value> | <exchange field value> | <exchange field value> | +-------------------------+-------------------------+------------------------------+ Neither project parameters, parameters, nor exchanges for each activity are required. Blank lines are allowed anywhere, they won't have any effect. The end of the file is determined in the Excel file based on the content. The very first cell of the worksheet must not be empty (A1), otherwise the worksheet won't be imported. In general, data is imported without modification. However, the following transformations are applied: * Numbers are translated from text into actual numbers. * Tuples, separated in the cell by the ``::`` string, are reconstructed. * ``True`` and ``False`` are transformed to boolean values. * Fields with the value ``(Unknown)`` are dropped. """
[docs] format = "Excel"
[docs] extractor = ExcelExtractor
def __init__(self, filepath, sheet_name=None):
[docs] self.strategies = [ csv_restore_tuples, csv_restore_booleans, csv_numerize, csv_drop_unknown, csv_restore_temporal_distributions, csv_add_missing_exchanges_section, normalize_units, strip_biosphere_exc_locations, set_code_by_activity_hash, functools.partial( link_iterable_by_fields, other=Database(config.biosphere), edge_kinds=["biosphere"], ), assign_only_product_as_production, link_technosphere_by_activity_hash, drop_falsey_uncertainty_fields_but_keep_zeros, convert_uncertainty_types_to_integers, convert_activity_parameters_to_list, ]
start = time() data = self.extractor.extract(filepath, sheet_name=sheet_name) if self.format != "CSV": data = [(x, y) for x, y in data if valid_first_cell(x, y)] else: # CSV exporter can't extract multiple worksheets by definition data = [data] print( "Extracted {} worksheets in {:.2f} seconds".format( len(data), time() - start ) ) if data and any(line for line in data): self.db_name, self.metadata = self.get_database(data) self.project_parameters = self.get_project_parameters(data) self.database_parameters = self.get_database_parameters(data) self.data = self.process_activities(data) else: warnings.warn("No data in workbook found")
[docs] def get_database(self, data): results = [] found = False for sn, ws in data: for index, line in enumerate(ws): if line and hasattr(line[0], "lower") and line[0].lower() == "database": if found: raise ValueError("Multiple `database` sections found") results.append(self.get_metadata_section(sn, ws, index)) found = True if not results: raise ValueError("No `database` section found") return results[0]
[docs] def get_database_parameters(self, data): parameters, found = [], False for sn, ws in data: for index, line in enumerate(ws): if ( line and hasattr(line[0], "lower") and line[0].strip().lower() == "database parameters" and (len(line) == 1 or not any(line[1:])) ): parameters.extend(self.get_labelled_section(sn, ws, index + 1)) found = True if not found: return None else: return parameters
[docs] def get_project_parameters(self, data): """Extract project parameters (variables and formulas). Project parameters are a section that starts with a line with the string "project parameters" (case-insensitive) in the first cell, and ends with a blank line. There can be multiple project parameter sections. """ parameters, found = [], False for sn, ws in data: for index, line in enumerate(ws): if ( line and hasattr(line[0], "lower") and line[0].strip().lower() == "project parameters" ): parameters.extend(self.get_labelled_section(sn, ws, index + 1)) found = True if not found: return None else: return parameters
[docs] def get_labelled_section(self, sn, ws, index=0, transform=True): """Turn a list of rows into a list of dictionaries. The first line of ``ws`` is the column labels. All subsequent rows are the data values. Missing columns are dropped. ``transform`` is a boolean: perform CSV transformation functions like ``csv_restore_tuples``. """ data = [] ws = ws[index:] columns = ws[0] # Columns can be ['foo', '', 'bar', ''] - find last existing value. # Can't just test for boolean-like behaviour, unfortunately for index, elem in enumerate(columns[-1:0:-1]): if elem: break if index: columns = columns[:-index] assert columns, "No label columns found" assert all(columns), "Missing column labels: {}:{}\n{}".format( sn, index, columns ) ws = ws[1:] for row in ws: if is_empty_line(row): break data.append({x: y for x, y in zip(columns, row)}) if transform: data = csv_restore_tuples( csv_restore_booleans(csv_numerize(csv_drop_unknown(data))) ) return [remove_empty(o) for o in data]
[docs] def get_metadata_section(self, sn, ws, index=0, transform=True): data = {} ws = ws[index:] name = ws[0][1] assert name, "Must provide valid name for metadata section (got '{}')".format( name ) for row in ws[1:]: if is_empty_line(row): break data[row[0]] = row[1] if transform: # Only need first element data = csv_restore_tuples( csv_restore_booleans(csv_numerize(csv_drop_unknown([data]))) )[0] return name, data
[docs] def process_activities(self, data): """Take list of `(sheet names, raw data)` and process it.""" new_activity = lambda x: ( len(x) and isinstance(x[0], str) and len(x) > 1 and isinstance(x[1], str) and x[0].strip().lower() == "activity" ) def cut_worksheet(obj): if isinstance(obj[0][0], str) and obj[0][0].lower() == "cutoff": try: cutoff = int(obj[0][1]) except: raise ValueError("Can't understand cutoff index") return [x[:cutoff] for x in obj] else: return obj results = [] for sn, ws in data: ws = cut_worksheet(ws) if not any(line for line in ws): warnings.warn("All data cutoff in worksheet {}".format(sn)) continue for index, line in enumerate(ws): if new_activity(line): results.append(self.get_activity(sn, ws[index:])) return results
[docs] def write_activity_parameters(self, data=None, delete_existing=True): self._write_activity_parameters( self._prepare_activity_parameters(data, delete_existing) )
[docs] def write_database_parameters(self, activate_parameters=True, delete_existing=True): """Same as base ``write_database_parameters`` method, but ``activate_parameters`` is True by default.""" super(ExcelImporter, self).write_database_parameters( activate_parameters, delete_existing )
[docs] def write_database(self, **kwargs): """Same as base ``write_database`` method, but ``activate_parameters`` is True by default.""" kwargs["activate_parameters"] = kwargs.get("activate_parameters", True) super(ExcelImporter, self).write_database(**kwargs)
[docs] def get_activity(self, sn, ws): activity_end = lambda x: ( len(x) and isinstance(x[0], str) and x[0].strip().lower() in ("activity", "database", "project parameters") ) exc_section = lambda x: ( isinstance(x[0], str) and x[0].strip().lower() == "exchanges" ) param_section = lambda x: ( isinstance(x[0], str) and x[0].strip().lower() == "parameters" ) end = None found_next_section = False for end, row in enumerate(ws[1:]): if activity_end(row): found_next_section = True break # Started from row 1, not row 0 end += 1 # Worksheet ends; `end` is too small by 1 if not found_next_section: end += 1 ws = [row for row in ws[:end] if not is_empty_line(row)] param_index = exc_index = None for index, row in enumerate(ws): if param_section(row): if param_index is not None: raise ValueError("Multiple parameter sections in activity") param_index = index elif exc_section(row): if exc_index is not None: raise ValueError("Multiple exchanges sections in activity") exc_index = index if param_index is None: if exc_index is None: metadata, parameters, exchanges = ws, None, None else: metadata = ws[:exc_index] parameters = None exchanges = ws[exc_index + 1 :] elif exc_index is None: metadata = ws[:param_index] parameters = ws[param_index:] exchanges = [] else: metadata = ws[:param_index] parameters = ws[param_index:exc_index] exchanges = ws[exc_index + 1 :] name, data = self.get_metadata_section(sn, metadata, transform=False) data["name"] = name if parameters and len(parameters) > 1: try: group_name = parameters[0][1] assert group_name except: group_name = None data["parameters"] = { e.pop("name"): e for e in self.get_labelled_section(sn, parameters[1:]) } if group_name: for ds in data["parameters"].values(): ds["group"] = group_name if exchanges: data["exchanges"] = self.get_labelled_section( sn, exchanges, transform=False ) else: data["exchanges"] = [] data["worksheet name"] = sn data["database"] = self.db_name return data
[docs] class CSVImporter(ExcelImporter): """Generic CSV importer"""
[docs] format = "CSV"
[docs] extractor = CSVExtractor