import re
import zipfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Optional
import bw2data as bd
import ecoinvent_interface as ei
from ecoinvent_interface.core import SYSTEM_MODELS
from ecoinvent_interface.string_distance import damerau_levenshtein
from .extractors import ExcelExtractor
from .importers import Ecospold2BiosphereImporter, SingleOutputEcospold2Importer
[docs]
def get_excel_sheet_names(file_path: Path) -> list[str]:
"""Read XML metadata file instead of using openpyxl, which loads the whole workbook.
From https://stackoverflow.com/questions/12250024/how-to-obtain-sheet-names-from-xls-files-without-loading-the-whole-file.
"""
sheets = []
with zipfile.ZipFile(file_path, "r") as zip_ref:
xml = zip_ref.read("xl/workbook.xml").decode("utf-8")
for s_tag in re.findall("<sheet [^>]*", xml):
sheets.append(re.search('name="[^"]*', s_tag).group(0)[6:])
return sheets
[docs]
def drop_unspecified(a: str, b: str, c: str) -> tuple:
if c.lower() == "unspecified":
return (a, b)
else:
return (a, b, c)
[docs]
def pick_a_unit_label_already(obj: dict) -> str:
candidates = ("indicator unit", "unit", "unitname", "impact score unit")
for candidate in candidates:
if candidate in obj:
return candidate
raise KeyError("Can't find suitable column label for LCIA units")
[docs]
def import_ecoinvent_release(
version: str,
system_model: str,
username: Optional[str] = None,
password: Optional[str] = None,
lci: bool = True,
lcia: bool = True,
biosphere_name: Optional[str] = None,
biosphere_write_mode: str = "patch",
importer_signal: Any = None,
namespace_lcia_methods: bool = True,
use_mp: bool = True,
separate_products: bool = False,
) -> None:
"""
Import an ecoinvent LCI and/or LCIA release.
Uses [ecoinvent_interface](https://github.com/brightway-lca/ecoinvent_interface).
Auth credentials are optional as they can be set externally (see the
`ecoinvent_interface` documentation), and such permanent storage is highly
recommended.
**DO NOT** run `bw2setup` before using this function - it isn't needed and
will cause broken results.
System model strings follow the ecoinvent unofficial API. They can be given
in a short or long form. The short forms:
* cutoff
* consequential
* apos
* EN15804
And the long forms:
* Allocation cut-off by classification
* Substitution, consequential, long-term
* Allocation at the Point of Substitution
* Allocation, cut-off, EN15804"
Parameters
----------
version
The ecoinvent release version as a string, e.g. '3.9.1'
system_model
The system model as a string in short or long form, e.g. 'apos' or
'Allocation cut-off by classification'
username
ecoinvent username
password
ecoinvent password
lci
Flag on whether to import the inventory database
lcia
Flag on whether to import the LCIA impact categories. The biosphere
database must exist if `lci` is `False`
biosphere_name
Name of database to store biosphere flows. They will be stored in the
main LCI database if not specified.
biosphere_write_mode
How to handle an existing biosphere database. Must be either `replace` or `patch`
importer_signal
Used by the Activity Browser to provide feedback during the import
namespace_lcia_methods
Add ecoinvent version as a prefix to LCIA impact categories, e.g.
`("ecoinvent-3.9.1", "global warming")`. Helps clarify the version intended for use, and
allows for multiple LCIA implementation versions to be installed in parallel
use_mp
Use a multiprocessing pool when importing ecospold2 XML files
Examples
--------
Get ecoinvent 3.9.1 cutoff in a new project (**without** running `bw2setup` first):
>>> import bw2data as bd
>>> import bw2io as bi
>>> bd.projects.set_current("some new project")
>>> bi.import_ecoinvent_release(
... version="3.9.1",
... system_model="cutoff",
... username="XXX",
... password="XXX"",
... )
>>> bd.databases
Databases dictionary with 2 object(s):
ecoinvent-3.9.1-biosphere
ecoinvent-3.9.1-cutoff
>>> len(bd.methods)
762
Add ecoinvent 3.9.1 apos to the same project:
>>> bi.import_ecoinvent_release(
... version="3.9.1",
... system_model="apos",
... username="XXX",
... password="XXX"",
... use_existing_biosphere=True
... )
>>> bd.databases
Databases dictionary with 3 object(s):
ecoinvent-3.9.1-apos
ecoinvent-3.9.1-biosphere
ecoinvent-3.9.1-cutoff
Create a new database but use `biosphere3` for the biosphere database name
and don't add LCIA methods:
>>> bd.projects.set_current("some other project")
>>> bi.import_ecoinvent_release(
... version="3.9.1",
... system_model="cutoff",
... username="XXX",
... password="XXX",
... biosphere_name="biosphere3",
... lcia=False
... )
>>> bd.databases
Databases dictionary with 2 object(s):
biosphere3
ecoinvent-3.9.1-cutoff
>>> len(bd.methods)
0
"""
from . import create_core_migrations, migrations
if not len(migrations):
create_core_migrations()
if username is None and password is None:
settings = ei.Settings()
else:
settings = ei.Settings(username=username, password=password)
if not settings.username or not settings.password:
raise ValueError("Can't determine ecoinvent username or password")
release = ei.EcoinventRelease(settings)
if version not in release.list_versions():
raise ValueError(f"Invalid version {version}")
if system_model in SYSTEM_MODELS:
system_model = SYSTEM_MODELS[system_model]
if system_model not in release.list_system_models(version):
raise ValueError(f"Invalid system model {system_model}")
if biosphere_name is None:
biosphere_name = f"ecoinvent-{version}-biosphere"
if biosphere_write_mode not in ("patch", "replace"):
error = (
"`biosphere_write_mode` must be either `patch` or `replace`;"
+ f" got `{biosphere_write_mode}`"
)
raise ValueError(error)
if lci:
lci_path = release.get_release(
version=version,
system_model=system_model,
release_type=ei.ReleaseType.ecospold,
)
db_name = f"ecoinvent-{version}-{system_model}"
if db_name in bd.databases:
raise ValueError(f"Database {db_name} already exists")
eb = Ecospold2BiosphereImporter(
name=biosphere_name,
filepath=lci_path / "MasterData" / "ElementaryExchanges.xml",
)
eb.apply_strategies()
if not eb.all_linked:
raise ValueError(
f"Can't ingest biosphere database {biosphere_name} - unlinked flows."
)
if biosphere_name not in bd.databases or biosphere_write_mode == "replace":
eb.write_database(overwrite=False)
else:
existing = {flow["code"] for flow in bd.Database(biosphere_name)}
new = [flow for flow in eb.data if flow["code"] not in existing]
if new:
new_list = "\n\t".join(
["{}: {}".format(o["name"], o["categories"]) for o in new]
)
print(
f"Adding {len(new)} biosphere flows to {biosphere_name}:\n\t{new_list}"
)
for flow in new:
if "database" in flow:
del flow["database"]
bd.Database(biosphere_name).new_activity(**flow).save()
bd.preferences["biosphere_database"] = biosphere_name
soup = SingleOutputEcospold2Importer(
dirpath=lci_path / "datasets",
db_name=db_name,
biosphere_database_name=biosphere_name,
signal=importer_signal,
use_mp=use_mp,
separate_products=separate_products,
)
soup.apply_strategies()
if not soup.all_linked:
raise ValueError(
f"Can't ingest inventory database {db_name} - unlinked flows."
)
soup.write_database()
if lcia:
subversion = int(version.split(".")[1])
if subversion < 4:
raise ValueError("LCIA import for versions 3.0-3.3 not supported")
if biosphere_name is None:
biosphere_name = bd.config.biosphere
if biosphere_name not in bd.databases or not len(bd.Database(biosphere_name)):
raise ValueError(
f"Can't find populated biosphere flow database {biosphere_name}"
)
lcia_file = ei.get_excel_lcia_file_for_version(release=release, version=version)
sheet_names = get_excel_sheet_names(lcia_file)
if "units" in sheet_names:
units_sheetname = "units"
elif "Indicators" in sheet_names:
units_sheetname = "Indicators"
else:
raise ValueError(
f"Can't find worksheet for impact category units in {sheet_names}"
)
if "CFs" not in sheet_names:
raise ValueError(
f"Can't find worksheet for characterization factors; expected `CFs`, found {sheet_names}"
)
data = dict(ExcelExtractor.extract(lcia_file))
units = header_dict(data[units_sheetname])
cfs = header_dict(data["CFs"])
CF_COLUMN_LABELS = {
"3.4": "cf 3.4",
"3.5": "cf 3.5",
"3.6": "cf 3.6",
}
cf_col_label = CF_COLUMN_LABELS.get(version, "cf")
units_col_label = pick_a_unit_label_already(units[0])
if namespace_lcia_methods:
units_mapping = {
(
f"ecoinvent-{version}",
row["method"],
row["category"],
row["indicator"],
): row[units_col_label]
for row in units
}
else:
units_mapping = {
(row["method"], row["category"], row["indicator"]): row[units_col_label]
for row in units
}
biosphere_mapping = {}
for flow in bd.Database(biosphere_name):
biosphere_mapping[(flow["name"],) + tuple(flow["categories"])] = flow.id
if flow["name"].startswith("[Deleted]"):
biosphere_mapping[
(flow["name"].replace("[Deleted]", ""),) + tuple(flow["categories"])
] = flow.id
lcia_data_as_dict = defaultdict(list)
unmatched = set()
substituted = set()
for row in cfs:
if namespace_lcia_methods:
impact_category = (
f"ecoinvent-{version}",
row["method"],
row["category"],
row["indicator"],
)
else:
impact_category = (row["method"], row["category"], row["indicator"])
if row[cf_col_label] is None:
continue
try:
lcia_data_as_dict[impact_category].append(
(
biosphere_mapping[
drop_unspecified(
row["name"], row["compartment"], row["subcompartment"]
)
],
float(row[cf_col_label]),
)
)
except KeyError:
# How is this possible? We are matching ecoinvent data against
# ecoinvent data from the same release! And yet it moves...
category = (
(row["compartment"], row["subcompartment"])
if row["subcompartment"].lower() != "unspecified"
else (row["compartment"],)
)
same_context = {
k[0]: v for k, v in biosphere_mapping.items() if k[1:] == category
}
candidates = sorted(
[
(damerau_levenshtein(name, row["name"]), name)
for name in same_context
]
)
if (
candidates[0][0] < 3
and candidates[0][0] != candidates[1][0]
and candidates[0][1][0].lower() == row["name"][0].lower()
):
new_name = candidates[0][1]
pair = (new_name, row["name"])
if pair not in substituted:
print(f"Substituting {new_name} for {row['name']}")
substituted.add(pair)
lcia_data_as_dict[impact_category].append(
(
same_context[new_name],
float(row[cf_col_label]),
)
)
else:
if row["name"] not in unmatched:
print(
"Skipping unmatched flow {}:({}, {})".format(
row["name"], row["compartment"], row["subcompartment"]
)
)
unmatched.add(row["name"])
for key in lcia_data_as_dict:
method = bd.Method(key)
if key not in bd.methods:
method.register(
unit=units_mapping.get(key, "Unknown"),
filepath=str(lcia_file),
ecoinvent_version=version,
database=biosphere_name,
)
method.write(lcia_data_as_dict[key])
else:
existing = bd.Method(key).load()
bd.Method(key).write(existing + lcia_data_as_dict[key])