# Source code for bw2io.extractors.simapro_lcia_95project_csv

import csv
from pathlib import Path

from bw2data.logs import close_log, get_io_logger
from stats_arrays import *

from bw2io.utils import standardize_method_to_len_3

# SKIPPABLE_SECTIONS = {
#     "Airborne emissions",
#     "Economic issues",
#     "Emissions to soil",
#     "Final waste flows",
#     "Quantities",
#     "Raw materials",
#     "Units",
#     "Waterborne emissions",
# }


class EndOfDatasets(Exception):
    """Exception named for signalling that no further datasets remain.

    NOTE(review): nothing in the visible part of this module raises or
    catches it -- confirm whether it is still needed.
    """
class SimaProLCIA95ProjectCSVExtractor:
    """
    Extract data from SimaPro LCIA 9.5 Project CSV file format.

    Differs from `SimaProLCIACSVExtractor` in that this format seems not to use
    `End` at the end of sections.

    Parameters
    ----------
    filepath: str
        Filepath of the SimaPro LCIACSV file.
    delimiter: str, optional (default: ";")
        Delimiter used in the SimaPro LCIACSV file.
    encoding: str, optional (default: "cp1252")
        Encoding of the SimaPro LCIACSV file.

    Raises
    ------
    AssertionError
        If the filepath does not exist or the file is not a valid SimaPro
        export file.

    Returns
    -------
    list
        List of impact categories extracted from the SimaPro file.

    """

    @classmethod
    def extract(cls, filepath: Path, delimiter: str = ";", encoding: str = "cp1252"):
        """Read the CSV file at ``filepath`` and return its impact categories.

        Parameters
        ----------
        filepath : Path or str
            Location of the export file.
        delimiter : str, optional
            Field delimiter handed to ``csv.reader``.
        encoding : str, optional
            Text encoding used to open the file.

        Returns
        -------
        list of dict
            One dict per ``Impact category`` section with the keys
            ``"impact category"``, ``"unit"`` and ``"cfs"``, merged with the
            context gathered from the sections that precede it
            (``"simapro version"``, ``"method"``, ``"comment"``,
            ``"configuration"`` -- each present only if its section was seen).

        Raises
        ------
        AssertionError
            If ``filepath`` is not a file, or the first cell of the file does
            not contain ``"SimaPro"``.
        """
        filepath = Path(filepath)
        # Raised explicitly (instead of ``assert``) so the check also runs
        # under ``python -O``; the exception type is unchanged for callers.
        if not filepath.is_file():
            raise AssertionError(f"Can't find file {filepath}")

        log, _logfile = get_io_logger("SimaPro-LCIA-extractor")
        try:
            log.info(
                "Starting SimaPro import:\n"
                f"\tFilepath: {filepath}\n"
                f"\tDelimiter: {delimiter}"
            )
            with open(filepath, "r", encoding=encoding) as csv_file:
                reader = csv.reader(csv_file, delimiter=delimiter)
                lines = [[cls._clean_cell(elem) for elem in line] for line in reader]
            return cls._parse_lines(lines)
        finally:
            # Close the log on failures too, so handlers are never left open.
            close_log(log)

    @staticmethod
    def _clean_cell(obj):
        """Strip whitespace and DEL (``\\x7f``) characters from string cells."""
        if isinstance(obj, str):
            return obj.strip().replace("\x7f", "")
        return obj

    @classmethod
    def _parse_lines(cls, lines: list) -> list:
        """Turn the cleaned CSV rows into the list of impact-category dicts."""
        # Guard against an empty file / blank first row so the documented
        # AssertionError is raised rather than an IndexError.
        if not lines or not lines[0] or "SimaPro" not in lines[0][0]:
            raise AssertionError("File is not valid SimaPro export")

        impact_categories, context = [], {}
        for section in cls.clean_sections(cls.split_into_sections(lines)):
            label = section[0][0]
            if label.startswith("SimaPro"):
                context["simapro version"] = section[0][1]
            elif label == "Name":
                context["method"] = section[0][1]
            elif label == "Comment":
                context["comment"] = "\n".join([line[1] for line in section])
            elif label.startswith("Use"):
                context["configuration"] = dict(section)
            elif label == "Impact category":
                impact_categories.append(
                    {
                        "impact category": section[0][1],
                        "unit": section[0][2],
                        "cfs": [cls.parse_cf(line) for line in section[1:]],
                        # Snapshot of everything collected so far.
                        **context,
                    }
                )
            # Every other section -- including "Normalization-Weighting set",
            # "Normalization" and "Weighting" -- is ignored.
        return impact_categories

    @classmethod
    def clean_sections(cls, sections: list) -> list:
        """Remove empty sections, and empty lines from sections.

        A section is dropped when nothing is left after its empty lines
        (``[]``) are removed. This covers ``[[]]`` as well as the empty
        trailing section that `split_into_sections` produces when the data
        ends with a blank line.
        """
        stripped = (
            [line for line in section if line != []] for section in sections
        )
        return [section for section in stripped if section]

    @classmethod
    def split_into_sections(cls, data: list) -> list:
        """Split the SimaPro file into sections using the blank line pattern.

        The first two rows always form the first section. After that a new
        section starts right after every blank row (``[]``); each blank row
        stays at the end of the section it terminates.
        """
        split_locations = [2]
        split_locations.extend(
            index + 1 for index, line in enumerate(data) if line == []
        )
        # Consecutive boundaries delimit the sections; the final ``None``
        # lets the last slice run to the end of the data.
        boundaries = [0, *split_locations, None]
        return [data[start:stop] for start, stop in zip(boundaries, boundaries[1:])]

    @classmethod
    def parse_cf(cls, line):
        """Parse line in `Substances` section.

        0. category
        1. subcategory
        2. flow
        3. CAS number
        4. CF
        5. unit
        6. damage rate

        The CF may use a decimal comma, which is converted before parsing the
        number. ``damage_rate`` is ``None`` when the row has fewer than seven
        cells.
        """
        damage_rate = line[6] if len(line) >= 7 else None
        return {
            "categories": (line[0], line[1]),
            "name": line[2],
            "CAS number": line[3],
            "amount": float(line[4].replace(",", ".")),
            "unit": line[5],
            "damage_rate": damage_rate,
        }