Source code for bw2io.extractors.ecospold1
import math
import multiprocessing
import os
from io import StringIO
from pathlib import Path
from typing import Any, Optional, Union
import numpy as np
import pyecospold
from lxml import etree
from stats_arrays.distributions import (
LognormalUncertainty,
NormalUncertainty,
TriangularUncertainty,
UndefinedUncertainty,
UniformUncertainty,
)
from tqdm import tqdm
[docs]
def robust_text(root: etree.ElementBase, attribute: str) -> Optional[str]:
    """Return the ``text`` of the child named ``attribute``, or ``None``.

    The ecospold1 spec marks many elements as mandatory, but real files do
    not always include them. Any missing link in the chain (``root`` lacking
    ``attribute``, or the child lacking ``text``) yields ``None`` rather than
    an ``AttributeError``.
    """
    child = getattr(root, attribute, None)
    # ``getattr`` on ``None`` with a default also returns the default, so a
    # missing child needs no separate branch.
    return getattr(child, "text", None)
[docs]
def robust_nested_attribute(root: etree.ElementBase, attr1: str, attr2: str) -> Any:
    """Return ``root.<attr1>.<attr2>``, or ``None`` if either level is absent.

    A first level that exists but is ``None`` is treated the same as a
    missing one.
    """
    outer = getattr(root, attr1, None)
    if outer is None:
        return None
    return getattr(outer, attr2, None)
