Source code for bw2io.extractors.simapro_lcia_csv

import csv
import os
from numbers import Number

from bw2data.logs import close_log, get_io_logger
from stats_arrays import *

# Log message written when an extraction starts. The two ``%s`` slots are
# filled with the file path and the ``repr`` of the delimiter.
INTRODUCTION = (
    "Starting SimaPro import: "
    "\tFilepath: %s "
    "\tDelimiter: %s "
)
# Section headers whose content is skipped wholesale (up to the matching
# "End" row) while searching for the next method definition.
SKIPPABLE_SECTIONS = {
    "Airborne emissions",
    "Economic issues",
    "Emissions to soil",
    "Final waste flows",
    "Quantities",
    "Raw materials",
    "Units",
    "Waterborne emissions",
}
class EndOfDatasets(Exception):
    """Signal that the parsed file contains no further method sections."""
def strip_delete(obj):
    """Remove ASCII DEL (``\\x7f``) characters from a string.

    Parameters
    ----------
    obj : object
        Value to clean.

    Returns
    -------
    object
        ``obj`` with every ``\\x7f`` removed when it is a ``str``; any other
        object is returned unchanged.
    """
    if isinstance(obj, str):
        return obj.replace("\x7f", "")
    return obj
class SimaProLCIACSVExtractor(object):
    """
    Extract data from SimaPro LCIACSV file format.

    Parameters
    ----------
    filepath: str
        Filepath of the SimaPro LCIACSV file.
    delimiter: str, optional (default: ";")
        Delimiter used in the SimaPro LCIACSV file.
    encoding: str, optional (default: "cp1252")
        Encoding of the SimaPro LCIACSV file.

    Raises
    ------
    AssertionError
        If the filepath does not exist or the file is not a valid SimaPro
        export file.

    Returns
    -------
    list
        List of datasets extracted from the SimaPro LCIACSV file.
    """

    @classmethod
    def extract(cls, filepath, delimiter=";", encoding="cp1252", **kwargs):
        """
        Read a SimaPro LCIA CSV export and return all method datasets.

        Parameters
        ----------
        filepath : str
            Path of the file to read.
        delimiter : str, optional
            CSV field delimiter (default ``";"``).
        encoding : str, optional
            Text encoding of the file (default ``"cp1252"``).
        **kwargs
            Accepted for call compatibility; not used.

        Returns
        -------
        list
            One dictionary per impact category, normalization-weighting set
            and damage category found in the file.

        Raises
        ------
        AssertionError
            If ``filepath`` does not exist, or the first cell of the file
            does not contain ``"SimaPro"`` (including an empty file).
        EndOfDatasets
            If the file contains no method section at all.
        """
        # Raised explicitly (rather than via ``assert``) so the checks are
        # not stripped when Python runs with ``-O``.
        if not os.path.exists(filepath):
            raise AssertionError("Can't find file %s" % filepath)

        log, _ = get_io_logger("SimaPro-LCIA-extractor")
        try:
            log.info(INTRODUCTION % (filepath, repr(delimiter)))

            # ``newline=""`` is what the csv module asks for so that it can
            # handle line endings (and quoted newlines) itself.
            with open(filepath, "r", encoding=encoding, newline="") as csv_file:
                reader = csv.reader(csv_file, delimiter=delimiter)
                # Rows consisting only of empty cells become ``[]`` so that
                # the parsers below can treat them as blank separators.
                # DEL characters are stripped cell by cell.
                lines = [
                    [strip_delete(cell) for cell in row]
                    if any(cell != "" for cell in row)
                    else []
                    for row in reader
                ]

            # Check if valid SimaPro file
            if not lines or not lines[0] or "SimaPro" not in lines[0][0]:
                raise AssertionError("File is not valid SimaPro export")

            datasets = []
            index = cls.get_next_method_index(lines, 0)
            while True:
                try:
                    ds, index = cls.read_method_data_set(lines, index, filepath)
                    datasets.extend(ds)
                    index = cls.get_next_method_index(lines, index)
                except EndOfDatasets:
                    break
            return datasets
        finally:
            # Always release the logger, also when parsing fails.
            close_log(log)

    @staticmethod
    def _to_float(text):
        """Convert a number string that may use a decimal comma to ``float``."""
        return float(text.replace(",", "."))

    @staticmethod
    def _expect_header(data, index, expected):
        """Raise ``AssertionError`` unless row ``index`` starts with ``expected``."""
        row = data[index]
        if not row or row[0] != expected:
            raise AssertionError(
                "Expected %r header in row %s, found %r" % (expected, index, row)
            )

    @classmethod
    def get_next_method_index(cls, data, index):
        """
        Find the index of the next "Method" in the given data starting from
        the specified index, skipping any sections specified in
        SKIPPABLE_SECTIONS.

        Parameters
        ----------
        data : list of lists
            The nested list containing the data.
        index : int
            The starting index to search for the next "Method".

        Returns
        -------
        int
            The index of the row following the next "Method" or
            "Impact category" header.

        Raises
        ------
        EndOfDatasets
            If the file ends without extra metadata.
        """
        while True:
            try:
                row = data[index]
                if row and row[0] in SKIPPABLE_SECTIONS:
                    # Jump to the "End" row; the increment below steps past it.
                    index = cls.skip_to_section_end(data, index)
                elif row and row[0] in ("Method", "Impact category"):
                    return index + 1
            except IndexError:
                # File ends without extra metadata
                raise EndOfDatasets
            index += 1

    @classmethod
    def skip_to_section_end(cls, data, index):
        """
        Skip to the end of the current section in the data starting from the
        specified index.

        Parameters
        ----------
        data : list of lists
            The nested list containing the data.
        index : int
            The starting index to skip from.

        Returns
        -------
        int
            The index of the row whose first cell, stripped of whitespace,
            is ``"End"``.

        Raises
        ------
        IndexError
            If no such row exists at or after ``index``.
        """
        while True:
            row = data[index]
            first_cell = row[0] if row else ""
            if first_cell.strip() == "End":
                return index
            index += 1

    @classmethod
    def parse_cf(cls, line):
        """Parse line in `Substances` section.

        0. category
        1. subcategory
        2. flow
        3. CAS number
        4. CF
        5. unit

        Returns
        -------
        dict
            Keys ``amount`` (float; a decimal comma is accepted),
            ``CAS number``, ``categories`` (tuple of category and
            subcategory), ``name`` and ``unit``.
        """
        return {
            "amount": cls._to_float(line[4]),
            "CAS number": line[3],
            "categories": (line[0], line[1]),
            "name": line[2],
            "unit": line[5],
        }

    @classmethod
    def read_metadata(cls, data, index):
        """Read metadata from `data` starting at `index`.

        A non-blank row followed by another non-blank row is read as a
        key row and a value row; only the first cell of each is used.

        Parameters
        ----------
        data : list
            A list of lists containing the data to be processed.
        index : int
            The starting index to read metadata from.

        Returns
        -------
        tuple
            A tuple containing a dictionary of metadata and the index of the
            first ``"Impact category"`` row.

        Raises
        ------
        IndexError
            If the data ends before an ``"Impact category"`` row is found.
        """
        metadata = {}
        while True:
            row = data[index]
            if row:
                if row[0] == "Impact category":
                    return metadata, index
                if data[index + 1] and row[0]:
                    metadata[row[0]] = data[index + 1][0]
                    # Step over the value row as well.
                    index += 1
            index += 1

    @classmethod
    def read_method_data_set(cls, data, index, filepath):
        """
        Read method data set from `data` starting at `index`.

        Parameters
        ----------
        data : list
            A list of lists containing the data to be processed.
        index : int
            The starting index to read method data set from.
        filepath : str
            The file path of the method data set.

        Returns
        -------
        list
            A list of completed method data sets.
        int
            The index of the ``"End"`` row that closes the method.

        Raises
        ------
        KeyError
            If the metadata lacks ``"Name"`` or ``"Comment"``, or lacks
            ``"Weighting unit"`` while a normalization-weighting set exists.
        ValueError
            If a row starts with an unrecognised section header.
        """
        metadata, index = cls.read_metadata(data, index)
        method_root_name = metadata.pop("Name")
        description = metadata.pop("Comment")
        category_data, nw_data, damage_category_data = [], [], []

        # `index` is now the `Impact category` line
        while not data[index] or data[index][0] != "End":
            header = data[index][0] if data[index] else ""
            if not header:
                index += 1
            elif header == "Impact category":
                catdata, index = cls.get_category_data(data, index + 1)
                category_data.append(catdata)
            elif header == "Normalization-Weighting set":
                nw_dataset, index = cls.get_normalization_weighting_data(
                    data, index + 1
                )
                nw_data.append(nw_dataset)
            elif header == "Damage category":
                catdata, index = cls.get_damage_category_data(data, index + 1)
                damage_category_data.append(catdata)
            else:
                raise ValueError(
                    "Unexpected section header %r in row %s" % (header, index)
                )

        completed_data = [
            {
                "description": description,
                "name": (method_root_name, name),
                "unit": unit,
                "filename": filepath,
                "exchanges": cf_data,
            }
            for name, unit, cf_data in category_data
        ]
        for name, weights in nw_data:
            completed_data.append(
                {
                    "description": description,
                    "name": (method_root_name, name),
                    "unit": metadata["Weighting unit"],
                    "filename": filepath,
                    "exchanges": cls.get_all_cfs(weights, category_data),
                }
            )
        for name, unit, damage_data in damage_category_data:
            completed_data.append(
                {
                    "description": description,
                    "name": (method_root_name, name),
                    "unit": unit,
                    "filename": filepath,
                    "exchanges": cls.get_damage_exchanges(
                        damage_data, category_data
                    ),
                }
            )
        return completed_data, index

    @classmethod
    def get_all_cfs(cls, nw_data, category_data):
        """
        Get all CFs from `nw_data` and `category_data`.

        Parameters
        ----------
        nw_data : list
            A list of tuples containing normalization-weighting (NW) set
            names and scales.
        category_data : list
            A list of tuples containing impact category names, units, and CF
            data.

        Returns
        -------
        list
            Scaled copies of the CFs of every impact category named in
            ``nw_data``. The dictionaries in ``category_data`` are left
            untouched, since they are shared with the impact-category
            datasets.
        """
        cfs = []
        for nw_name, scale in nw_data:
            for cat_name, _, cf_data in category_data:
                if cat_name == nw_name:
                    cfs.extend(
                        {**cf, "amount": cf["amount"] * scale} for cf in cf_data
                    )
        return cfs

    @classmethod
    def get_damage_exchanges(cls, damage_data, category_data):
        """
        Calculate the damage exchanges based on damage data and category
        data.

        Parameters
        ----------
        damage_data : list of tuples
            A list of tuples containing the name and scale of the damage
        category_data : list of tuples
            A list of tuples containing the name, unit, and data of each
            impact category

        Returns
        -------
        list of dictionaries
            One dictionary per distinct (name, categories) pair, in order of
            first appearance, whose amount is the sum of the scaled amounts
            from all matching impact categories. The dictionaries in
            ``category_data`` are not modified.
        """
        # Multiple impact categories might use the same exchanges, so scale
        # and increment the amount if it exists, scale and add a copy if it
        # doesn't. A dict keyed on (name, categories) keeps this linear.
        merged = {}
        for damage_name, scale in damage_data:
            for cat_name, _, cf_data in category_data:
                if cat_name != damage_name:
                    continue
                for cf in cf_data:
                    key = (cf["name"], tuple(cf["categories"]))
                    amount = cf["amount"] * scale
                    if key in merged:
                        merged[key]["amount"] += amount
                    else:
                        merged[key] = {**cf, "amount": amount}
        return list(merged.values())

    @classmethod
    def get_category_data(cls, data, index):
        """
        Parse impact category data and return its name, unit, and data.

        Parameters
        ----------
        data : list of lists
            A list of lists with the data for all categories
        index : int
            The index of the row holding the category name and unit

        Returns
        -------
        tuple
            ``((name, unit, cf_data), index)`` where ``index`` is the blank
            row that ends the substance list.

        Raises
        ------
        AssertionError
            If the row two below ``index`` is not the ``"Substances"``
            header.
        """
        # First line is name and unit
        name, unit = data[index][:2]
        index += 2
        cls._expect_header(data, index, "Substances")
        index += 1

        cf_data = []
        while data[index]:
            cf_data.append(cls.parse_cf(data[index]))
            index += 1
        return (name, unit, cf_data), index

    @classmethod
    def get_damage_category_data(cls, data, index):
        """
        Parse damage category data and return the name, unit, and data of the
        category.

        Parameters
        ----------
        data : list of lists
            A list of lists with the data of the damage categories
        index : int
            The index of the row holding the damage category name and unit

        Returns
        -------
        tuple
            ``((name, unit, damage_data), index)`` where ``damage_data`` is
            a list of ``(impact category name, factor)`` tuples and
            ``index`` is the blank row that ends the list.

        Raises
        ------
        AssertionError
            If the row two below ``index`` is not the
            ``"Impact categories"`` header.
        """
        # First line is name and unit
        name, unit = data[index][:2]
        index += 2
        cls._expect_header(data, index, "Impact categories")
        index += 1

        damage_data = []
        while data[index]:
            method, scalar = data[index][:2]
            damage_data.append((method, cls._to_float(scalar)))
            index += 1
        return (name, unit, damage_data), index

    @classmethod
    def get_normalization_weighting_data(cls, data, index):
        """
        Parse a normalization-weighting set and return its name and weights.

        Parameters
        ----------
        data : list of lists
            A list of lists with the data of the file
        index : int
            The index of the row holding the set name

        Returns
        -------
        tuple
            ``((name, nw_data), index)`` where ``nw_data`` is a list of
            ``(impact category name, weight)`` tuples with zero weights
            left out, and ``index`` is the blank row that ends the list.

        Raises
        ------
        AssertionError
            If the row two below ``index`` is not the ``"Weighting"``
            header.
        """
        # TODO: Only works for weighting data, no addition or normalization
        name = data[index][0]
        index += 2
        cls._expect_header(data, index, "Weighting")
        index += 1

        nw_data = []
        while data[index]:
            cat, weight = data[index][:2]
            index += 1
            factor = cls._to_float(weight)
            # Compare the parsed number so that "0", "0,0" and "0.0" are all
            # recognised as zero weights and skipped.
            if factor == 0:
                continue
            nw_data.append((cat, factor))
        return (name, nw_data), index