Source code for bw2io.strategies.csv

import math
import re

import numpy as np

from ..errors import StrategyError


def csv_restore_tuples(data):
    """
    Convert tuple-like strings to actual tuples.

    Every string value containing ``::`` is split on that separator and
    stored back as a tuple. A ``categories`` value that is a plain string
    (no ``::``) is wrapped in a one-element tuple. Datasets and their
    exchanges are modified in place.

    Parameters
    ----------
    data : list of dict
        A list of datasets.

    Returns
    -------
    list of dict
        The same list, with tuples restored from strings.

    Examples
    --------
    >>> data = [{'categories': 'category1::category2'}, {'exchanges': [{'categories': 'category3::category4', 'amount': '10.0'}]}]
    >>> csv_restore_tuples(data)
    [{'categories': ('category1', 'category2')}, {'exchanges': [{'categories': ('category3', 'category4'), 'amount': '10.0'}]}]
    """

    def _restore_in_place(record):
        # Only existing keys are reassigned, so iterating ``items()`` is safe.
        for field, raw in record.items():
            if not isinstance(raw, str):
                continue
            if "::" in raw:
                record[field] = tuple(raw.split("::"))
            elif field == "categories":
                record[field] = (raw,)

    for dataset in data:
        _restore_in_place(dataset)
        for exchange in dataset.get("exchanges", []):
            _restore_in_place(exchange)
    return data
def csv_restore_booleans(data):
    """
    Convert boolean-like strings to booleans where possible.

    String values equal to ``"true"`` or ``"false"`` (compared
    case-insensitively) are replaced by ``True`` / ``False``. All other
    values are left untouched. Datasets and their exchanges are modified
    in place.

    Parameters
    ----------
    data : list of dict
        A list of datasets.

    Returns
    -------
    list of dict
        The same list, with booleans restored.

    Examples
    --------
    >>> data = [{'categories': 'category1', 'is_animal': 'true'}, {'exchanges': [{'categories': 'category2', 'amount': '10.0', 'uncertainty type': 'undefined', 'is_biomass': 'False'}]}]
    >>> csv_restore_booleans(data)
    [{'categories': 'category1', 'is_animal': True}, {'exchanges': [{'categories': 'category2', 'amount': '10.0', 'uncertainty type': 'undefined', 'is_biomass': False}]}]
    """
    literals = {"true": True, "false": False}

    def _restore_in_place(record):
        for field, raw in record.items():
            if isinstance(raw, str):
                # Unrecognised strings fall back to themselves.
                record[field] = literals.get(raw.lower(), raw)

    for dataset in data:
        _restore_in_place(dataset)
        for exchange in dataset.get("exchanges", []):
            _restore_in_place(exchange)
    return data
def csv_numerize(data):
    """
    Convert string values to floats where possible.

    Every string value that ``float()`` can parse is replaced by the
    resulting ``float``; integer-looking strings such as ``'20'`` therefore
    become ``20.0``. Note that ``float()`` also accepts strings such as
    ``'nan'`` and ``'inf'``. Strings that cannot be parsed, and all
    non-string values, are left untouched. Datasets and their exchanges are
    modified in place.

    Parameters
    ----------
    data : list of dict
        A list of datasets.

    Returns
    -------
    list of dict
        The same list, with numeric strings converted to float.

    Examples
    --------
    >>> data = [{'amount': '10.0'}, {'exchanges': [{'amount': '20', 'uncertainty type': 'undefined'}]}]
    >>> csv_numerize(data)
    [{'amount': 10.0}, {'exchanges': [{'amount': 20.0, 'uncertainty type': 'undefined'}]}]
    """

    def _to_number(text):
        try:
            return float(text)
        except ValueError:
            # Not numeric: keep the original string. Only ValueError is
            # caught so KeyboardInterrupt/SystemExit are never swallowed.
            return text

    def _numerize_in_place(record):
        for key, value in record.items():
            if isinstance(value, str):
                record[key] = _to_number(value)

    for ds in data:
        _numerize_in_place(ds)
        for exc in ds.get("exchanges", []):
            _numerize_in_place(exc)
    return data
