Source code for bw2io.extractors.ecospold2

# -*- coding: utf-8 -*-
from bw2data.utils import recursive_str_to_unicode
from lxml import objectify
from stats_arrays.distributions import *
import math
import multiprocessing
import os
import pyprind
import sys


# Pedigree matrix attribute names as they appear on the ecospold2
# ``pedigreeMatrix`` element, mapped to the labels stored under the
# ``"pedigree"`` key of an extracted uncertainty dict.
PM_MAPPING = {
    "reliability": "reliability",
    "completeness": "completeness",
    "temporalCorrelation": "temporal correlation",
    "geographicalCorrelation": "geographical correlation",
    "furtherTechnologyCorrelation": "further technological correlation",
}

# Labels for the integer ``specialActivityType`` attribute of an activity;
# looked up when building the ``"activity type"`` field of a dataset.
ACTIVITY_TYPES = {
    0: "ordinary transforming activity",
    1: "market activity",
    2: "IO activity",
    3: "Residual activity",
    4: "production mix",
    5: "import activity",
    6: "supply mix",
    7: "export activity",
    8: "re-export activity",
    9: "correction activity",
    10: "market group",
}
def getattr2(obj, attr):
    """Return ``getattr(obj, attr)``, or an empty dict if the attribute is missing.

    The empty-dict fallback lets callers chain ``.get(...)`` on the result
    without first checking whether the attribute exists.

    Args:
        obj: Object to read the attribute from.
        attr: Attribute name.

    Returns:
        The attribute value, or a new ``{}`` when ``obj`` has no such attribute.
    """
    # Only a missing attribute is treated as "absent"; other errors (and
    # KeyboardInterrupt / SystemExit) propagate instead of being masked.
    return getattr(obj, attr, {})
