import csv
import datetime
import itertools
import json
import os
import shutil
import sys
from functools import partial
from io import StringIO
from pathlib import Path
from typing import Optional, Union
from bw2parameters import ParameterSet
from loguru import logger
from platformdirs import user_log_dir
from .blocks import (
DamageCategory,
DatabaseCalculatedParameters,
DatabaseInputParameters,
EmptyBlock,
GenericBiosphere,
ImpactCategory,
LiteratureReference,
Method,
NormalizationWeightingSet,
Process,
ProjectCalculatedParameters,
ProjectInputParameters,
Quantities,
SimaProCSVBlock,
SystemDescription,
Units,
)
from .csv_reader import BeKindRewind
from .errors import IndeterminateBlockEnd
from .header import SimaProCSVType, parse_header
from .parameters import (
FormulaSubstitutor,
add_prefix_to_uppercase_input_parameters,
build_substitutes,
prepare_formulas,
substitute_in_formulas,
)
from .units import normalize_units
from .utils import json_serializer, parameter_set_evaluate_each_formula, get_true_length
[docs]
def dummy(data, *args):
return data
[docs]
CONTROL_BLOCK_MAPPING = {
"Database Calculated parameters": DatabaseCalculatedParameters,
"Database Input parameters": DatabaseInputParameters,
"Literature reference": LiteratureReference,
"Project Input parameters": ProjectInputParameters,
"Project Calculated parameters": ProjectCalculatedParameters,
"Quantities": Quantities,
"Product stage": dummy,
"Units": Units,
"Process": Process,
"Method": Method,
"Impact category": ImpactCategory,
"Normalization-Weighting set": NormalizationWeightingSet,
"Damage category": DamageCategory,
}
# These are lists of flows at the end of the file
[docs]
INDETERMINATE_SECTION_ERROR = """
Flow lists are given at the end of this file, but the section headings for
flow lists are also used in inventory process descriptions. We can normally
use the text 'End' to show when a process block stops, but this file doesn't
seem to use 'End' sections. We therefore can't tell if '{}' is a new block
or not, and can't parse this file.
"""
[docs]
class SimaProCSV:
def __init__(
self,
path_or_stream: Union[Path, StringIO],
encoding: str = "sloppy-windows-1252",
database_name: Optional[str] = None,
stderr_logs: bool = True,
write_logs: bool = True,
copy_logs: bool = False,
):
"""Read a SimaPro CSV file object, and parse the contents.
We start with the header, as this defines how the rest of the file is to be parsed.
It gives the CSV delimiter and decimal separator.
We then break the file into logical chunks, such as processes or LCIA impact categories."""
# Control logging level
now = datetime.datetime.now().isoformat()[:19].replace(":", "-")
if isinstance(path_or_stream, Path):
if not path_or_stream.is_file():
raise ValueError(f"Given `Path` {path_or_stream} is not a file")
if not os.access(path_or_stream, os.R_OK):
raise ValueError(f"File {path_or_stream} exists but lacks read permission")
data = open(path_or_stream, encoding=encoding)
self.logs_dir = (
Path(user_log_dir("bw_simapro_csv", "pylca")) / f"{path_or_stream.stem}-{now}"
)
logger.info("Writing logs to {d}", d=str(self.logs_dir))
elif not isinstance(path_or_stream, StringIO):
raise ValueError(
f"`path_or_stream` must be `Path` or `StringIO` - got {type(path_or_stream)}"
)
else:
# We have to assume that the StringIO object was created with
# some reasonable newline definition.
data = path_or_stream
self.logs_dir = Path(user_log_dir("bw_simapro_csv", "pylca")) / f"StringIO-{now}"
self.configure_logs(stderr_logs, write_logs)
# Converting Pydantic back to dict to release memory
header, header_lines = parse_header(data)
if header.kind in (SimaProCSVType.processes, SimaProCSVType.stages):
self.database_name = database_name or self.header["project"]
if not self.database_name:
raise ValueError(
"Can't find database name in parameter `database_name` or SimaPro header"
)
logger.info("Using database name '{n}'", n=self.database_name)
[docs]
self.uses_end_text = False
[docs]
self.filepath = str(path_or_stream) if isinstance(path_or_stream, Path) else "<StringIO>"
logger.info(
"SimaPro CSV import started.\n\tFile: '{file}'\n\tDelimiter: '{delimiter}'\n\tName: '{name}'",
file=path_or_stream if isinstance(path_or_stream, Path) else "<StringIO>",
delimiter="<tab>" if self.header["delimiter"] == "\t" else self.header["delimiter"],
name=self.header["project"] or "(Not given)",
)
logger.debug(
"Header information:\n\theader lines: {header_lines}\n\t{header}",
header_lines=header_lines,
header="\n\t".join(["{}: {}".format(k, v) for k, v in self.header.items()]),
)
if self.header["delimiter"] not in {";", ".", "\t", "|", " "}:
logger.warning(f"SimaPro CSV file uses unusual delimiter '{self.header['delimiter']}'")
rewindable_csv_reader = BeKindRewind(
csv.reader(data, delimiter=self.header["delimiter"], strict=True),
clean_elements=True,
offset=header_lines,
)
while block := self.get_next_block(rewindable_csv_reader, self.header):
if block is not EmptyBlock:
self.blocks.append(block)
if header.kind in (SimaProCSVType.processes, SimaProCSVType.stages):
self.resolve_parameters()
normalize_units(self.blocks)
if copy_logs:
self.copy_log_dir(Path.cwd())
def __iter__(self):
return iter(self.blocks)
[docs]
def to_brightway(
self,
filepath: Optional[Path] = None,
separate_products: bool = True,
shorten_names: bool = True,
) -> Union[dict, Path]:
if self.header["kind"] == SimaProCSVType.processes:
from .brightway import lci_to_brightway
data = lci_to_brightway(
self, separate_products=separate_products, shorten_names=shorten_names
)
if filepath is not None:
with open(filepath, "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False, default=json_serializer)
return filepath
else:
return data
else:
raise TypeError("Only process exports are currently supported")
[docs]
def copy_log_dir(self, base_dir: Path) -> None:
"""Copy the logs directory and its files to `base_dir`"""
if not isinstance(base_dir, Path):
raise ValueError(f"`base_dir` must be a `pathlib.Path` instance; got {type(base_dir)}")
if not base_dir.is_dir():
raise ValueError(f"`base_dir` must be an existing directory; got {type(base_dir)}")
return shutil.copytree(self.logs_dir, base_dir / self.logs_dir.stem)
[docs]
def data_list_not_empty(self, lst: list) -> bool:
return any(line[1] for line in lst)
[docs]
def get_next_block(
self, rewindable_csv_reader: BeKindRewind, header: dict
) -> Optional[SimaProCSVBlock]:
data = []
for line in rewindable_csv_reader:
if not any(line):
# Skip empty lines at beginning of block
continue
if get_true_length(line) == 1 and line[0] == "End":
# Empty block
self.uses_end_text = True
return EmptyBlock
# File object exhausted
break
else:
# Already at end of file; return false-y result to break `while`
return None
block_type = line[0]
if block_type in CONTROL_BLOCK_MAPPING:
block_class = CONTROL_BLOCK_MAPPING[block_type]
elif block_type in INDETERMINATE_SECTION_HEADERS:
if not self.uses_end_text:
raise IndeterminateBlockEnd(INDETERMINATE_SECTION_ERROR.format(block_type))
block_class = INDETERMINATE_SECTION_HEADERS[block_type]
else:
raise ValueError(f"Can't process unknown block type {block_type}")
for line in rewindable_csv_reader:
if line and line[0] == "End":
self.uses_end_text = True
return block_class(data, header) if self.data_list_not_empty(data) else EmptyBlock
if line and line[0] in CONTROL_BLOCK_MAPPING:
rewindable_csv_reader.rewind()
return block_class(data, header) if self.data_list_not_empty(data) else EmptyBlock
data.append((rewindable_csv_reader.line_no, line))
# EOF
return block_class(data, header) if self.data_list_not_empty(data) else None
[docs]
def resolve_parameters(self) -> None:
"""Read in input parameters, and resolve formulas."""
dcp = [
add_prefix_to_uppercase_input_parameters(prepare_formulas(b.parsed, self.header))
for b in self.blocks
if isinstance(b, DatabaseCalculatedParameters)
]
dip = [
add_prefix_to_uppercase_input_parameters(b.parsed)
for b in self.blocks
if isinstance(b, DatabaseInputParameters)
]
pcp = [
add_prefix_to_uppercase_input_parameters(prepare_formulas(b.parsed, self.header))
for b in self.blocks
if isinstance(b, ProjectCalculatedParameters)
]
pip = [
add_prefix_to_uppercase_input_parameters(b.parsed)
for b in self.blocks
if isinstance(b, ProjectInputParameters)
]
substitutes = build_substitutes(
itertools.chain(*pip), itertools.chain(*dip)
) | build_substitutes(itertools.chain(*dcp), itertools.chain(*pcp))
visitor = FormulaSubstitutor(substitutes)
for obj in itertools.chain(*dcp):
substitute_in_formulas(obj, visitor)
global_params = {o["name"]: o["amount"] for o in itertools.chain(*dip)} | {
o["name"]: o["amount"] for o in itertools.chain(*pip)
}
ps = ParameterSet({o["name"]: o for o in itertools.chain(*dcp)}, global_params)
parameter_set_evaluate_each_formula(ps)
substitutes = substitutes | {
o["original_name"].upper(): o["name"] for o in itertools.chain(*dcp)
}
visitor = FormulaSubstitutor(substitutes)
global_params = global_params | {o["name"]: o["amount"] for o in itertools.chain(*dcp)}
for obj in itertools.chain(*pcp):
substitute_in_formulas(obj, visitor)
ps = ParameterSet({o["name"]: o for o in itertools.chain(*pcp)}, global_params)
parameter_set_evaluate_each_formula(ps)
substitutes = substitutes | {
o["original_name"].upper(): o["name"] for o in itertools.chain(*pcp)
}
visitor = FormulaSubstitutor(substitutes)
global_params = global_params | {o["name"]: o["amount"] for o in itertools.chain(*pcp)}
logger.info(
"Extracted and cleaned {n} process datasets",
n=sum([1 for block in self if isinstance(block, Process)]),
)
for block in filter(lambda b: isinstance(b, Process), self):
block.resolve_local_parameters(global_params=global_params, substitutes=substitutes)
block.check_waste_production_model_consistency()
block.supplement_biosphere_edges(blocks=self.blocks)