def csv_drop_unknown(data):
    """
    Remove any keys whose values are `(Unknown)`.

    Only the offending key/value pairs are dropped; the dataset (or
    exchange) itself is always kept. New dataset and exchange dictionaries
    are built, so the input dictionaries are not modified.

    Parameters
    ----------
    data : list[dict]
        A list of dictionaries, where each dictionary represents a row of data.

    Returns
    -------
    list[dict]
        A new list of dictionaries without the keys whose value was
        `(Unknown)`, both at dataset level and inside ``exchanges``.

    Examples
    --------
    >>> data = [
            {"name": "John", "age": 30, "gender": "(Unknown)"},
            {"name": "Alice", "age": 25, "gender": "Female"},
            {"name": "Bob", "age": 40, "gender": "Male"}
        ]
    >>> csv_drop_unknown(data)
    [
        {"name": "John", "age": 30},
        {"name": "Alice", "age": 25, "gender": "Female"},
        {"name": "Bob", "age": 40, "gender": "Male"}
    ]
    """

    def _without_unknown(record):
        return {k: v for k, v in record.items() if v != "(Unknown)"}

    data = [_without_unknown(ds) for ds in data]
    for ds in data:
        if "exchanges" in ds:
            ds["exchanges"] = [_without_unknown(exc) for exc in ds["exchanges"]]
    return data
def csv_add_missing_exchanges_section(data):
    """
    Ensure every dataset in `data` has an `exchanges` section.

    Datasets that lack the `exchanges` key receive a fresh empty list;
    datasets that already have one are left as they are. The datasets are
    modified in place.

    Parameters
    ----------
    data: list of dict
        A list of dictionaries, where each dictionary represents a row of data.

    Returns
    -------
    list[dict]
        The same list, where every dictionary now has an `exchanges` key.

    Examples
    --------
    >>> data = [
            {"name": "John", "age": 30},
            {"name": "Alice", "age": 25, "exchanges": []},
            {"name": "Bob", "age": 40, "exchanges": [{"name": "NYSE"}]}
        ]
    >>> csv_add_missing_exchanges_section(data)
    [
        {"name": "John", "age": 30, "exchanges": []},
        {"name": "Alice", "age": 25, "exchanges": []},
        {"name": "Bob", "age": 40, "exchanges": [{"name": "NYSE"}]}
    ]
    """
    for dataset in data:
        dataset.setdefault("exchanges", [])
    return data
