Source code for bw2io.extractors.simapro_csv
# -*- coding: utf-8 -*-
from ..compatibility import SIMAPRO_BIOSPHERE
from ..strategies.simapro import normalize_simapro_formulae
from bw2data.logs import get_io_logger, close_log
from bw2parameters import ParameterSet
from numbers import Number
from stats_arrays import *
import csv
import math
import os
import re
import uuid
[docs]
SIMAPRO_TECHNOSPHERE = {
"Avoided products",
"Electricity/heat",
"Materials/fuels",
"Waste to treatment",
}
[docs]
SIMAPRO_END_OF_DATASETS = {
"Database Calculated parameters",
"Database Input parameters",
"Literature reference",
"Project Input parameters",
"Project Calculated parameters",
"Quantities",
"Units",
}
[docs]
def to_number(obj):
try:
return float(obj.replace(",", ".").strip())
except (ValueError, SyntaxError):
# Sometimes allocation or ref product specific as percentage
if "%" in obj:
return float(obj.replace("%", "").strip()) / 100.0
try:
# Eval for simple expressions like "1/2"
return float(eval(obj.replace(",", ".").strip()))
except NameError:
# Formula with a variable which isn't in scope - raises NameError
return obj
except SyntaxError:
# Unit string like "ha a" raises a syntax error when evaled
return obj
# \x7f if ascii delete - where does it come from?
[docs]
strip_whitespace_and_delete = (
lambda obj: obj.replace("\x7f", "").strip() if isinstance(obj, str) else obj
)
[docs]
lowercase_expression = (
"(?:" # Don't capture this group
"^" # Match the beginning of the string
"|" # Or
"[^a-zA-Z_])" # Anything other than a letter or underscore. SimaPro is limited to ASCII characters
"(?P<variable>{})" # The variable name string will be substituted here
"(?:[^a-zA-Z_]|$)" # Match anything other than a letter or underscore, or the end of the line
)
[docs]
def replace_with_lowercase(string, names):
"""Replace all occurrences of elements of ``names`` in ``string`` with their lowercase equivalents.
``names`` is a list of variable name strings that should already all be lowercase.
Returns a modified ``string``."""
for name in names:
expression = lowercase_expression.format(name)
for result in re.findall(expression, string, re.IGNORECASE):
if result != name:
string = string.replace(result, result.lower())
return string
[docs]
class SimaProCSVExtractor(object):
@classmethod
[docs]
def extract(cls, filepath, delimiter=";", name=None, encoding="cp1252"):
assert os.path.exists(filepath), "Can't find file %s" % filepath
log, logfile = get_io_logger("SimaPro-extractor")
log.info(INTRODUCTION % (filepath, repr(delimiter), name,))
with open(filepath, "r", encoding=encoding) as csv_file:
reader = csv.reader(csv_file, delimiter=delimiter)
lines = [
[strip_whitespace_and_delete(obj) for obj in line] for line in reader
]
# Check if valid SimaPro file
assert (
"SimaPro" in lines[0][0] or "CSV separator" in lines[0][0]
), "File is not valid SimaPro export"
project_name = name or cls.get_project_name(lines)
datasets = []
project_metadata = cls.get_project_metadata(lines)
global_parameters = cls.get_global_parameters(lines, project_metadata)
index = cls.get_next_process_index(lines, 0)
while True:
try:
ds, index = cls.read_data_set(
lines,
index,
project_name,
filepath,
global_parameters,
project_metadata,
)
datasets.append(ds)
index = cls.get_next_process_index(lines, index)
except EndOfDatasets:
break
close_log(log)
return datasets, global_parameters, project_metadata
@classmethod
[docs]
def get_next_process_index(cls, data, index):
while True:
try:
if data[index] and data[index][0] in SIMAPRO_END_OF_DATASETS:
raise EndOfDatasets
elif data[index] and data[index][0] == "Process":
return index + 1
except IndexError:
# File ends without extra metadata
raise EndOfDatasets
index += 1
@classmethod
[docs]
def get_project_metadata(cls, data):
meta = {}
for line in data:
if not line:
return meta
elif ":" not in line[0]:
continue
if not len(line) == 1:
raise ValueError("Can't understand metadata line {}".format(line))
assert line[0][0] == "{" and line[0][-1] == "}"
line = line[0][1:-1].split(":")
key, value = line[0], ":".join(line[1:])
meta[key.strip()] = value.strip()
@classmethod
[docs]
def get_global_parameters(cls, data, pm):
current, parameters = None, []
for line in data:
if not line: # Blank line, end of section
current = None
elif line[0] in {"Database Input parameters", "Project Input parameters"}:
current = "input"
elif line[0] in {
"Database Calculated parameters",
"Project Calculated parameters",
}:
current = "calculated"
elif current is None:
continue
elif current == "input":
parameters.append(cls.parse_input_parameter(line))
elif current == "calculated":
parameters.append(cls.parse_calculated_parameter(line, pm))
else:
raise ValueError("This should never happen")
# Extract name and lowercase
parameters = {obj.pop("name").lower(): obj for obj in parameters}
# Change all formula values to lowercase if referencing global parameters
for obj in parameters.values():
if "formula" in obj:
obj["formula"] = replace_with_lowercase(obj["formula"], parameters)
ParameterSet(parameters).evaluate_and_set_amount_field()
return parameters
@classmethod
[docs]
def get_project_name(cls, data):
for line in data[:25]:
if not line:
continue
elif "{Project:" in line[0]:
return line[0][9:-1].strip()
# What the holy noodly appendage
# All other metadata in English, only this term
# translated into French‽
elif "{Projet:" in line[0]:
return line[0][9:-1].strip()
@classmethod
[docs]
def invalid_uncertainty_data(cls, amount, kind, field1, field2, field3):
if kind == "Lognormal" and (not amount or field1 == "0"):
return True
@classmethod
[docs]
def create_distribution(cls, amount, kind, field1, field2, field3):
amount = to_number(amount)
if kind == "Undefined":
return {
"uncertainty type": UndefinedUncertainty.id,
"loc": amount,
"amount": amount,
}
elif cls.invalid_uncertainty_data(amount, kind, field1, field2, field3):
# TODO: Log invalid data?
return {
"uncertainty type": UndefinedUncertainty.id,
"loc": amount,
"amount": amount,
}
elif kind == "Lognormal":
return {
"uncertainty type": LognormalUncertainty.id,
"scale": math.log(math.sqrt(to_number(field1))),
"loc": math.log(abs(amount)),
"negative": amount < 0,
"amount": amount,
}
elif kind == "Normal":
return {
"uncertainty type": NormalUncertainty.id,
"scale": math.sqrt(to_number(field1)),
"loc": amount,
"negative": amount < 0,
"amount": amount,
}
elif kind == "Triangle":
return {
"uncertainty type": TriangularUncertainty.id,
"minimum": to_number(field2),
"maximum": to_number(field3),
"loc": amount,
"negative": amount < 0,
"amount": amount,
}
elif kind == "Uniform":
return {
"uncertainty type": UniformUncertainty.id,
"minimum": to_number(field2),
"maximum": to_number(field3),
"loc": amount,
"negative": amount < 0,
"amount": amount,
}
else:
raise ValueError("Unknown uncertainty type: {}".format(kind))
@classmethod
[docs]
def parse_calculated_parameter(cls, line, pm):
"""Parse line in `Calculated parameters` section.
0. name
1. formula
2. comment
Can include multiline comment in TSV.
"""
return {
"name": line[0],
"formula": normalize_simapro_formulae(line[1], pm),
"comment": "; ".join([x for x in line[2:] if x]),
}
@classmethod
[docs]
def parse_input_parameter(cls, line):
"""Parse line in `Input parameters` section.
0. name
1. value (not formula)
2. uncertainty type
3. uncert. param.
4. uncert. param.
5. uncert. param.
6. hidden ("Yes" or "No" - we ignore)
7. comment
"""
ds = cls.create_distribution(*line[1:6])
ds.update({"name": line[0], "comment": "; ".join([x for x in line[7:] if x])})
return ds
@classmethod
[docs]
def parse_biosphere_flow(cls, line, category, pm):
"""Parse biosphere flow line.
0. name
1. subcategory
2. unit
3. value or formula
4. uncertainty type
5. uncert. param.
6. uncert. param.
7. uncert. param.
8. comment
However, sometimes the value is in index 2, and the unit in index 3. Because why not! We assume default ordering unless we find a number in index 2.
"""
unit, amount = line[2], line[3]
if isinstance(to_number(line[2]), Number):
unit, amount = amount, unit
is_formula = not isinstance(to_number(amount), Number)
if is_formula:
ds = {"formula": normalize_simapro_formulae(amount, pm)}
else:
ds = cls.create_distribution(amount, *line[4:8])
ds.update(
{
"name": line[0],
"categories": (category, line[1]),
"unit": unit,
"comment": "; ".join([x for x in line[8:] if x]),
"type": "biosphere",
}
)
return ds
@classmethod
[docs]
def parse_input_line(cls, line, category, pm):
"""Parse technosphere input line.
0. name
1. unit
2. value or formula
3. uncertainty type
4. uncert. param.
5. uncert. param.
6. uncert. param.
7. comment
However, sometimes the value is in index 1, and the unit in index 2. Because why not! We assume default ordering unless we find a number in index 1.
"""
unit, amount = line[1], line[2]
if isinstance(to_number(line[1]), Number):
unit, amount = amount, unit
is_formula = not isinstance(to_number(amount), Number)
if is_formula:
ds = {"formula": normalize_simapro_formulae(amount, pm)}
else:
ds = cls.create_distribution(amount, *line[3:7])
ds.update(
{
"categories": (category,),
"name": line[0],
"unit": unit,
"comment": "; ".join([x for x in line[7:] if x]),
"type": (
"substitution" if category == "Avoided products" else "technosphere"
),
}
)
return ds
@classmethod
[docs]
def parse_final_waste_flow(cls, line, pm):
"""Parse final wate flow line.
0: name
1: subcategory?
2: unit
3. value or formula
4. uncertainty type
5. uncert. param.
6. uncert. param.
7. uncert. param.
However, sometimes the value is in index 2, and the unit in index 3. Because why not! We assume default ordering unless we find a number in index 2.
"""
unit, amount = line[2], line[3]
if isinstance(to_number(line[2]), Number):
unit, amount = amount, unit
is_formula = not isinstance(to_number(amount), Number)
if is_formula:
ds = {"formula": normalize_simapro_formulae(amount, pm)}
else:
ds = cls.create_distribution(amount, *line[4:8])
ds.update(
{
"name": line[0],
"categories": ("Final waste flows", line[1])
if line[1]
else ("Final waste flows",),
"unit": unit,
"comment": "; ".join([x for x in line[8:] if x]),
"type": "technosphere",
}
)
return ds
@classmethod
[docs]
def parse_reference_product(cls, line, pm):
"""Parse reference product line.
0. name
1. unit
2. value or formula
3. allocation
4. waste type
5. category (separated by \\)
6. comment
However, sometimes the value is in index 1, and the unit in index 2. Because why not! We assume default ordering unless we find a number in index 1.
"""
unit, amount = line[1], line[2]
if isinstance(to_number(line[1]), Number):
unit, amount = amount, unit
is_formula = not isinstance(to_number(amount), Number)
if is_formula:
ds = {"formula": normalize_simapro_formulae(amount, pm)}
else:
ds = {"amount": to_number(amount)}
ds.update(
{
"name": line[0],
"unit": unit,
"allocation": to_number(line[3]),
"categories": tuple(line[5].split("\\")),
"comment": "; ".join([x for x in line[6:] if x]),
"type": "production",
}
)
return ds
@classmethod
[docs]
def parse_waste_treatment(cls, line, pm):
"""Parse reference product line.
0. name
1. unit
2. value or formula
3. waste type
4. category (separated by \\)
5. comment
"""
is_formula = not isinstance(to_number(line[2]), Number)
if is_formula:
ds = {"formula": normalize_simapro_formulae(line[2], pm)}
else:
ds = {"amount": to_number(line[2])}
ds.update(
{
"name": line[0],
"unit": line[1],
"categories": tuple(line[4].split("\\")),
"comment": "; ".join([x for x in line[5:] if x]),
"type": "production",
}
)
return ds
@classmethod
[docs]
def read_dataset_metadata(cls, data, index):
metadata = {}
while True:
if not data[index]:
pass
elif data[index] and data[index][0] in SIMAPRO_PRODUCTS:
return metadata, index
elif data[index] and data[index + 1] and data[index][0]:
metadata[data[index][0]] = data[index + 1][0]
index += 1
index += 1
@classmethod
[docs]
def read_data_set(cls, data, index, db_name, filepath, gp, pm):
metadata, index = cls.read_dataset_metadata(data, index)
# `index` is now the `Products` or `Waste Treatment` line
ds = {
"simapro metadata": metadata,
"code": metadata.get("Process identifier") or uuid.uuid4().hex,
"exchanges": [],
"parameters": [],
"database": db_name,
"filename": filepath,
"type": "process",
}
while not data[index] or data[index][0] != "End":
if not data[index] or not data[index][0]:
index += 1
elif data[index][0] in SIMAPRO_TECHNOSPHERE:
category = data[index][0]
index += 1 # Advance to data lines
while (
index < len(data) and data[index] and data[index][0]
): # Stop on blank line
ds["exchanges"].append(
cls.parse_input_line(data[index], category, pm)
)
index += 1
elif data[index][0] in SIMAPRO_BIOSPHERE:
category = data[index][0]
index += 1 # Advance to data lines
while (
index < len(data) and data[index] and data[index][0]
): # Stop on blank line
ds["exchanges"].append(
cls.parse_biosphere_flow(data[index], category, pm)
)
index += 1
elif data[index][0] == "Calculated parameters":
index += 1 # Advance to data lines
while (
index < len(data) and data[index] and data[index][0]
): # Stop on blank line
ds["parameters"].append(
cls.parse_calculated_parameter(data[index], pm)
)
index += 1
elif data[index][0] == "Input parameters":
index += 1 # Advance to data lines
while (
index < len(data) and data[index] and data[index][0]
): # Stop on blank line
ds["parameters"].append(cls.parse_input_parameter(data[index]))
index += 1
elif data[index][0] == "Products":
index += 1 # Advance to data lines
while (
index < len(data) and data[index] and data[index][0]
): # Stop on blank line
ds["exchanges"].append(cls.parse_reference_product(data[index], pm))
index += 1
elif data[index][0] == "Waste treatment":
index += 1 # Advance to data lines
while (
index < len(data) and data[index] and data[index][0]
): # Stop on blank line
ds["exchanges"].append(cls.parse_waste_treatment(data[index], pm))
index += 1
elif data[index][0] == "Final waste flows":
index += 1 # Advance to data lines
while (
index < len(data) and data[index] and data[index][0]
): # Stop on blank line
ds["exchanges"].append(cls.parse_final_waste_flow(data[index], pm))
index += 1
elif data[index][0] in SIMAPRO_END_OF_DATASETS:
# Don't care about processing steps below, as no dataset
# was extracted
raise EndOfDatasets
else:
index += 1
if index == len(data):
break
# Extract name and lowercase
ds["parameters"] = {obj.pop("name").lower(): obj for obj in ds["parameters"]}
# Change all parameter formula values to lowercase if referencing
# global or local parameters
for obj in ds["parameters"].values():
if "formula" in obj:
obj["formula"] = replace_with_lowercase(
obj["formula"], ds["parameters"]
)
obj["formula"] = replace_with_lowercase(obj["formula"], gp)
# Change all exchange values to lowercase if referencing
# global or local parameters
for obj in ds["exchanges"]:
if "formula" in obj:
obj["formula"] = replace_with_lowercase(
obj["formula"], ds["parameters"]
)
obj["formula"] = replace_with_lowercase(obj["formula"], gp)
ps = ParameterSet(
ds["parameters"], {key: value["amount"] for key, value in gp.items()}
)
# Changes in-place
ps(ds["exchanges"])
if not ds["parameters"]:
del ds["parameters"]
return ds, index