# Comment templates appended to an exchange when its lognormal scale is
# rejected; ``{}`` is filled with the offending scale value.
TOO_LOW = """Lognormal scale value at or below zero: {}. Reverting to undefined uncertainty."""
TOO_HIGH = """Lognormal scale value impossibly high: {}. Reverting to undefined uncertainty."""
class Ecospold2DataExtractor(object):
    """Extract activity datasets and flow metadata from ecospold2 XML files.

    Every method is a classmethod, so no instance is required.
    """

    # XML namespace prefix (Clark notation) used by all tag comparisons below.
    _NS = "{http://www.EcoInvent.org/EcoSpold02}"

    @classmethod
    def extract_technosphere_metadata(cls, dirpath):
        """Return name, unit and id for each entry of ``IntermediateExchanges.xml``.

        Args:
            dirpath: Directory expected to contain ``IntermediateExchanges.xml``.

        Returns:
            A list of dicts with keys ``"name"``, ``"unit"`` and ``"id"``, one
            per child element of the document root.

        Raises:
            AssertionError: If the file does not exist in ``dirpath``.
        """

        def extract_metadata(o):
            return {"name": o.name.text, "unit": o.unitName.text, "id": o.get("id")}

        fp = os.path.join(dirpath, "IntermediateExchanges.xml")
        assert os.path.exists(fp), "Can't find IntermediateExchanges.xml"
        # Close the handle as soon as parsing is done instead of leaking it.
        with open(fp, encoding="utf-8") as handle:
            root = objectify.parse(handle).getroot()
        return [extract_metadata(ds) for ds in root.iterchildren()]

    @classmethod
    def extract(cls, dirpath, db_name, use_mp=True):
        """Extract every ``.spold`` dataset found at ``dirpath``.

        Args:
            dirpath: Either a directory (all regular files whose last
                dot-separated component is ``spold``, case-insensitively, are
                read) or the path of a single file.
            db_name: Database name stored in each dataset's ``"database"`` key.
            use_mp: Extract files in a ``multiprocessing.Pool``. Forced to
                ``False`` on Python 2.

        Returns:
            A list with one dataset dict per file, as produced by
            ``extract_activity``.

        Raises:
            AssertionError: If ``dirpath`` does not exist.
            OSError: If ``dirpath`` is neither a directory nor a regular file.
        """
        assert os.path.exists(dirpath)
        if os.path.isdir(dirpath):
            filelist = [
                filename
                for filename in os.listdir(dirpath)
                if os.path.isfile(os.path.join(dirpath, filename))
                and filename.split(".")[-1].lower() == "spold"
            ]
        elif os.path.isfile(dirpath):
            # extract_activity opens os.path.join(dirpath, filename). Split the
            # path so that join rebuilds it; passing the full path as both
            # arguments only worked when the path was absolute.
            dirpath, single_file = os.path.split(dirpath)
            filelist = [single_file]
        else:
            raise OSError("Can't understand path {}".format(dirpath))

        if sys.version_info < (3, 0):
            use_mp = False

        if use_mp:
            with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
                print("Extracting XML data from {} datasets".format(len(filelist)))
                results = [
                    pool.apply_async(
                        Ecospold2DataExtractor.extract_activity,
                        args=(dirpath, x, db_name),
                    )
                    for x in filelist
                ]
                # Collect inside the ``with`` block: leaving it terminates the
                # pool, which would abandon unfinished tasks.
                data = [p.get() for p in results]
        else:
            pbar = pyprind.ProgBar(
                len(filelist), title="Extracting ecospold2 files:", monitor=True
            )

            data = []
            for filename in filelist:
                data.append(cls.extract_activity(dirpath, filename, db_name))
                pbar.update(item_id=filename[:15])

            print(pbar)

        if sys.version_info < (3, 0):
            print("Converting to unicode")
            return recursive_str_to_unicode(data)
        else:
            return data

    @classmethod
    def condense_multiline_comment(cls, element):
        """Flatten a comment element into one newline-joined string.

        The text of every ``text`` child comes first, followed by one
        ``"Image: <url>"`` line per ``imageUrl`` child.

        Args:
            element: Element exposing ``iterchildren()``, or the ``{}``
                placeholder returned by ``getattr2`` for a missing element.

        Returns:
            The condensed comment, or ``""`` if ``element`` cannot be iterated
            or any matching child has no text.
        """
        text_tag = cls._NS + "text"
        image_tag = cls._NS + "imageUrl"
        try:
            return "\n".join(
                [child.text for child in element.iterchildren() if child.tag == text_tag]
                + [
                    "Image: " + child.text
                    for child in element.iterchildren()
                    if child.tag == image_tag
                ]
            )
        except (AttributeError, TypeError):
            # AttributeError: ``element`` is the ``{}`` placeholder (no
            # ``iterchildren``). TypeError: a child's text is None.
            return ""

    @classmethod
    def extract_activity(cls, dirpath, filename, db_name):
        """Parse one ecospold2 file into a dataset dict.

        Args:
            dirpath: Directory containing the file.
            filename: File name, joined onto ``dirpath``.
            db_name: Value stored under the ``"database"`` key.

        Returns:
            A dict describing the activity: comment, classifications, activity
            type, exchanges, parameters, authors and identifying metadata, with
            ``"type"`` set to ``"process"``.
        """
        # Close the handle as soon as parsing is done instead of leaking it.
        with open(os.path.join(dirpath, filename), encoding="utf-8") as handle:
            root = objectify.parse(handle).getroot()

        # The dataset lives under one of two possible root children.
        if hasattr(root, "activityDataset"):
            stem = root.activityDataset
        else:
            stem = root.childActivityDataset

        description = stem.activityDescription
        activity = description.activity
        admin = stem.administrativeInformation

        # Entries are either a plain string or a (label, text) tuple.
        # NOTE(review): ``.get("text")`` on the two "Included activities"
        # elements looks like an attribute lookup rather than the element's
        # text content - confirm this is intended.
        comments = [
            cls.condense_multiline_comment(getattr2(activity, "generalComment")),
            (
                "Included activities start: ",
                getattr2(activity, "includedActivitiesStart").get("text"),
            ),
            (
                "Included activities end: ",
                getattr2(activity, "includedActivitiesEnd").get("text"),
            ),
            (
                "Geography: ",
                cls.condense_multiline_comment(
                    getattr2(description.geography, "comment")
                ),
            ),
            (
                "Technology: ",
                cls.condense_multiline_comment(
                    getattr2(description.technology, "comment")
                ),
            ),
            (
                "Time period: ",
                cls.condense_multiline_comment(
                    getattr2(description.timePeriod, "comment")
                ),
            ),
        ]
        # Keep only entries that actually carry text; labelled entries are
        # rendered as "<label> <text>".
        comment = "\n".join(
            [
                (" ".join(x) if isinstance(x, tuple) else x)
                for x in comments
                if (x[1] if isinstance(x, tuple) else x)
            ]
        )

        classification_tag = cls._NS + "classification"
        classifications = [
            (el.classificationSystem.text, el.classificationValue.text)
            for el in description.iterchildren()
            if el.tag == classification_tag
        ]

        data = {
            "comment": comment,
            "classifications": classifications,
            # A missing specialActivityType falls back to type 0.
            "activity type": ACTIVITY_TYPES[
                int(activity.get("specialActivityType") or 0)
            ],
            "activity": activity.get("id"),
            "database": db_name,
            # flowData children are split by tag: those containing
            # "parameter" become parameters, all others exchanges.
            "exchanges": [
                cls.extract_exchange(exc)
                for exc in stem.flowData.iterchildren()
                if "parameter" not in exc.tag
            ],
            "filename": os.path.basename(filename),
            "location": description.geography.shortname.text,
            "name": activity.activityName.text,
            "synonyms": [s.text for s in getattr(activity, "synonym", [])],
            "parameters": dict(
                cls.extract_parameter(exc)
                for exc in stem.flowData.iterchildren()
                if "parameter" in exc.tag
            ),
            "authors": {
                "data entry": {
                    "name": admin.dataEntryBy.get("personName"),
                    "email": admin.dataEntryBy.get("personEmail"),
                },
                "data generator": {
                    "name": admin.dataGeneratorAndPublication.get("personName"),
                    "email": admin.dataGeneratorAndPublication.get("personEmail"),
                },
            },
            "type": "process",
        }
        return data

    @classmethod
    def abort_exchange(cls, exc, comment=None):
        """Reset an uncertainty dict to undefined uncertainty, in place.

        Sets ``"loc"`` to the amount, removes the ``"scale"``, ``"shape"``,
        ``"minimum"`` and ``"maximum"`` keys if present, and appends an
        explanation to ``exc["comment"]``.

        Args:
            exc: Dict with at least an ``"amount"`` key; modified in place.
            comment: Explanation to append; a generic message is used if falsy.
        """
        exc["uncertainty type"] = UndefinedUncertainty.id
        exc["loc"] = exc["amount"]
        for key in ("scale", "shape", "minimum", "maximum"):
            exc.pop(key, None)
        exc["comment"] = exc.get("comment", "")
        if exc["comment"]:
            # Separate the explanation from any existing comment text.
            exc["comment"] += "\n"
        exc["comment"] += (
            comment or "Invalid parameters - set to undefined uncertainty."
        )

    @classmethod
    def extract_uncertainty_dict(cls, obj):
        """Build the amount/uncertainty dict for an exchange or parameter element.

        Args:
            obj: Element with an ``amount`` attribute and, optionally, a
                ``formula`` attribute and an ``uncertainty`` child.

        Returns:
            A dict with ``"amount"``, ``"uncertainty type"`` and ``"loc"``,
            plus distribution-specific keys and optional ``"formula"`` and
            ``"pedigree"`` entries. Distributions with invalid parameters are
            downgraded to undefined uncertainty via ``abort_exchange``.

        Raises:
            ValueError: If an ``uncertainty`` child is present but holds none
                of the distributions handled here.
        """
        data = {
            "amount": float(obj.get("amount")),
        }
        if obj.get("formula"):
            data["formula"] = obj.get("formula")

        if not hasattr(obj, "uncertainty"):
            data.update(
                {"uncertainty type": UndefinedUncertainty.id, "loc": data["amount"]}
            )
            return data

        unc = obj.uncertainty
        if hasattr(unc, "pedigreeMatrix"):
            data["pedigree"] = {
                PM_MAPPING[key]: int(unc.pedigreeMatrix.get(key))
                for key in PM_MAPPING
            }

        if hasattr(unc, "lognormal"):
            data.update(
                {
                    "uncertainty type": LognormalUncertainty.id,
                    "loc": float(unc.lognormal.get("mu")),
                    # The file stores variances; scale is the square root.
                    "scale": math.sqrt(
                        float(unc.lognormal.get("varianceWithPedigreeUncertainty"))
                    ),
                }
            )
            if unc.lognormal.get("variance"):
                data["scale without pedigree"] = math.sqrt(
                    float(unc.lognormal.get("variance"))
                )
            if data["scale"] <= 0:
                cls.abort_exchange(data, TOO_LOW.format(data["scale"]))
            elif data["scale"] > 25:
                cls.abort_exchange(data, TOO_HIGH.format(data["scale"]))
        elif hasattr(unc, "normal"):
            data.update(
                {
                    "uncertainty type": NormalUncertainty.id,
                    "loc": float(unc.normal.get("meanValue")),
                    "scale": math.sqrt(
                        float(unc.normal.get("varianceWithPedigreeUncertainty"))
                    ),
                }
            )
            if unc.normal.get("variance"):
                data["scale without pedigree"] = math.sqrt(
                    float(unc.normal.get("variance"))
                )
            if data["scale"] <= 0:
                cls.abort_exchange(data)
        elif hasattr(unc, "triangular"):
            data.update(
                {
                    "uncertainty type": TriangularUncertainty.id,
                    "minimum": float(unc.triangular.get("minValue")),
                    "loc": float(unc.triangular.get("mostLikelyValue")),
                    "maximum": float(unc.triangular.get("maxValue")),
                }
            )
            if data["minimum"] >= data["maximum"]:
                cls.abort_exchange(data)
        elif hasattr(unc, "uniform"):
            data.update(
                {
                    "uncertainty type": UniformUncertainty.id,
                    "loc": data["amount"],
                    "minimum": float(unc.uniform.get("minValue")),
                    "maximum": float(unc.uniform.get("maxValue")),
                }
            )
            if data["minimum"] >= data["maximum"]:
                cls.abort_exchange(data)
        elif hasattr(unc, "undefined"):
            data.update(
                {"uncertainty type": UndefinedUncertainty.id, "loc": data["amount"]}
            )
        else:
            raise ValueError("Unknown uncertainty type")
        return data

    @classmethod
    def extract_parameter(cls, exc):
        """Extract one parameter element as a ``(name, data)`` pair.

        Args:
            exc: Parameter element from ``flowData``.

        Returns:
            A tuple of the parameter's ``variableName`` and its data dict
            (description, id, optional unit and comment, and the uncertainty
            fields). A parameter without a ``variableName`` gets a generated
            name and ``data["unnamed"] = True``.
        """
        name = exc.get("variableName")
        data = {
            "description": exc.name.text,
            "id": exc.get("parameterId"),
        }
        if hasattr(exc, "unitName"):
            data["unit"] = exc.unitName.text
        if hasattr(exc, "comment"):
            data["comment"] = exc.comment.text
        data.update(cls.extract_uncertainty_dict(exc))
        if name is None:
            name = "Unnamed parameter: {}".format(data["id"])
            data["unnamed"] = True
        return name, data

    @classmethod
    def extract_properties(cls, exc):
        """Collect the ``property`` children of an exchange.

        Args:
            exc: Exchange element exposing ``iterchildren()``.

        Returns:
            A dict keyed by property name; each value holds ``"amount"`` and,
            when the corresponding children exist, ``"unit"`` and
            ``"comment"``.
        """
        properties = {}
        for obj in exc.iterchildren():
            if not obj.tag.endswith("property"):
                continue
            entry = {"amount": float(obj.get("amount"))}
            if hasattr(obj, "unitName"):
                entry["unit"] = obj.unitName.text
            if hasattr(obj, "comment"):
                entry["comment"] = obj.comment.text
            properties[obj.name.text] = entry
        return properties

    @classmethod
    def extract_exchange(cls, exc):
        """Process exchange.

        Input groups are:

            1. Materials/fuels
            2. Electricity/Heat
            3. Services
            4. From environment (elementary exchange only)
            5. FromTechnosphere

        Output groups are:

            0. ReferenceProduct
            2. By-product
            3. MaterialForTreatment
            4. To environment (elementary exchange only)
            5. Stock addition

        Args:
            exc: An ``intermediateExchange`` or ``elementaryExchange`` element.

        Returns:
            A dict describing the exchange, including its flow id, ``"type"``
            (``"production"``, ``"biosphere"`` or ``"technosphere"``), name,
            CPC classifications, production volume, properties and the
            uncertainty fields.

        Raises:
            ValueError: If the element has an unexpected tag, or is an
                elementary exchange in output group 0 or 2.
        """
        if exc.tag == cls._NS + "intermediateExchange":
            flow = "intermediateExchangeId"
            is_biosphere = False
        elif exc.tag == cls._NS + "elementaryExchange":
            flow = "elementaryExchangeId"
            is_biosphere = True
        else:
            # Report the offending tag in the exception itself.
            raise ValueError("Unknown exchange tag: {}".format(exc.tag))

        # Output groups 0 (reference product) and 2 (by-product) are products.
        is_product = hasattr(exc, "outputGroup") and exc.outputGroup.text in ("0", "2")

        if is_biosphere and is_product:
            raise ValueError("Impossible output group")

        if is_product:
            kind = "production"
        elif is_biosphere:
            kind = "biosphere"
        else:
            kind = "technosphere"

        data = {
            "flow": exc.get(flow),
            "type": kind,
            "name": exc.name.text,
            "classifications": {
                "CPC": [
                    o.classificationValue.text
                    for o in exc.iterchildren()
                    if "classification" in o.tag
                    and o.classificationSystem.text == "CPC"
                ]
            },
            # A missing productionVolumeAmount is recorded as 0.
            "production volume": float(exc.get("productionVolumeAmount") or 0),
            "properties": cls.extract_properties(exc),
        }
        if not is_biosphere:
            data["activity"] = exc.get("activityLinkId")
        if hasattr(exc, "unitName"):
            data["unit"] = exc.unitName.text
        if hasattr(exc, "comment"):
            data["comment"] = exc.comment.text

        data.update(cls.extract_uncertainty_dict(exc))
        return data