[docs] def _is_blank(value): if value is None: return True if isinstance(value, str) and value.strip() == "": return True if isinstance(value, (list, tuple)) and len(value) == 0: return True return False
[docs] def _normalize_kind(value): if not isinstance(value, str): return None value = value.strip().lower() if value in {"delta", "relative", "timedelta64"}: return "delta" if value in {"abs", "absolute", "datetime64"}: return "abs" if value in {"easy_timedelta_distribution", "easy_timedelta", "easy_td"}: return "easy_timedelta_distribution" if value in {"easy_datetime_distribution", "easy_datetime", "easy_dt"}: return "easy_datetime_distribution" return None
[docs] def _normalize_resolution(value, exc): if not isinstance(value, str): raise StrategyError( "Invalid resolution '{}' in exchange {}".format( value, exc.get("name", "<unknown>") ) ) value = value.strip() if not value: raise StrategyError( "Empty resolution in exchange {}".format(exc.get("name", "<unknown>")) ) if len(value) == 1: lower = value.lower() if lower == "y": return "Y" if lower == "d": return "D" if lower == "h": return "h" if lower == "s": return "s" if lower == "m": # Lowercase 'm' β†’ minutes; uppercase 'M' β†’ months return "m" if value == "m" else "M" return value
[docs] def _parse_sequence(value, field, exc): if isinstance(value, (list, tuple)): seq = list(value) elif isinstance(value, str): if "," in value: seq = [s.strip() for s in value.split(",") if s.strip() != ""] elif "::" in value: seq = [s.strip() for s in value.split("::") if s.strip() != ""] else: seq = [value.strip()] else: seq = [value] if not seq: raise StrategyError( "Field '{}' is empty in exchange {}".format( field, exc.get("name", "<unknown>") ) ) return seq
[docs] def _coerce_int_list(seq, field, exc): values = [] for item in seq: if isinstance(item, bool): raise StrategyError( "Invalid integer value '{}' in field '{}' for exchange {}".format( item, field, exc.get("name", "<unknown>") ) ) if isinstance(item, (int, np.integer)): values.append(int(item)) continue if isinstance(item, float): if item.is_integer(): values.append(int(item)) continue raise StrategyError( "Invalid integer value '{}' in field '{}' for exchange {}".format( item, field, exc.get("name", "<unknown>") ) ) if isinstance(item, str): if re.match(r"^-?\d+$", item.strip()): values.append(int(item.strip())) continue raise StrategyError( "Invalid integer value '{}' in field '{}' for exchange {}".format( item, field, exc.get("name", "<unknown>") ) ) return values
[docs] def _coerce_float_list(seq, field, exc): values = [] for item in seq: try: values.append(float(item)) except (ValueError, TypeError): raise StrategyError( "Invalid float value '{}' in field '{}' for exchange {}".format( item, field, exc.get("name", "<unknown>") ) ) return values
[docs] def _normalize_abs_dates(seq, resolution, exc): resolution = resolution.strip() if resolution == "Y": normalized = [] for item in seq: if isinstance(item, bool): raise StrategyError( "Invalid year value '{}' in exchange {}".format( item, exc.get("name", "<unknown>") ) ) if isinstance(item, (int, np.integer)): year = str(int(item)) elif isinstance(item, float) and item.is_integer(): year = str(int(item)) elif isinstance(item, str): year = item.strip() else: raise StrategyError( "Invalid year value '{}' in exchange {}".format( item, exc.get("name", "<unknown>") ) ) if not re.match(r"^\d{4}$", year): raise StrategyError( "Year value '{}' does not match YYYY format in exchange {}".format( year, exc.get("name", "<unknown>") ) ) normalized.append(year) return normalized if resolution == "M": normalized = [] for item in seq: if not isinstance(item, str): raise StrategyError( "Month value '{}' must be a string in exchange {}".format( item, exc.get("name", "<unknown>") ) ) item = item.strip() match = re.match(r"^(0?[1-9]|1[0-2])-(\d{4})$", item) if not match: raise StrategyError( "Month value '{}' does not match M-YYYY format in exchange {}".format( item, exc.get("name", "<unknown>") ) ) month = int(match.group(1)) year = match.group(2) normalized.append("{}-{:02d}".format(year, month)) return normalized if resolution == "D": normalized = [] for item in seq: if not isinstance(item, str): raise StrategyError( "Day value '{}' must be a string in exchange {}".format( item, exc.get("name", "<unknown>") ) ) item = item.strip() match = re.match(r"^(0?[1-9]|[12][0-9]|3[01])-(0?[1-9]|1[0-2])-(\d{4})$", item) if not match: raise StrategyError( "Day value '{}' does not match D-M-YYYY format in exchange {}".format( item, exc.get("name", "<unknown>") ) ) day = int(match.group(1)) month = int(match.group(2)) year = match.group(3) normalized.append("{}-{:02d}-{:02d}".format(year, month, day)) return normalized # For other resolutions (h, m, s, etc.), require ISO-formatted strings 
normalized = [] for item in seq: if not isinstance(item, str): raise StrategyError( "Date value '{}' must be a string for resolution '{}' in exchange {}".format( item, resolution, exc.get("name", "<unknown>") ) ) normalized.append(item.strip()) return normalized
def csv_restore_temporal_distributions(data):
    """
    Reconstruct TemporalDistribution objects from exchange row columns.

    Accepts both ``temporal_distribution`` (underscore) and
    ``temporal distribution`` (space) as the key name; the underscore form
    takes precedence when both are present and the space form is always
    removed.

    Exchanges whose ``temporal_distribution`` is missing or blank (``None``,
    empty/whitespace string, empty list or tuple) are left untouched. For
    every other exchange the value is replaced by the constructed
    distribution object, the original label is kept under
    ``temporal_distribution_kind``, and the columns consumed to build the
    distribution are removed. Exchanges are modified in place.

    Expected exchange fields:

    - ``temporal_distribution``: one of ``delta`` / ``relative`` /
      ``timedelta64``, ``abs`` / ``absolute`` / ``datetime64``,
      ``easy_timedelta_distribution`` (aliases: ``easy_timedelta``,
      ``easy_td``), or ``easy_datetime_distribution`` (aliases:
      ``easy_datetime``, ``easy_dt``).
    - ``date``: list/tuple or comma-separated string of integer offsets
      (delta) or formatted date strings (abs).
    - ``value``: list/tuple or comma-separated string of floats. Rescaled to
      sum to 1 if necessary.
    - ``resolution``: numpy time-unit code such as ``Y``, ``M``, ``D``,
      ``h``, ``m``, ``s``. Case is significant for ``m`` (minutes) vs ``M``
      (months); the single-letter codes ``Y``, ``D``, ``h`` and ``s`` are
      accepted in either case.
    - For ``easy_timedelta_distribution``: ``start``, ``end``, ``steps`` and
      ``resolution`` are required; ``td_kind`` and ``td_param`` are
      optional. A missing or blank ``td_kind`` defaults to ``"uniform"``
      and a missing or blank ``td_param`` to ``None``.
    - For ``easy_datetime_distribution``: ``start``, ``end``, ``steps``.

    Parameters
    ----------
    data : list of dict
        A list of datasets, each optionally holding an ``exchanges`` list.

    Returns
    -------
    list of dict
        The same list, with temporal distributions restored.

    Raises
    ------
    StrategyError
        On any validation failure (unknown kind, missing fields, bad values,
        mismatched lengths, zero-sum amounts), or if ``bw_temporalis``
        cannot be imported and a temporal distribution is encountered.
    """
    try:
        from bw_temporalis import (
            TemporalDistribution,
            easy_datetime_distribution,
            easy_timedelta_distribution,
        )
    except Exception:
        # Any import failure is treated as "not available". The error is
        # only raised below, once an exchange actually needs the package.
        TemporalDistribution = None
        easy_timedelta_distribution = None
        easy_datetime_distribution = None

    for ds in data:
        for exc in ds.get("exchanges", []):
            # Normalise the space-variant key; underscore form wins if both
            # are present.
            if "temporal distribution" in exc:
                spaced_value = exc.pop("temporal distribution")
                exc.setdefault("temporal_distribution", spaced_value)

            raw_kind = exc.get("temporal_distribution")
            label = exc.get("name", "<unknown>")

            kind = _normalize_kind(raw_kind)
            if not kind:
                if not _is_blank(raw_kind):
                    raise StrategyError(
                        "Unknown temporal_distribution value '{}' in exchange {}".format(
                            raw_kind, label
                        )
                    )
                continue

            if TemporalDistribution is None:
                raise StrategyError(
                    "Temporal distributions require `bw_temporalis` to be installed"
                )

            if kind == "easy_timedelta_distribution":
                # td_kind / td_param are optional: they get defaults below.
                required = ("start", "end", "steps", "resolution")
                consumed = required + ("td_kind", "td_param")
            elif kind == "easy_datetime_distribution":
                required = ("start", "end", "steps")
                consumed = required
            else:
                required = ("date", "value", "resolution")
                consumed = required

            missing = [k for k in required if k not in exc]
            if missing:
                raise StrategyError(
                    "Missing required temporal distribution fields {} in exchange {}".format(
                        missing, label
                    )
                )

            if kind == "easy_timedelta_distribution":
                resolution = _normalize_resolution(exc.get("resolution"), exc)
                try:
                    start = int(exc.get("start"))
                    end = int(exc.get("end"))
                    steps = int(exc.get("steps"))
                except (ValueError, TypeError) as exc_err:
                    raise StrategyError(
                        "Invalid start/end/steps values in exchange {}".format(label)
                    ) from exc_err

                td_kind = exc.get("td_kind")
                if _is_blank(td_kind):
                    td_kind = "uniform"

                td_param = exc.get("td_param")
                if _is_blank(td_param):
                    td_param = None
                else:
                    try:
                        td_param = float(td_param)
                    except (ValueError, TypeError) as exc_err:
                        raise StrategyError(
                            "Invalid td_param '{}' in exchange {}".format(
                                td_param, label
                            )
                        ) from exc_err

                try:
                    td_obj = easy_timedelta_distribution(
                        start=start,
                        end=end,
                        resolution=resolution,
                        steps=steps,
                        kind=str(td_kind),
                        param=td_param,
                    )
                except Exception as exc_err:
                    raise StrategyError(
                        "Failed to build easy_timedelta_distribution in exchange {}: {}".format(
                            label, exc_err
                        )
                    ) from exc_err

            elif kind == "easy_datetime_distribution":
                raw_start = exc.get("start")
                raw_end = exc.get("end")
                try:
                    steps = int(exc.get("steps"))
                except (ValueError, TypeError) as exc_err:
                    raise StrategyError(
                        "Invalid start/end/steps values in exchange {}".format(label)
                    ) from exc_err
                # Test blankness on the raw values: str(None) is "None",
                # which would otherwise slip past this check.
                if _is_blank(raw_start) or _is_blank(raw_end):
                    raise StrategyError(
                        "Missing start/end in exchange {}".format(label)
                    )
                start = str(raw_start)
                end = str(raw_end)

                try:
                    td_obj = easy_datetime_distribution(
                        start=start, end=end, steps=steps
                    )
                except Exception as exc_err:
                    raise StrategyError(
                        "Failed to build easy_datetime_distribution in exchange {}: {}".format(
                            label, exc_err
                        )
                    ) from exc_err

            else:
                # delta or abs
                resolution = _normalize_resolution(exc.get("resolution"), exc)
                dates_raw = _parse_sequence(exc["date"], "date", exc)
                amounts_raw = _parse_sequence(exc["value"], "value", exc)
                if len(dates_raw) != len(amounts_raw):
                    raise StrategyError(
                        "Mismatched date/value lengths in exchange {}".format(label)
                    )

                if kind == "delta":
                    date_values = _coerce_int_list(dates_raw, "date", exc)
                    dtype = "timedelta64[{}]".format(resolution)
                    failure = "Invalid timedelta resolution '{}' in exchange {}"
                else:
                    date_values = _normalize_abs_dates(dates_raw, resolution, exc)
                    dtype = "datetime64[{}]".format(resolution)
                    failure = "Invalid date values for resolution '{}' in exchange {}"
                try:
                    date_array = np.array(date_values, dtype=dtype)
                except Exception as exc_err:
                    raise StrategyError(
                        failure.format(resolution, label)
                    ) from exc_err

                amount_values = _coerce_float_list(amounts_raw, "value", exc)
                total = sum(amount_values)
                if total == 0:
                    raise StrategyError(
                        "Temporal distribution amounts sum to zero in exchange {}".format(
                            label
                        )
                    )
                # Rescale so the amounts sum to 1, unless they already do.
                if not math.isclose(total, 1.0, rel_tol=1e-9, abs_tol=1e-12):
                    amount_values = [a / total for a in amount_values]
                amount_array = np.array(amount_values, dtype=float)

                td_obj = TemporalDistribution(date_array, amount_array)

            # Keep the label exactly as it was given, then swap in the object
            # and drop the columns that were consumed to build it.
            exc["temporal_distribution_kind"] = raw_kind
            exc["temporal_distribution"] = td_obj
            for key in consumed:
                exc.pop(key, None)

    return data