[docs]
class Ecospold1DataExtractor:
    """Extract ecospold1 XML datasets into plain Python dictionaries.

    Every entry point is a classmethod, so the class can be used directly
    without creating an instance.
    """

    # Exchange ``type`` for the first group label in ``exc.groupsStr``.
    #
    # Input groups are:
    #     1. Materials/fuels
    #     2. Electricity/Heat
    #     3. Services
    #     4. FromNature
    #     5. FromTechnosphere
    # Output groups are:
    #     0. Reference product
    #     1. Include avoided product system
    #     2. Allocated byproduct
    #     3. Waste to treatment
    #     4. ToNature
    _EXCHANGE_TYPES = {
        "ReferenceProduct": "production",
        "Allocated by product": "production",
        "WasteToTreatment": "production",
        "Include avoided product system": "substitution",
        "ToNature": "biosphere",
        "Materials/Fuels": "technosphere",
        "Electricity/Heat": "technosphere",
        "Services": "technosphere",
        "FromTechnosphere": "technosphere",
        "FromNature": "biosphere",  # Resources
    }

    # (output key, attribute read from the person element) pairs.
    _PERSON_FIELDS = (
        ("address", "address"),
        ("company", "companyCode"),
        ("country", "countryCode"),
        ("email", "email"),
        ("name", "name"),
    )

    @staticmethod
    def _list_files(path):
        """Return the list of inputs to parse for ``path``.

        A directory yields its ``*.xml`` files (case-insensitive suffix,
        excluding the SimaPro-specific ``ElementaryFlows.xml``) in sorted
        order so results are reproducible. Anything else, including
        file-like objects such as ``StringIO``, is returned as a
        single-item list.
        """
        # ``os.path.isdir`` raises ``TypeError`` for objects that are not
        # path-like, so only ask the filesystem about real paths.
        is_directory = isinstance(path, (str, os.PathLike)) and os.path.isdir(path)
        if not is_directory:
            return [path]
        return sorted(
            os.path.join(path, filename)
            for filename in os.listdir(path)
            if filename.lower().endswith(".xml")
            # Skip SimaPro-specific flow list
            and filename != "ElementaryFlows.xml"
        )

    @classmethod
    def extract(
        cls, path: Union[str, Path, StringIO], db_name: str, use_mp: bool = True
    ):
        """
        Extract data from ecospold1 files.

        Parameters
        ----------
        path : str, Path or StringIO
            Path to the directory containing the ecospold1 files, path to a
            single file, or an in-memory ``StringIO`` document.
        db_name : str
            Name of the database.
        use_mp : bool, optional
            If True, uses multiprocessing to parallelize extraction of data
            from multiple files, by default True.

        Returns
        -------
        list
            List of dictionaries containing data from the ecospold1 files.
            Falsy entries returned by ``process_file`` are dropped.

        Raises
        ------
        OSError
            If ``path`` is a directory without any usable XML files.
        """
        filelist = cls._list_files(path)
        if not filelist:
            raise OSError("Provided path doesn't appear to have any XML files")

        if use_mp:
            with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
                print("Extracting XML data from {} datasets".format(len(filelist)))
                # ``cls`` (not the hard-coded class) so that subclasses which
                # override ``process_file`` behave the same in both branches.
                pending = [
                    pool.apply_async(cls.process_file, args=(item, db_name))
                    for item in filelist
                ]
                # Collect while the pool is still alive.
                return [
                    entry for result in pending for entry in result.get() if entry
                ]

        return [
            entry
            for filepath in tqdm(filelist)
            for entry in cls.process_file(filepath, db_name)
            if entry
        ]

    @classmethod
    def process_file(cls, filepath: Union[str, Path, StringIO], db_name: str):
        """
        Process a single ecospold1 file.

        Parameters
        ----------
        filepath : str, Path or StringIO
            Path to the ecospold1 file (or an in-memory document).
        db_name : str
            Name of the database.

        Returns
        -------
        list
            List of dictionaries containing data from the ecospold1 file,
            one per dataset. Entries whose tag is ``"comment"`` are skipped.
        """
        root = pyecospold.parse_file_v1(filepath)
        return [
            cls.process_dataset(dataset, filepath, db_name)
            for dataset in root.datasets
            if dataset.tag != "comment"
        ]

    @staticmethod
    def _extract_authors(admin_info):
        """Build the ``authors`` section from the administrative information.

        Returns a dict with ``people`` (the distinct persons, each given a
        1-based ``identifier``) and ``data_entry`` (the entry of ``people``
        matching ``admin_info.dataEntryBy.person``).

        Raises ``KeyError`` if the data-entry person number is not listed.
        """
        people = {
            person.number: {
                key: getattr(person, attribute, "")
                for key, attribute in Ecospold1DataExtractor._PERSON_FIELDS
            }
            for person in admin_info.persons
        }

        # Good, good, let the hate flow through you
        # The same person can be listed under several numbers. Keep the first
        # occurrence and remember which kept record every number maps to, so
        # the data-entry reference always resolves to a dict that receives an
        # identifier, even when it points at a later duplicate.
        unique_people = []
        canonical = {}
        for number, person in people.items():
            kept = next((seen for seen in unique_people if seen == person), None)
            if kept is None:
                unique_people.append(person)
                kept = person
            canonical[number] = kept

        # Identifiers are assigned only after de-duplication so they do not
        # take part in the equality comparison above.
        for identifier, person in enumerate(unique_people, start=1):
            person["identifier"] = identifier

        # We don't extract the `dataGeneratorAndPublication` tag because
        # it is insane; there is only one but we have multiple publications,
        # and implementing software puts in garbage anyway
        return {
            "data_entry": canonical[admin_info.dataEntryBy.person],
            "people": unique_people,
        }

    @classmethod
    def process_dataset(
        cls,
        dataset: pyecospold.model_v1.Dataset,
        filename: Union[str, Path, StringIO],
        db_name: str,
    ):
        """
        Convert one ecospold1 dataset into a process dictionary.

        Parameters
        ----------
        dataset : pyecospold.model_v1.Dataset
            Parsed dataset element.
        filename : str, Path or StringIO
            Source of the dataset; only its base name is stored
            (``"StringIO"`` for in-memory input).
        db_name : str
            Name of the database.

        Returns
        -------
        dict
            Process data with metadata tags, references, comments, authors
            and exchanges. Entries produced by ``process_allocation`` (those
            with a truthy ``reference``) are stored under ``allocations``;
            ``exchanges`` keeps only entries with a truthy ``type``.
        """
        MI = dataset.metaInformation
        PI = MI.processInformation
        RF = PI.referenceFunction
        MV = MI.modellingAndValidation
        DI = PI.dataSetInformation

        def representativeness(field):
            return robust_nested_attribute(MV, "representativeness", field) or ""

        # The section texts are optional in practice; fall back to an empty
        # string instead of failing on ``str + None``.
        comments = {
            "generalComment": RF.generalComment,
            "includedProcesses": RF.includedProcesses,
            "location": "Location: " + (robust_text(PI, "geography") or ""),
            "technology": "Technology: " + (robust_text(PI, "technology") or ""),
            "timePeriod": "Time period: " + (robust_text(PI, "timePeriod") or ""),
            "productionVolume": "Production volume: "
            + representativeness("productionVolume"),
            "sampling": "Sampling: " + representativeness("samplingProcedure"),
            "extrapolations": "Extrapolations: "
            + representativeness("extrapolations"),
            "uncertaintyAdjustments": "Uncertainty adjustments: "
            + representativeness("uncertaintyAdjustments"),
        }

        data = {
            "tags": [
                ("ecoSpold01datasetRelatesToProduct", RF.datasetRelatesToProduct),
                ("ecoSpold01infrastructureProcess", RF.infrastructureProcess),
                ("ecoSpold01infrastructureIncluded", RF.infrastructureIncluded),
                ("ecoSpold01localName", RF.localName),
                ("ecoSpold01localCategory", RF.localCategory),
                ("ecoSpold01localSubCategory", RF.localSubCategory),
                ("ecoSpold01category", RF.category),
                ("ecoSpold01subCategory", RF.subCategory),
                ("ecoSpold01includedProcesses", RF.includedProcesses),
                (
                    "ecoSpold01dataValidForEntirePeriod",
                    PI.timePeriod.dataValidForEntirePeriod,
                ),
                # Get string representation instead of converting to native
                # date type
                ("ecoSpold01endDate", PI.timePeriod.endDate.strftime("%Y-%m-%d")),
                (
                    "ecoSpold01startDate",
                    PI.timePeriod.startDate.strftime("%Y-%m-%d"),
                ),
                ("ecoSpold01type", DI.type),
                ("ecoSpold01impactAssessmentResult", DI.impactAssessmentResult),
                ("ecoSpold01version", DI.version),
                ("ecoSpold01internalVersion", DI.internalVersion),
                ("ecoSpold01timestamp", DI.timestamp.isoformat()),
                ("ecoSpold01languageCode", DI.languageCode),
                ("ecoSpold01localLanguageCode", DI.localLanguageCode),
                ("ecoSpold01energyValues", DI.energyValues),
            ],
            "references": [
                {
                    "identifier": source.number,
                    "type": source.sourceTypeStr,
                    # additional authors supposed to be split by comma, but comma
                    # also used in first/last names, so can split names.
                    # Just add as long string
                    "authors": [source.firstAuthor, source.additionalAuthors],
                    "year": source.year,
                    "title": source.title,
                    "pages": source.pageNumbers,
                    "editors": source.nameOfEditors,
                    "anthology": source.titleOfAnthology,
                    "place_of_publication": source.placeOfPublications,
                    "publisher": source.publisher,
                    "journal": source.journal,
                    "volume": source.volumeNo,
                    "issue": source.issueNo,
                    "text": source.text,
                }
                for source in MV.sources
            ],
            "categories": [RF.get("category"), RF.get("subCategory")],
            "code": int(dataset.get("number")),
            "comment": "\n".join(text for text in comments.values() if text),
            "comments": comments,
            "authors": cls._extract_authors(MI.administrativeInformation),
            "database": db_name,
            "exchanges": cls.process_exchanges(dataset),
            "filename": (
                Path(filename).name
                if not isinstance(filename, StringIO)
                else "StringIO"
            ),
            "location": PI.geography.location,
            "name": RF.name.strip(),
            "unit": RF.unit,
            "type": "process",
        }

        allocation_exchanges = [
            exc for exc in data["exchanges"] if exc.get("reference")
        ]
        if allocation_exchanges:
            data["allocations"] = allocation_exchanges
        data["exchanges"] = [exc for exc in data["exchanges"] if exc.get("type")]
        return data

    @classmethod
    def process_exchanges(cls, dataset):
        """Return all processed exchanges followed by all processed allocations.

        Both kinds of entry share one list here; ``process_dataset`` separates
        them afterwards.
        """
        # Skip definitional exchange - we assume this already
        flow_data = dataset.flowData
        exchanges = [cls.process_exchange(exc, dataset) for exc in flow_data.exchanges]
        allocations = [
            cls.process_allocation(exc, dataset) for exc in flow_data.allocations
        ]
        return exchanges + allocations

    @classmethod
    def process_allocation(cls, exc, dataset):
        """Convert an allocation element into a dictionary.

        Returns ``reference`` (int, from ``referenceToCoProduct``),
        ``fraction`` (float) and ``exchanges`` (int value of the text of
        every child whose tag is not ``"comment"``). ``dataset`` is unused.
        """
        return {
            "reference": int(exc.get("referenceToCoProduct")),
            "fraction": float(exc.get("fraction")),
            "exchanges": [
                int(child.text)
                for child in exc.iterchildren()
                if child.tag != "comment"
            ],
        }

    @classmethod
    def process_exchange(cls, exc, dataset):
        """Process exchange.

        The exchange ``type`` is derived from the first entry of
        ``exc.groupsStr`` using ``_EXCHANGE_TYPES``.

        A single-output process will have one output group 0; A MO process
        will have multiple output group 2s. Output groups 1 and 3 are not
        used in ecoinvent.

        Optional fields (comment, CAS number, chemical formula, source
        reference, pages) are added only when truthy. Uncertainty fields are
        filled in by ``process_uncertainty_fields``. ``dataset`` is unused.

        Raises
        ------
        ValueError
            If the exchange group is missing or not recognised.
        """
        groups = exc.groupsStr
        try:
            kind = cls._EXCHANGE_TYPES[groups[0]]
        except (IndexError, KeyError, TypeError):
            # An empty or absent group list is as unusable as an unknown one.
            raise ValueError(
                "Can't understand exchange group {}".format(groups)
            ) from None

        data = {
            "code": int(exc.number or 0),
            "categories": (exc.get("category"), exc.get("subCategory")),
            "location": exc.location,
            "unit": exc.unit,
            "name": exc.name.strip(),
            "type": kind,
            "infrastructureProcess": exc.infrastructureProcess,
        }
        optional_fields = (
            ("comment", exc.generalComment),
            ("CAS number", exc.CASNumber),
            ("chemical formula", exc.formula),
            ("source_reference", exc.referenceToSource),
            ("pages", exc.pageNumbers),
        )
        data.update({key: value for key, value in optional_fields if value})
        return cls.process_uncertainty_fields(exc, data)

    @staticmethod
    def _floatish(value):
        """Parse a numeric XML attribute string; return NaN when unusable.

        ``None`` (missing attribute), non-strings and unparsable text all
        map to ``np.nan``. Only conversion errors are caught, so interrupts
        and other unrelated exceptions still propagate.
        """
        try:
            return float(value.strip())
        except (AttributeError, TypeError, ValueError):
            return np.nan

    @classmethod
    def process_uncertainty_fields(cls, exc, data):
        """Add ``amount`` and uncertainty fields to ``data`` and return it.

        ``uncertaintyType`` selects the distribution: 1 lognormal, 2 normal,
        3 triangular, 4 uniform, anything else undefined. Lognormal entries
        with unusable parameters fall back to the undefined distribution.
        ``data`` is updated in place.
        """
        uncertainty = int(exc.get("uncertaintyType", 0))
        mean = cls._floatish(exc.get("meanValue"))
        min_ = cls._floatish(exc.get("minValue"))
        max_ = cls._floatish(exc.get("maxValue"))
        sigma = cls._floatish(exc.get("standardDeviation95"))

        if uncertainty == 1 and sigma in (0, 1):
            # Bad data
            uncertainty = 0

        if uncertainty == 1:
            # Lognormal
            # ``sigma > 0`` is False for NaN and for negative values, both of
            # which would make log(sqrt(sigma)) undefined; a zero mean has no
            # logarithm either. Checking first avoids a math domain error and
            # a log(0) warning.
            if sigma > 0 and mean != 0:
                data.update(
                    {
                        "uncertainty type": LognormalUncertainty.id,
                        "amount": mean,
                        "loc": np.log(np.abs(mean)),
                        "scale": math.log(math.sqrt(sigma)),
                        "negative": mean < 0,
                    }
                )
            else:
                # Bad data
                data.update(
                    {
                        "uncertainty type": UndefinedUncertainty.id,
                        "amount": mean,
                        "loc": mean,
                        "negative": mean < 0,
                    }
                )
        elif uncertainty == 2:
            # Normal
            data.update(
                {
                    "uncertainty type": NormalUncertainty.id,
                    "amount": mean,
                    "loc": mean,
                    "scale": sigma / 2,
                }
            )
        elif uncertainty == 3:
            # Triangular
            data.update(
                {
                    "uncertainty type": TriangularUncertainty.id,
                    "minimum": min_,
                    "maximum": max_,
                }
            )
            # Sometimes this isn't included (though it SHOULD BE)
            if exc.get("mostLikelyValue"):
                data["amount"] = data["loc"] = cls._floatish(
                    exc.get("mostLikelyValue")
                )
            else:
                data["amount"] = data["loc"] = mean
        elif uncertainty == 4:
            # Uniform
            data.update(
                {
                    "uncertainty type": UniformUncertainty.id,
                    "amount": mean,
                    "minimum": min_,
                    "maximum": max_,
                }
            )
        else:
            # None
            data.update(
                {
                    "uncertainty type": UndefinedUncertainty.id,
                    "amount": mean,
                    "loc": mean,
                }
            )
        return data