import functools
import warnings
from time import time
from bw2data import Database, config
from ..extractors import CSVExtractor, ExcelExtractor
from ..strategies import (
assign_only_product_as_production,
convert_activity_parameters_to_list,
convert_uncertainty_types_to_integers,
csv_add_missing_exchanges_section,
csv_drop_unknown,
csv_numerize,
csv_restore_booleans,
csv_restore_temporal_distributions,
csv_restore_tuples,
drop_falsey_uncertainty_fields_but_keep_zeros,
link_iterable_by_fields,
link_technosphere_by_activity_hash,
normalize_units,
set_code_by_activity_hash,
strip_biosphere_exc_locations,
)
from .base_lci import LCIImporter
[docs]
is_empty_line = lambda line: not line or not any(line)
[docs]
remove_empty = lambda dct: {k: v for k, v in dct.items() if (v or v == 0)}
[docs]
def valid_first_cell(sheet, data):
"""Return boolean if first cell in worksheet is not ``skip``."""
try:
return hasattr(data[0][0], "lower") and data[0][0].lower() != "skip"
except:
warnings.warn("Invalid first cell (A1) in worksheet {}".format(sheet))
return False
[docs]
class ExcelImporter(LCIImporter):
"""
Generic Excel importer.
Excel spreadsheet should follow the following format.
Note that this is an illustrative example, all sections (e.g., parameters) and fields (e.g., exchange columns) are not required.
+-------------------------+-------------------------+------------------------------+
| Database | <name of database> | |
+-------------------------+-------------------------+------------------------------+
| <database field name> | <database field value> | |
+-------------------------+-------------------------+------------------------------+
| Project parameters | | |
+-------------------------+-------------------------+------------------------------+
| name | amount | <other parameter field name> |
+-------------------------+-------------------------+------------------------------+
| <value> | <value> | <parameter field value> |
+-------------------------+-------------------------+------------------------------+
| Database parameters | | |
+-------------------------+-------------------------+------------------------------+
| name | amount | <other parameter field name> |
+-------------------------+-------------------------+------------------------------+
| <parameter field value> | <parameter field value> | <parameter field value> |
+-------------------------+-------------------------+------------------------------+
| Activity | <name of activity> | |
+-------------------------+-------------------------+------------------------------+
| <activity field name> | <activity field value> | |
+-------------------------+-------------------------+------------------------------+
| Parameters | <parameter group> | |
+-------------------------+-------------------------+------------------------------+
| name | amount | <other parameter field name> |
+-------------------------+-------------------------+------------------------------+
| <parameter field value> | <parameter field value> | <parameter field value> |
+-------------------------+-------------------------+------------------------------+
| Exchanges | | |
+-------------------------+-------------------------+------------------------------+
| name | amount | <other exchange field name> |
+-------------------------+-------------------------+------------------------------+
| <exchange field value> | <exchange field value> | <exchange field value> |
+-------------------------+-------------------------+------------------------------+
Neither project parameters, parameters, nor exchanges for each activity are required.
Blank lines are allowed anywhere, they won't have any effect. The end of the file is determined in the Excel file based on the content.
The very first cell of the worksheet must not be empty (A1), otherwise the worksheet won't be imported.
In general, data is imported without modification. However, the following transformations are applied:
* Numbers are translated from text into actual numbers.
* Tuples, separated in the cell by the ``::`` string, are reconstructed.
* ``True`` and ``False`` are transformed to boolean values.
* Fields with the value ``(Unknown)`` are dropped.
"""
def __init__(self, filepath, sheet_name=None):
[docs]
self.strategies = [
csv_restore_tuples,
csv_restore_booleans,
csv_numerize,
csv_drop_unknown,
csv_restore_temporal_distributions,
csv_add_missing_exchanges_section,
normalize_units,
strip_biosphere_exc_locations,
set_code_by_activity_hash,
functools.partial(
link_iterable_by_fields,
other=Database(config.biosphere),
edge_kinds=["biosphere"],
),
assign_only_product_as_production,
link_technosphere_by_activity_hash,
drop_falsey_uncertainty_fields_but_keep_zeros,
convert_uncertainty_types_to_integers,
convert_activity_parameters_to_list,
]
start = time()
data = self.extractor.extract(filepath, sheet_name=sheet_name)
if self.format != "CSV":
data = [(x, y) for x, y in data if valid_first_cell(x, y)]
else:
# CSV exporter can't extract multiple worksheets by definition
data = [data]
print(
"Extracted {} worksheets in {:.2f} seconds".format(
len(data), time() - start
)
)
if data and any(line for line in data):
self.db_name, self.metadata = self.get_database(data)
self.project_parameters = self.get_project_parameters(data)
self.database_parameters = self.get_database_parameters(data)
self.data = self.process_activities(data)
else:
warnings.warn("No data in workbook found")
[docs]
def get_database(self, data):
results = []
found = False
for sn, ws in data:
for index, line in enumerate(ws):
if line and hasattr(line[0], "lower") and line[0].lower() == "database":
if found:
raise ValueError("Multiple `database` sections found")
results.append(self.get_metadata_section(sn, ws, index))
found = True
if not results:
raise ValueError("No `database` section found")
return results[0]
[docs]
def get_database_parameters(self, data):
parameters, found = [], False
for sn, ws in data:
for index, line in enumerate(ws):
if (
line
and hasattr(line[0], "lower")
and line[0].strip().lower() == "database parameters"
and (len(line) == 1 or not any(line[1:]))
):
parameters.extend(self.get_labelled_section(sn, ws, index + 1))
found = True
if not found:
return None
else:
return parameters
[docs]
def get_project_parameters(self, data):
"""Extract project parameters (variables and formulas).
Project parameters are a section that starts with a line with the string "project parameters" (case-insensitive) in the first cell, and ends with a blank line. There can be multiple project parameter sections.
"""
parameters, found = [], False
for sn, ws in data:
for index, line in enumerate(ws):
if (
line
and hasattr(line[0], "lower")
and line[0].strip().lower() == "project parameters"
):
parameters.extend(self.get_labelled_section(sn, ws, index + 1))
found = True
if not found:
return None
else:
return parameters
[docs]
def get_labelled_section(self, sn, ws, index=0, transform=True):
"""Turn a list of rows into a list of dictionaries.
The first line of ``ws`` is the column labels. All subsequent rows are the data values. Missing columns are dropped.
``transform`` is a boolean: perform CSV transformation functions like ``csv_restore_tuples``.
"""
data = []
ws = ws[index:]
columns = ws[0]
# Columns can be ['foo', '', 'bar', ''] - find last existing value.
# Can't just test for boolean-like behaviour, unfortunately
for index, elem in enumerate(columns[-1:0:-1]):
if elem:
break
if index:
columns = columns[:-index]
assert columns, "No label columns found"
assert all(columns), "Missing column labels: {}:{}\n{}".format(
sn, index, columns
)
ws = ws[1:]
for row in ws:
if is_empty_line(row):
break
data.append({x: y for x, y in zip(columns, row)})
if transform:
data = csv_restore_tuples(
csv_restore_booleans(csv_numerize(csv_drop_unknown(data)))
)
return [remove_empty(o) for o in data]
[docs]
def process_activities(self, data):
"""Take list of `(sheet names, raw data)` and process it."""
new_activity = lambda x: (
len(x)
and isinstance(x[0], str)
and len(x) > 1
and isinstance(x[1], str)
and x[0].strip().lower() == "activity"
)
def cut_worksheet(obj):
if isinstance(obj[0][0], str) and obj[0][0].lower() == "cutoff":
try:
cutoff = int(obj[0][1])
except:
raise ValueError("Can't understand cutoff index")
return [x[:cutoff] for x in obj]
else:
return obj
results = []
for sn, ws in data:
ws = cut_worksheet(ws)
if not any(line for line in ws):
warnings.warn("All data cutoff in worksheet {}".format(sn))
continue
for index, line in enumerate(ws):
if new_activity(line):
results.append(self.get_activity(sn, ws[index:]))
return results
[docs]
def write_activity_parameters(self, data=None, delete_existing=True):
self._write_activity_parameters(
self._prepare_activity_parameters(data, delete_existing)
)
[docs]
def write_database_parameters(self, activate_parameters=True, delete_existing=True):
"""Same as base ``write_database_parameters`` method, but ``activate_parameters`` is True by default."""
super(ExcelImporter, self).write_database_parameters(
activate_parameters, delete_existing
)
[docs]
def write_database(self, **kwargs):
"""Same as base ``write_database`` method, but ``activate_parameters`` is True by default."""
kwargs["activate_parameters"] = kwargs.get("activate_parameters", True)
super(ExcelImporter, self).write_database(**kwargs)
[docs]
def get_activity(self, sn, ws):
activity_end = lambda x: (
len(x)
and isinstance(x[0], str)
and x[0].strip().lower() in ("activity", "database", "project parameters")
)
exc_section = lambda x: (
isinstance(x[0], str) and x[0].strip().lower() == "exchanges"
)
param_section = lambda x: (
isinstance(x[0], str) and x[0].strip().lower() == "parameters"
)
end = None
found_next_section = False
for end, row in enumerate(ws[1:]):
if activity_end(row):
found_next_section = True
break
# Started from row 1, not row 0
end += 1
# Worksheet ends; `end` is too small by 1
if not found_next_section:
end += 1
ws = [row for row in ws[:end] if not is_empty_line(row)]
param_index = exc_index = None
for index, row in enumerate(ws):
if param_section(row):
if param_index is not None:
raise ValueError("Multiple parameter sections in activity")
param_index = index
elif exc_section(row):
if exc_index is not None:
raise ValueError("Multiple exchanges sections in activity")
exc_index = index
if param_index is None:
if exc_index is None:
metadata, parameters, exchanges = ws, None, None
else:
metadata = ws[:exc_index]
parameters = None
exchanges = ws[exc_index + 1 :]
elif exc_index is None:
metadata = ws[:param_index]
parameters = ws[param_index:]
exchanges = []
else:
metadata = ws[:param_index]
parameters = ws[param_index:exc_index]
exchanges = ws[exc_index + 1 :]
name, data = self.get_metadata_section(sn, metadata, transform=False)
data["name"] = name
if parameters and len(parameters) > 1:
try:
group_name = parameters[0][1]
assert group_name
except:
group_name = None
data["parameters"] = {
e.pop("name"): e for e in self.get_labelled_section(sn, parameters[1:])
}
if group_name:
for ds in data["parameters"].values():
ds["group"] = group_name
if exchanges:
data["exchanges"] = self.get_labelled_section(
sn, exchanges, transform=False
)
else:
data["exchanges"] = []
data["worksheet name"] = sn
data["database"] = self.db_name
return data
[docs]
class CSVImporter(ExcelImporter):
"""Generic CSV importer"""