import contextlib
import os
import pickle
import re
import shutil
import sqlite3
import warnings
from pathlib import Path
import numpy as np
from bw_processing import safe_filename
from tqdm import tqdm
from bw2data import (
Database,
Method,
Normalization,
Weighting,
databases,
methods,
normalizations,
preferences,
projects,
weightings,
)
from bw2data.backends import sqlite3_lci_db
from bw2data.logs import stdout_feedback_logger
# Matches a string made of exactly 32 ASCII letters/digits.
hash_re = re.compile("^[a-zA-Z0-9]{32}$")


def is_hash(x):
    """Return ``True`` if ``x`` consists of exactly 32 ASCII alphanumeric characters.

    Args:
        x: The string to test.

    Returns:
        bool: Whether ``x`` matches ``hash_re`` over its full length.
    """
    # ``fullmatch`` instead of ``match``: with ``match``, the trailing ``$`` also
    # matches just before a final newline, so "<32 chars>\n" would be accepted.
    return bool(hash_re.fullmatch(x))
# Message passed to ``warnings.warn`` when non-automatic updates are pending.
UPDATE_WARNING = "\n\nYour data needs to be updated.\n\n"
# SQL script that rebuilds the ``activitydataset`` table: the old table is renamed
# to ``AD_old``, a new table with separate "database" and "code" columns is created,
# and each old "key" value is split at the '⊡' separator into those two columns.
# The script runs in a single transaction and drops ``AD_old`` at the end.
UPDATE_ACTIVITYDATASET = """
BEGIN;
DROP INDEX IF EXISTS "activitydataset_key";
ALTER TABLE ActivityDataset rename to AD_old;
CREATE TABLE "activitydataset" (
"id" INTEGER NOT NULL PRIMARY KEY,
"database" TEXT NOT NULL,
"code" TEXT NOT NULL,
"data" BLOB NOT NULL,
"location" TEXT,
"name" TEXT,
"product" TEXT,
"type" TEXT
);
INSERT INTO ActivityDataset ("database", "code", "data", "location", "name", "product", "type")
SELECT substr(key, 0, instr(key, '⊡')),
substr("key", instr("key", '⊡') + 1),
"data",
"location",
"name",
"product",
"type"
FROM AD_old;
CREATE UNIQUE INDEX "activitydataset_key" ON "activitydataset" ("database", "code");
DROP TABLE AD_old;
COMMIT;
"""
# SQL script that rebuilds the ``exchangedataset`` table: the old table is renamed
# to ``ED_old``, a new table with separate database/code columns for both "input"
# and "output" is created, and each old "input"/"output" value is split at the '⊡'
# separator. The script runs in a single transaction and drops ``ED_old`` at the end.
UPDATE_EXCHANGEDATASET = """
BEGIN;
DROP INDEX IF EXISTS "exchangedataset_database";
DROP INDEX IF EXISTS "exchangedataset_input";
DROP INDEX IF EXISTS "exchangedataset_output";
ALTER TABLE ExchangeDataset rename to ED_old;
CREATE TABLE "exchangedataset" (
"id" INTEGER NOT NULL PRIMARY KEY,
"data" BLOB NOT NULL,
"input_database" TEXT NOT NULL,
"input_code" TEXT NOT NULL,
"output_database" TEXT NOT NULL,
"output_code" TEXT NOT NULL,
"type" TEXT NOT NULL
);
INSERT INTO ExchangeDataset ("data", "input_database", "input_code", "output_database", "output_code", "type")
SELECT "data",
substr("input", 0, instr("input", '⊡')),
substr("input", instr("input", '⊡') + 1),
substr("output", 0, instr("output", '⊡')),
substr("output", instr("output", '⊡') + 1),
"type"
FROM ED_old;
CREATE INDEX "exchangedataset_input" ON "exchangedataset" ("input_database", "input_code");
CREATE INDEX "exchangedataset_output" ON "exchangedataset" ("output_database", "output_code");
DROP TABLE ED_old;
COMMIT;
"""
class Updates:
    """Registry and runner for one-time updates to project data.

    ``UPDATES`` maps an update key to a dict with the name of the classmethod
    that performs it (``"method"``), a user-facing ``"explanation"``, and an
    ``"automatic"`` flag. Updates that have been applied are recorded as
    ``preferences["updates"][key] = True``.
    """

    # NOTE(review): "processed_data_format_change_23" is referenced below, but no
    # method with that name is defined in the visible part of this class, so
    # ``do_update("2.3 processed data format")`` would raise ``AttributeError``.
    UPDATES = {
        "2.0 schema change": {
            "method": "schema_change_20_compound_keys",
            "explanation": "",
            "automatic": True,
        },
        "2.0-2 database search directories": {
            "method": "database_search_directories_20",
            "automatic": True,
            "explanation": "",
        },
        "2.3 processed data format": {
            "method": "processed_data_format_change_23",
            "automatic": True,
            "explanation": "",
        },
        "4.0 new processed format": {
            "method": "expire_all_processed_data_40",
            "automatic": True,
            "explanation": "bw2data 4.0 release requires all database be reprocessed",
        },
        "4.0 migrations filename change": {
            "method": "fix_migrations_filename",
            "automatic": True,
            "explanation": "bw2data 4.0 release requires migrations filename changes",
        },
        "4.0 database search directories FTS5": {
            "method": "database_search_directories_40",
            "automatic": True,
            "explanation": "bw2data 4.0 release switched to a new database search implementation",
        },
        "4.7 database dependencies in datapackage": {
            "method": "expire_all_processed_data_47",
            "automatic": True,
            "explanation": "bw2data 4.7 adds database_dependencies to datapackage metadata; all databases must be reprocessed",
        },
    }

    @classmethod
    def explain(cls, key):
        """Return the explanation string registered for update ``key``.

        Raises:
            KeyError: If ``key`` is not in ``UPDATES``.
        """
        return cls.UPDATES[key]["explanation"]

    @classmethod
    def do_update(cls, key):
        """Run the update registered under ``key`` and record it as applied.

        The update is only marked as done (and ``preferences`` flushed) after the
        update method returns without raising.

        Raises:
            KeyError: If ``key`` is not in ``UPDATES``.
            AttributeError: If the registered method name does not exist on the class.
        """
        method = getattr(cls, cls.UPDATES[key]["method"])
        method()
        preferences["updates"][key] = True
        preferences.flush()

    @classmethod
    def _pending_updates(cls, automatic):
        """Return the sorted keys of not-yet-applied updates with the given ``automatic`` flag."""
        cls.set_initial_updates()
        applied = preferences["updates"]
        return sorted(
            key
            for key, spec in cls.UPDATES.items()
            if not applied.get(key) and bool(spec["automatic"]) == automatic
        )

    @classmethod
    def check_status(cls, verbose=True):
        """Check if non-automatic updates need to be applied.

        Args:
            verbose: If true, emit ``UPDATE_WARNING`` via ``warnings.warn`` when
                any update is pending.

        Returns:
            Sorted list of needed non-automatic updates (strings), if any.
        """
        updates = cls._pending_updates(automatic=False)
        if updates and verbose:
            warnings.warn(UPDATE_WARNING)
        return updates

    @classmethod
    def set_initial_updates(cls):
        """Initialise ``preferences["updates"]`` if it does not exist yet.

        If the ``activitydataset`` table already has a ``code`` column, every
        known update is marked as applied; otherwise an empty record is created
        so that all updates are considered pending.
        """
        if "updates" in preferences:
            return
        SQL = "PRAGMA table_info(activitydataset)"
        # Each ``table_info`` row describes one column; index 1 is the column name.
        column_names = {x[1] for x in sqlite3_lci_db.execute_sql(SQL)}
        if "code" in column_names:
            preferences["updates"] = {key: True for key in cls.UPDATES}
        else:
            preferences["updates"] = {}

    @classmethod
    def check_automatic_updates(cls):
        """Get the sorted list of automatic updates still to be applied."""
        return cls._pending_updates(automatic=True)

    @classmethod
    def reprocess_all_1_0(cls):
        """1.0: Reprocess all to make sure default 'loc' value inserted when not specified."""
        cls._reprocess_all()

    @classmethod
    def schema_change_20_compound_keys(cls):
        """Run the ActivityDataset and ExchangeDataset schema migration scripts.

        A separate ``sqlite3`` connection is opened on the database file and is
        always closed afterwards, even if a script fails.
        """
        with contextlib.closing(sqlite3.connect(sqlite3_lci_db.db.database)) as conn:
            stdout_feedback_logger.info("Update ActivityDataset table schema and data")
            conn.executescript(UPDATE_ACTIVITYDATASET)
            stdout_feedback_logger.info("Updating ExchangeDataset table schema and data")
            conn.executescript(UPDATE_EXCHANGEDATASET)
            stdout_feedback_logger.info("Finished with schema change")

    @classmethod
    def _reindex_searchable_databases(cls):
        """Rebuild the search index of every database flagged as ``searchable``."""
        for db in databases:
            if databases[db].get("searchable"):
                # Presumably the flag is cleared so ``make_searchable`` rebuilds the
                # index instead of treating it as already done -- TODO confirm.
                databases[db]["searchable"] = False
                stdout_feedback_logger.info(f"Reindexing database {db}")
                Database(db).make_searchable()

    @classmethod
    def database_search_directories_20(cls):
        """Remove the ``whoosh`` project directory and reindex searchable databases."""
        shutil.rmtree(projects.request_directory("whoosh"))
        # TBD: Delete existing search index if available
        cls._reindex_searchable_databases()

    @classmethod
    def database_search_directories_40(cls):
        """Remove the ``whoosh`` and ``search`` project directories and reindex searchable databases."""
        shutil.rmtree(projects.request_directory("whoosh"))
        shutil.rmtree(projects.request_directory("search"))
        cls._reindex_searchable_databases()

    # A single ``@classmethod`` here: stacking the decorator twice wraps a
    # classmethod in a classmethod, which is not callable on Python 3.13+.
    @classmethod
    def expire_all_processed_data_40(cls):
        """4.0: Reprocess all LCIA and LCI objects."""
        cls._reprocess_all()

    @classmethod
    def expire_all_processed_data_47(cls):
        """4.7: Reprocess all LCIA and LCI objects."""
        cls._reprocess_all()

    @classmethod
    def fix_migrations_filename(cls):
        """Rename migration data files to use the shorter hash in their filename.

        Does nothing (apart from logging a warning) if ``bw2io`` is not installed.
        Migrations whose data file exists under neither the old nor the new
        filename are removed from ``bw2io.migrations``.

        See https://github.com/brightway-lca/brightway2-io/issues/115
        """
        try:
            import bw2io as bi
        except ImportError:
            stdout_feedback_logger.warning(
                "`bw2io` not installed; not updating `migrations` filenames"
            )
            return

        migrations_dir = Path(projects.request_directory("migrations"))
        missing = []
        for name in bi.migrations:
            current = migrations_dir / (safe_filename(name, full=True) + ".json")
            target = migrations_dir / (safe_filename(name) + ".json")
            if current.is_file():
                current.replace(target)
            elif not target.is_file():
                # Only treat the data as missing when it is not already at the
                # new filename; otherwise re-running this update after a partial
                # run would purge migrations that were renamed successfully.
                missing.append(name)

        if not missing:
            return
        # Deletion happens after the loop so the mapping is not modified while
        # it is being iterated.
        for name in missing:
            stdout_feedback_logger.warning(f"Purging migration with missing data: {name}")
            del bi.migrations[name]
        bi.migrations.flush()

    @classmethod
    def _reprocess_all(cls):
        """Call ``process()`` on every LCIA method, weighting, normalization and LCI database."""
        objects = [
            (methods, Method, "LCIA methods"),
            (weightings, Weighting, "LCIA weightings"),
            (normalizations, Normalization, "LCIA normalizations"),
            (databases, Database, "LCI databases"),
        ]
        for meta, klass, name in objects:
            if not meta.list:
                continue
            stdout_feedback_logger.info(f"Updating all {name}")
            # Passing ``meta`` directly (not wrapped in ``enumerate``) lets tqdm
            # read its length for the progress bar total, when it defines one.
            for key in tqdm(meta):
                # No reference to the object is kept, so it can be freed after
                # each iteration.
                klass(key).process()