import hashlib
import os
import warnings
from collections import defaultdict
from functools import partial
import bw2data as bd
import fiona
import geopandas as gp
import numpy as np
from scipy.sparse import coo_matrix
from shapely.geometry import shape
from bw2regional import geocollections
[docs]
def add_attributes(dct, func, row_index, col_index):
if func is None:
return dct
else:
dct.update(func(row_index, col_index))
return dct
[docs]
def unplottable(key):
return key == "GLO" or (isinstance(key, tuple) and key[0] == "RoW")
[docs]
def create_geodataframe(
matrix,
used_geocollections,
row_dict,
col_dict,
spatial_dim="col",
attribute_adder=None,
cutoff=None,
):
if cutoff is not None and not (0 < cutoff < 1):
warnings.warn(f"Ignoring invalid cutoff value {cutoff}")
cutoff = None
if not isinstance(matrix, coo_matrix):
matrix = matrix.tocoo()
total = matrix.sum()
include = lambda x: x / total >= cutoff if cutoff is not None else True
geom_mapping = {}
for gc in used_geocollections:
try:
field = geocollections[gc]["field"]
gdf = gp.read_file(geocollections[gc]["filepath"])
for _, row in gdf.iterrows():
if gc != "world":
geom_mapping[(gc, row[field])] = row.geometry
else:
geom_mapping[row[field]] = row.geometry
except KeyError:
pass
reversed_geomapping = {v: k for k, v in bd.geomapping.items()}
if spatial_dim == "row":
spatial_dict = row_dict
spatial_index = lambda x, y: x
else:
spatial_dict = col_dict
spatial_index = lambda x, y: y
return gp.GeoDataFrame(
[
add_attributes(
{
"row_id": int(row),
"row_index": row_dict.reversed[row],
"col_id": int(col),
"col_index": col_dict.reversed[col],
"score_abs": value,
"score_rel": value / total,
"location_key": str(
reversed_geomapping[
spatial_dict.reversed[spatial_index(row, col)]
]
),
"geometry": geom_mapping[
reversed_geomapping[
spatial_dict.reversed[spatial_index(row, col)]
]
],
},
attribute_adder,
row_dict.reversed[row],
col_dict.reversed[col],
)
for row, col, value in zip(matrix.row, matrix.col, matrix.data)
if include(value)
and not unplottable(
reversed_geomapping[spatial_dict.reversed[spatial_index(row, col)]]
)
]
)
[docs]
def _generic_exporter(
lca,
geocollection,
filepath,
spatial_dict,
spatial_func,
score_column_absolute="score_abs",
score_column_relative="score_rel",
cutoff=1e-3,
):
from bw2regional.lca.base_class import RegionalizationBase
assert isinstance(lca, RegionalizationBase)
assert geocollection in geocollections
assert hasattr(lca, spatial_dict)
assert os.path.isfile(geocollections[geocollection].get("filepath"))
assert not os.path.isfile(filepath)
vector = np.ravel(getattr(lca, spatial_func)().sum(axis=0))
lca.fix_spatial_dictionaries()
total = lca.score
cut = abs(lca.score * cutoff)
field = geocollections[geocollection].get("field")
assert field
# TODO: Might need to make this nicer/more robust
if not filepath.endswith(".geojson"):
filepath += ".geojson"
if geocollection == "world":
# Special case; "world" is just a string, not a tuple
results = {
spatial_key: vector[index]
for spatial_key, index in getattr(lca, spatial_dict).items()
if abs(vector[index]) >= cut
}
else:
results = {
spatial_key[1]: vector[index]
for spatial_key, index in getattr(lca, spatial_dict).items()
if abs(vector[index]) >= cut
}
with fiona.Env():
with fiona.open(geocollections[geocollection]["filepath"]) as source:
meta = source.meta
meta["driver"] = "GeoJSON"
meta["schema"]["properties"].update(
{score_column_absolute: "float", score_column_relative: "float"}
)
with fiona.open(filepath, "w", **meta) as sink:
for feature in source:
try:
score = results[feature["properties"][field]]
feature["properties"][score_column_absolute] = score
feature["properties"][score_column_relative] = abs(
score / total
)
sink.write(feature)
except KeyError:
continue
[docs]
as_inv_spatial_scale = partial(
_generic_exporter,
spatial_dict="inv_spatial_dict",
spatial_func="results_inv_spatial_scale",
)
[docs]
as_ia_spatial_scale = partial(
_generic_exporter,
spatial_dict="ia_spatial_dict",
spatial_func="results_ia_spatial_scale",
)
[docs]
as_xt_spatial_scale = partial(
_generic_exporter,
spatial_dict="xtable_spatial_dict",
spatial_func="results_xtable_spatial_scale",
)
[docs]
def _hash_feature(feature):
"""Calculate SHA256 hash of feature geometry as WKT"""
geom = shape(feature["geometry"])
return hashlib.sha256(geom.wkt.encode("utf-8")).hexdigest()
[docs]
def add_two_geojson_results(
first,
second,
output_filepath,
first_column_name="score_abs",
second_column_name="score_abs",
score_column_absolute="score_abs",
score_column_relative="score_rel",
cutoff=1e-4,
):
"""Sum results from two regionalized LCA calculations on the same spatial scale."""
results, features = defaultdict(float), {}
# TODO: Might need to make this nicer/more robust
if not output_filepath.endswith(".geojson"):
output_filepath += ".geojson"
with fiona.open(first, "r") as obj:
meta = obj.meta
for feature in obj:
hashed = _hash_feature(feature)
features[hashed] = feature
results[hashed] += feature["properties"][first_column_name]
with fiona.open(second, "r") as obj:
for feature in obj:
hashed = _hash_feature(feature)
features[hashed] = feature
results[hashed] += feature["properties"][second_column_name]
total = sum(results.values())
meta["driver"] = "GeoJSON"
meta["schema"]["properties"].update(
{score_column_absolute: "float", score_column_relative: "float"}
)
with fiona.open(output_filepath, "w", **meta) as sink:
for k, feature in features.items():
score = results[k]
if abs(score) < abs(total * cutoff):
continue
feature["properties"][score_column_absolute] = score
feature["properties"][score_column_relative] = abs(score / total)
sink.write(feature)