Source code for bw_timex.utils
import functools
import json
from datetime import datetime, timedelta
from typing import Callable, List, Optional, Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from bw2data.backends import ActivityDataset as AD
from bw2data.backends.proxies import Exchange
from bw2data.backends.schema import ExchangeDataset
from bw2data.errors import MultipleResults, UnknownObject
from bw_temporalis import TemporalDistribution, easy_timedelta_distribution
from IPython.display import Javascript, display
from ipywidgets import (
Button,
Dropdown,
FloatSlider,
HBox,
IntSlider,
IntText,
Label,
Layout,
Output,
Textarea,
ToggleButtons,
VBox,
)
from loguru import logger
[docs]
# Maps each supported temporal resolution name to the strftime pattern used to
# render a datetime as a compact, digits-only string (e.g. "month" -> "%Y%m").
time_res_mapping_strftime = {
    "year": "%Y",
    "month": "%Y%m",
    "day": "%Y%m%d",
    "hour": "%Y%m%d%H",
}
[docs]
def extract_date_as_integer(dt_obj: datetime, time_res: Optional[str] = "year") -> int:
    """
    Render a datetime as an integer at the requested temporal resolution.

    Parameters
    ----------
    dt_obj : datetime
        Datetime object to convert.
    time_res : str, optional
        Resolution of the result: year=YYYY, month=YYYYMM, day=YYYYMMDD,
        hour=YYYYMMDDHH. Defaults to "year".

    Returns
    -------
    int
        The formatted date, parsed as an integer.

    Raises
    ------
    ValueError
        If `time_res` is not a key of `time_res_mapping_strftime`.
    """
    pattern = time_res_mapping_strftime.get(time_res)
    if pattern is None:
        available = list(time_res_mapping_strftime.keys())
        raise ValueError(
            f"Invalid time_res: '{time_res}'. Please choose from: {available}."
        )

    # The patterns contain digits only, so the formatted text parses as an int.
    return int(dt_obj.strftime(pattern))
[docs]
def extract_date_as_string(timestamp: datetime, temporal_grouping: str) -> str:
    """
    Extract the grouping date from a datetime as a string.

    E.g. for `temporal_grouping` = 'month' and `timestamp` = 2023-03-29T01:00:00
    the result is '202303'.

    Parameters
    ----------
    timestamp : datetime
        Datetime object to be converted to a string.
    temporal_grouping : str
        Temporal grouping for the date string. Options are: 'year', 'month',
        'day', 'hour'.

    Returns
    -------
    str
        Date as a string in the format of the chosen temporal grouping.

    Raises
    ------
    ValueError
        If `temporal_grouping` is not one of the supported options. No default
        is substituted.
    """
    if temporal_grouping not in time_res_mapping_strftime:
        # The message is assembled from adjacent literals (no backslash
        # continuation inside the string) so its text does not depend on how
        # the source is indented, and it no longer claims a fallback to "year".
        raise ValueError(
            f"temporal_grouping: {temporal_grouping} is not a valid option. "
            f"Please choose from: {list(time_res_mapping_strftime.keys())}."
        )
    return timestamp.strftime(time_res_mapping_strftime[temporal_grouping])
@functools.lru_cache(maxsize=4096)
def convert_date_string_to_datetime(temporal_grouping, date_string) -> datetime:
    """
    Convert a compact date string to a datetime object.

    E.g. for `temporal_grouping` = 'month' and `date_string` = '202303' the
    result is 2023-03-01. Results are memoised per argument pair via
    ``functools.lru_cache``.

    Parameters
    ----------
    temporal_grouping : str
        Temporal grouping for the date string. Options are: 'year', 'month',
        'day', 'hour'.
    date_string : str
        Date as a string in the format of the chosen temporal grouping.

    Returns
    -------
    datetime
        Datetime object of the date string at the chosen temporal resolution.

    Raises
    ------
    ValueError
        If `temporal_grouping` is not a supported option (no default is
        substituted), or if `date_string` does not match the expected format.
    """
    time_res_dict = {
        "year": "%Y",
        "month": "%Y%m",
        "day": "%Y%m%d",
        "hour": "%Y%m%d%H",
    }
    if temporal_grouping not in time_res_dict:
        # Adjacent literals instead of a backslash continuation inside the
        # string; the message no longer claims a fallback that never happens.
        raise ValueError(
            f"temporal grouping: {temporal_grouping} is not a valid option. "
            f"Please choose from: {list(time_res_dict.keys())}."
        )
    return datetime.strptime(date_string, time_res_dict[temporal_grouping])
[docs]
def round_datetime(date: datetime, resolution: str) -> datetime:
    """
    Round a datetime to the nearest boundary of the given resolution.

    A value exactly halfway between two boundaries rounds up.

    Parameters
    ----------
    date : datetime
        Datetime object to be rounded.
    resolution : str
        Temporal resolution to round to. Options are: 'year', 'month', 'day'
        and 'hour'.

    Returns
    -------
    datetime
        Rounded datetime. For 'year' and 'month' this is a ``pd.Timestamp``;
        for 'day' and 'hour' it is a plain ``datetime``.

    Raises
    ------
    ValueError
        If `resolution` is not one of the supported options.
    """
    if resolution == "year":
        # July 1st is used as the midpoint of the year.
        if date >= pd.Timestamp(f"{date.year}-07-01"):
            return pd.Timestamp(f"{date.year+1}-01-01")
        return pd.Timestamp(f"{date.year}-01-01")

    if resolution == "month":
        month_start = pd.Timestamp(f"{date.year}-{date.month}-01")
        month_end = month_start + pd.DateOffset(months=1)
        # Midpoint depends on the actual length of this month.
        halfway = month_start + (month_end - month_start) / 2
        if date >= halfway:
            return month_end
        return month_start

    if resolution == "day":
        midnight = datetime(date.year, date.month, date.day)
        if date >= midnight + timedelta(hours=12):
            return midnight + timedelta(days=1)
        return midnight

    if resolution == "hour":
        top_of_hour = datetime(date.year, date.month, date.day, date.hour)
        if date >= top_of_hour + timedelta(minutes=30):
            return top_of_hour + timedelta(hours=1)
        return top_of_hour

    raise ValueError("Resolution must be one of 'year', 'month', 'day', or 'hour'.")
[docs]
def round_datetime_series_to_year(dates: pd.Series) -> pd.Series:
    """
    Round every datetime in a Series to the nearest January 1st.

    Vectorized counterpart of ``round_datetime(..., resolution="year")``:
    dates on or after July 1st move to January 1st of the following year, all
    others to January 1st of their own year. No per-row Python ``apply`` is
    involved.

    Parameters
    ----------
    dates : pd.Series
        Series of values convertible by ``pd.to_datetime``.

    Returns
    -------
    pd.Series
        Rounded timestamps, carrying the index of `dates`.
    """
    timestamps = pd.to_datetime(dates)
    year_of = timestamps.dt.year.to_numpy()

    def _first_of_month(year_values, month):
        # pd.to_datetime assembles timestamps from year/month/day components.
        return pd.to_datetime({"year": year_values, "month": month, "day": 1})

    july_first = _first_of_month(year_of, 7).to_numpy()
    past_midyear = timestamps.to_numpy() >= july_first

    # Adding the boolean mask as 0/1 bumps the year where rounding goes up.
    rounded = _first_of_month(year_of + past_midyear.astype(int), 1)
    return rounded.set_axis(dates.index)
[docs]
def add_flows_to_characterization_functions(
    flows: Union[str, List[str]],
    func: Callable,
    characterization_functions: Optional[dict] = None,
) -> dict:
    """
    Add a new flow or a collection of flows to the available characterization functions.

    Parameters
    ----------
    flows : Union[str, List[str]]
        Flow or flows to register. A single string is treated as one flow; any
        other iterable (list, tuple, set, ...) contributes each of its items.
    func : Callable
        Dynamic characterization function to associate with every given flow.
    characterization_functions : dict, optional
        Dictionary of flows and their corresponding characterization functions.
        It is updated in place. When omitted, a new empty dictionary is used.

    Returns
    -------
    dict
        The characterization function dictionary including the new flow(s).

    Raises
    ------
    TypeError
        If `flows` is neither a string nor an iterable.
    """
    if characterization_functions is None:
        characterization_functions = {}

    if isinstance(flows, str):
        # A string is iterable too, but it denotes exactly one flow.
        flow_names = [flows]
    else:
        try:
            flow_names = list(flows)
        except TypeError as exc:
            # Previously such input was dropped without any feedback.
            raise TypeError(
                "flows must be a string or an iterable of strings, "
                f"got {type(flows).__name__}."
            ) from exc

    for flow in flow_names:
        characterization_functions[flow] = func
    return characterization_functions
[docs]
def resolve_temporalized_node_name(code: str) -> str:
    """
    Look up the name of a node from its code alone.

    The code may occur more than once, as long as every matching node carries
    the same name.

    Parameters
    ----------
    code : str
        Code of the node to resolve.

    Returns
    -------
    str
        Name of the node.

    Raises
    ------
    UnknownObject
        If no node has the given code.
    ValueError
        If nodes with the given code have differing names.
    """
    matches = list(AD.select().where(AD.code == code))
    if not matches:
        raise UnknownObject

    names = {node.name for node in matches}
    if len(names) > 1:
        raise ValueError(
            "Found multiple names for the given code: {}".format(names)
        )
    return names.pop()
[docs]
def plot_characterized_inventory_as_waterfall(
    lca_obj,
    static_scores=None,
    prospective_scores=None,
    order_stacked_activities=None,
):
    """
    Draw a stacked waterfall chart of the characterized inventory.

    Static and prospective scores can be added as extra bars for comparison.
    The y-axis is labelled as GWP; other metrics are not catered for.

    Parameters
    ----------
    lca_obj : TimexLCA
        LCA object with characterized inventory data.
    static_scores : dict, optional
        Mapping of activity label to static score, drawn as a leading bar.
        Default is None.
    prospective_scores : dict, optional
        Mapping of activity label to prospective score, drawn as a trailing bar.
        Default is None.
    order_stacked_activities : list, optional
        Activity labels in the order they should be stacked. Default is None.

    Returns
    -------
    None
        The chart is shown via matplotlib.

    Raises
    ------
    ValueError
        If `lca_obj` lacks `characterized_inventory` or `activity_time_mapping`.
    """
    required_attributes = (
        (
            "characterized_inventory",
            "LCA object does not have characterized inventory data.",
        ),
        ("activity_time_mapping", "Make sure to pass an instance of a TimexLCA."),
    )
    for attribute, complaint in required_attributes:
        if not hasattr(lca_obj, attribute):
            raise ValueError(complaint)

    tick_label_formats = {
        "year": "%Y",
        "month": "%Y-%m",
        "day": "%Y-%m-%d",
        "hour": "%Y-%m-%d %H",
    }

    def _single_bar(scores, row_name):
        # One-row frame (index = row_name) with one column per activity label.
        frame = pd.DataFrame(scores.items(), columns=["activity_label", "amount"])
        frame["year"] = row_name
        return frame.pivot(index="year", columns="activity_label", values="amount")

    inventory = lca_obj.characterized_inventory.copy()
    # The column is called "year" regardless of the actual temporal grouping.
    # TODO make temporal resolution flexible
    inventory["year"] = inventory["date"].dt.strftime(
        tick_label_formats[lca_obj.temporal_grouping]
    )

    # Resolve each distinct activity id once via the TimexLCA's own method.
    label_by_id = {
        node_id: lca_obj.get_activity_name_from_time_mapped_id(node_id)
        for node_id in inventory["activity"].unique()
    }
    inventory["activity_label"] = inventory["activity"].map(label_by_id)

    totals = inventory.groupby(["year", "activity_label"], as_index=False)[
        "amount"
    ].sum()
    dynamic_bars = totals.pivot(
        index="year", columns="activity_label", values="amount"
    )

    # Row order fixes bar order: static first, dynamic in between, prospective last.
    bars = []
    if static_scores:
        bars.append(_single_bar(static_scores, "static"))
    bars.append(dynamic_bars)
    if prospective_scores:
        bars.append(_single_bar(prospective_scores, "prospective"))
    combined_df = pd.concat(bars, axis=0)

    if order_stacked_activities:
        # change order of activities in the stacked bars of the waterfall
        combined_df = combined_df[order_stacked_activities]

    # Only the dynamic bars float; each starts where the previous total ended.
    dynamic_bottom = dynamic_bars.sum(axis=1).cumsum().shift(1).fillna(0)
    bottom_parts = []
    if static_scores:
        bottom_parts.append(pd.Series([0]))
    bottom_parts.append(dynamic_bottom)
    if prospective_scores:
        bottom_parts.append(pd.Series([0]))
    bottom = pd.concat(bottom_parts) if len(bottom_parts) > 1 else dynamic_bottom

    # Plotting
    ax = combined_df.plot(
        kind="bar",
        stacked=True,
        bottom=bottom,
        figsize=(14, 6),
        edgecolor="black",
        linewidth=0.5,
    )
    ax.set_ylabel("GWP [kg CO2-eq]")
    ax.set_xlabel("")
    plt.xticks(rotation=45, ha="right")

    # Dashed separators between the comparison bars and the dynamic bars.
    if static_scores:
        ax.axvline(x=0.5, color="black", linestyle="--", lw=1)
    if prospective_scores:
        ax.axvline(x=len(combined_df) - 1.5, color="black", linestyle="--", lw=1)

    # Legend entries are reversed so they read top-down like the stacks.
    handles, labels = ax.get_legend_handles_labels()
    ax.legend(
        handles[::-1],
        labels[::-1],
        loc="center left",
        bbox_to_anchor=(1.02, 0.5),  # x=1.02 moves it outside, y=0.5 centers vertically
        fontsize="small",
    )
    ax.set_axisbelow(True)
    plt.grid(True)
    plt.show()
[docs]
def get_exchange(**kwargs) -> Exchange:
    """
    Get an exchange from the database.

    Parameters
    ----------
    **kwargs :
        Arguments to specify an exchange.
            - input_node: Input node object
            - input_code: Input node code
            - input_database: Input node database
            - output_node: Output node object
            - output_code: Output node code
            - output_database: Output node database
        A node object overrides the matching ``*_code`` / ``*_database``
        arguments. Any other keyword is ignored.

    Returns
    -------
    Exchange
        The exchange object matching the criteria.

    Raises
    ------
    ValueError
        If none of the supplied arguments is a usable search criterion.
    MultipleResults
        If multiple exchanges match the criteria.
    UnknownObject
        If no exchange matches the criteria.
    """
    # Expand node objects into their code/database pair (input first, then output).
    for side in ("input", "output"):
        node = kwargs.pop(f"{side}_node", None)
        if node:
            kwargs[f"{side}_code"] = node["code"]
            kwargs[f"{side}_database"] = node["database"]

    # Map kwargs to database fields
    mapping = {
        "input_code": ExchangeDataset.input_code,
        "input_database": ExchangeDataset.input_database,
        "output_code": ExchangeDataset.output_code,
        "output_database": ExchangeDataset.output_database,
    }

    # Build query filters from the recognised keys only.
    filters = [
        mapping[key] == value for key, value in kwargs.items() if key in mapping
    ]
    if not filters:
        # Without this guard the query would be built from zero conditions,
        # which is never a meaningful "get one exchange" request.
        ignored = sorted(kwargs)
        detail = f" Ignored arguments: {ignored}." if ignored else ""
        raise ValueError(
            "No usable search criteria given. Pass 'input_node'/'output_node' "
            f"or at least one of {sorted(mapping)}.{detail}"
        )

    # Execute query with filters
    qs = ExchangeDataset.select().where(*filters)
    candidates = [Exchange(obj) for obj in qs]
    num_candidates = len(candidates)

    if num_candidates > 1:
        raise MultipleResults(
            f"Found {num_candidates} results for the given search. "
            "Please be more specific or double-check your system model for duplicates."
        )
    if num_candidates == 0:
        raise UnknownObject("No exchange found matching the criteria.")
    return candidates[0]
[docs]
def add_temporal_distribution_to_exchange(
    temporal_distribution: TemporalDistribution, **kwargs
):
    """
    Attach a temporal distribution to a single exchange and save it.

    Parameters
    ----------
    temporal_distribution : TemporalDistribution
        TemporalDistribution to be stored on the exchange.
    **kwargs :
        Passed unchanged to ``get_exchange`` to identify the exchange. The
        identifiers handled there are:
            - input_node: Input node object
            - input_code: Input node code
            - input_database: Input node database
            - output_node: Output node object
            - output_code: Output node code
            - output_database: Output node database

    Returns
    -------
    None
        The exchange is saved with the temporal distribution.
    """
    from .validation import TemporalDistributionExchangeInputs as _InputCheck

    # The instance is discarded; it is constructed before any lookup or write
    # happens (presumably construction validates the input -- see .validation).
    _InputCheck(temporal_distribution=temporal_distribution)

    exchange = get_exchange(**kwargs)
    exchange["temporal_distribution"] = temporal_distribution
    exchange.save()

    logger.info(f"Added temporal distribution to exchange {exchange}.")
[docs]
def add_temporal_evolution_to_exchange(
    temporal_evolution_factors: dict = None,
    temporal_evolution_amounts: dict = None,
    **kwargs,
):
    """Store temporal evolution data on a single exchange and save it.

    Parameters
    ----------
    temporal_evolution_factors : dict, optional
        Dictionary mapping datetime keys to scaling factors.
    temporal_evolution_amounts : dict, optional
        Dictionary mapping datetime keys to absolute amounts.
    **kwargs :
        Arguments to specify an exchange (same as get_exchange).

    Returns
    -------
    None
        The exchange is saved with whichever evolution data was provided;
        arguments left as None are not written.
    """
    from .validation import TemporalEvolutionExchangeInputs as _InputCheck

    # The instance is discarded; it is constructed before any lookup or write
    # happens (presumably construction validates the input -- see .validation).
    _InputCheck(
        temporal_evolution_factors=temporal_evolution_factors,
        temporal_evolution_amounts=temporal_evolution_amounts,
    )

    exchange = get_exchange(**kwargs)

    provided = (
        ("temporal_evolution_factors", temporal_evolution_factors),
        ("temporal_evolution_amounts", temporal_evolution_amounts),
    )
    for field_name, payload in provided:
        if payload is not None:
            exchange[field_name] = payload
    exchange.save()

    logger.info(f"Added temporal evolution to exchange {exchange}.")
[docs]
def interactive_td_widget():
    """
    Create an interactive ipywidget for drafting and visualizing temporal distributions and copying
    them to the clipboard.

    For use in jupyter notebooks. Two modes are offered: "Generator" builds the
    distribution with ``easy_timedelta_distribution`` from start/end/steps/kind,
    "Manual" builds a ``TemporalDistribution`` from typed-in dates and amounts.
    Errors while building or drawing are shown in the status label instead of
    being raised.

    Returns
    -------
    ipywidgets.VBox
        Interactive widget for drafting temporal distributions.
    """
    # ---------- Controls ----------
    mode = ToggleButtons(
        options=["Generator", "Manual"], value="Generator", description="Mode"
    )

    # Generator controls
    start = IntText(value=0, description="start")
    end = IntText(value=10, description="end")
    resolution = Dropdown(
        options=[("Years", "Y"), ("Months", "M"), ("Days", "D")],
        value="Y",
        description="resolution",
    )
    steps = IntSlider(
        value=11, min=2, max=11, step=1, description="steps", continuous_update=False
    )
    kind = ToggleButtons(
        options=["uniform", "triangular", "normal"], value="uniform", description="kind"
    )
    # Give wide initial bounds; we'll override on kind changes
    param = FloatSlider(
        value=1.0,
        min=0.01,
        max=50.0,
        step=0.01,
        description="param",
        disabled=True,
        continuous_update=False,
    )

    # Manual controls
    manual_unit = Dropdown(
        options=[
            ("Years", "Y"),
            ("Months", "M"),
            ("Days", "D"),
            ("Hours", "h"),
            ("Minutes", "m"),
            ("Seconds", "s"),
        ],
        value="Y",
        description="resolution",
    )
    dates_text = Textarea(
        value="0, 2, 4, 6, 8, 10",
        description="dates",
        layout=Layout(width="100%", min_height="70px"),
    )
    amounts_text = Textarea(
        value="0.1, 0.1, 0.2, 0.2, 0.2, 0.2",
        description="amounts",
        layout=Layout(width="100%", min_height="70px"),
    )

    for widget in (start, end, resolution, steps, param, manual_unit):
        widget.style.description_width = "initial"

    steps.layout = Layout(width="220px")
    param.layout = Layout(width="220px")
    start.layout = Layout(width="160px")
    end.layout = Layout(width="160px")
    resolution.layout = Layout(width="180px")
    manual_unit.layout = Layout(width="220px")

    copy_btn = Button(description="Copy TD code", button_style="success")
    copy_import_btn = Button(description="Copy TD + imports", button_style="")
    copy_btn.layout = Layout(width="160px")
    copy_import_btn.layout = Layout(width="200px")

    status = Label(value="")
    status.layout = Layout(margin="0 0 0 8px")
    plot_out = Output(layout=Layout(width="100%"))

    # ---------- Helpers ----------
    def _parse_num_list(txt: str, label: str) -> List[float]:
        # Accept commas and/or whitespace as separators.
        parts = [p for p in txt.replace(",", " ").split() if p]
        if not parts:
            raise ValueError(f"{label} cannot be empty.")
        values = []
        for p in parts:
            try:
                # Keep integers as int so they can be used as timedelta counts.
                if ("." in p) or ("e" in p.lower()):
                    values.append(float(p))
                else:
                    values.append(int(p))
            except ValueError as exc:
                raise ValueError(f"Could not parse '{p}' in {label}.") from exc
        return values

    def _format_number(value: float) -> str:
        # Whole numbers print without a decimal part, others with up to six
        # decimals and no trailing zeros.
        as_float = float(value)
        if np.isfinite(as_float) and as_float.is_integer():
            return str(int(as_float))
        return (f"{as_float:.6f}").rstrip("0").rstrip(".")

    def _make_td_generator():
        return easy_timedelta_distribution(
            start=min(start.value, end.value),
            end=max(start.value, end.value),
            resolution=resolution.value,
            steps=int(steps.value),
            kind=kind.value,
            # A disabled slider means the current kind takes no parameter.
            param=None if param.disabled else float(param.value),
        )

    def _make_td_manual():
        d = _parse_num_list(dates_text.value, "dates")
        a = _parse_num_list(amounts_text.value, "amounts")
        if len(d) != len(a):
            raise ValueError("dates and amounts must have the same length.")
        if not d:
            raise ValueError("Provide at least one date and amount.")
        date = np.array(d, dtype=f"timedelta64[{manual_unit.value}]")
        amount = np.array(a, dtype=float)
        if np.any(np.isnan(amount)):
            raise ValueError("Amounts must be numeric values.")
        return TemporalDistribution(date=date, amount=amount)

    def _current_td():
        return _make_td_generator() if mode.value == "Generator" else _make_td_manual()

    def _current_resolution_for_graph():
        return resolution.value if mode.value == "Generator" else manual_unit.value

    def _draw_graph(td: TemporalDistribution):
        with plot_out:
            plot_out.clear_output(wait=True)
            fig = plt.figure(figsize=(7, 3))
            td.graph(style="default", resolution=_current_resolution_for_graph())
            plt.show()
            plt.close(fig)
        status.value = (
            f"OK · steps={len(td.amount)} · sum(amount)={float(np.sum(td.amount)):.6f}"
        )

    def refresh_preview(*_):
        # UI boundary: any failure is reported in the status label, not raised.
        try:
            td = _current_td()
            _draw_graph(td)
        except Exception as exc:
            with plot_out:
                plot_out.clear_output(wait=True)
            status.value = f"Error: {exc}"

    # --- robust param updater (avoid value snapping back to 1.0) ---
    def _with_param_unobserved(fn):
        # Detach the preview callback while fn reconfigures the slider so that
        # intermediate value changes do not trigger redraws.
        try:
            param.unobserve(refresh_preview, names="value")
            fn()
        finally:
            param.observe(refresh_preview, names="value")

    def _set_param_bounds(lower: float, upper: float) -> None:
        # The bounded slider rejects any single assignment that would make
        # min > max, so the bound that widens the range is assigned first.
        if lower > param.max:
            param.max = upper
            param.min = lower
        else:
            param.min = lower
            param.max = upper

    def _reset_param_for_kind():
        def _apply():
            s, e = sorted([start.value, end.value])

            def _set_slider_value(target: float) -> None:
                # Clamp to the bounds, then snap onto the slider's step grid.
                bounded = min(max(target, param.min), param.max)
                step = param.step or 0
                if step <= 0:
                    param.value = bounded
                    return
                base = param.min
                ticks = round((bounded - base) / step)
                param.value = base + ticks * step

            if kind.value == "uniform":
                # No parameter for this kind: reset, disable and hide the slider.
                param.description = "param"
                param.disabled = True
                _set_param_bounds(0.1, 50.0)
                param.step = 0.1
                param.value = 1.0
                param.layout.display = "none"
                return

            if kind.value == "triangular":
                # The parameter is the mode, which must lie within [start, end].
                param.description = "mode"
                if s == e:
                    param.disabled = True
                    exact_value = float(s)
                    _set_param_bounds(exact_value, exact_value)
                    param.step = 1.0
                    param.value = exact_value
                else:
                    param.disabled = False
                    _set_param_bounds(float(s), float(e))
                    param.step = max((param.max - param.min) / 20.0, 0.01)
                    _set_slider_value((param.min + param.max) / 2.0)
                param.layout.display = ""
                return

            # normal
            param.description = "std dev"
            param.disabled = False
            _set_param_bounds(0.02, 1.0)
            param.step = 0.01
            _set_slider_value(0.15)
            param.layout.display = ""

        _with_param_unobserved(_apply)

    def _code_generator():
        s, e = sorted([start.value, end.value])
        k = kind.value
        p = None if param.disabled else float(param.value)
        code = (
            "td = easy_timedelta_distribution(\n"
            f"    start={s},\n"
            f"    end={e},\n"
            f"    resolution='{resolution.value}',\n"
            f"    steps={int(steps.value)},\n"
            f"    kind='{k}'"
        )
        if p is not None:
            code += f",\n    param={_format_number(p)}"
        code += "\n)"
        return code

    def _code_manual():
        d = _parse_num_list(dates_text.value, "dates")
        a = _parse_num_list(amounts_text.value, "amounts")
        unit = manual_unit.value
        d_str = ", ".join(str(int(x)) for x in d)
        a_str = ", ".join(_format_number(x) for x in a)
        return (
            f"date = np.array([{d_str}], dtype='timedelta64[{unit}]')\n"
            f"amount = np.array([{a_str}], dtype=float)\n"
            "td = TemporalDistribution(date=date, amount=amount)"
        )

    def _build_code(include_imports: bool = False) -> str:
        body = _code_generator() if mode.value == "Generator" else _code_manual()
        if not include_imports:
            return body
        if mode.value == "Generator":
            imports = [
                "from bw_temporalis import easy_timedelta_distribution",
            ]
        else:
            imports = [
                "import numpy as np",
                "from bw_temporalis import TemporalDistribution",
            ]
        return "\n".join(imports + ["", body])

    def _copy_code(include_imports: bool) -> None:
        try:
            code = _build_code(include_imports=include_imports)
            # json.dumps yields a safely quoted JavaScript string literal.
            display(Javascript(f"navigator.clipboard.writeText({json.dumps(code)})"))
            suffix = " + imports" if include_imports else ""
            status.value = f"✅ Code{suffix} copied to clipboard!"
        except Exception as exc:
            status.value = f"Error: {exc}"

    # ---------- Updates ----------
    def _on_kind_change(_):
        _reset_param_for_kind()
        refresh_preview()

    def _on_start_end_change(_):
        _reset_param_for_kind()
        refresh_preview()

    for w in (start, end):
        w.observe(_on_start_end_change, names="value")
    kind.observe(_on_kind_change, names="value")
    for w in (resolution, steps):
        w.observe(refresh_preview, names="value")
    param.observe(
        refresh_preview, names="value"
    )  # reattached in _with_param_unobserved
    # `mode` is not listed here: _mode_refresh (registered below) already ends
    # with a preview refresh, so observing it here too would draw twice.
    for w in (manual_unit, dates_text, amounts_text):
        w.observe(refresh_preview, names="value")

    copy_btn.on_click(lambda _: _copy_code(include_imports=False))
    copy_import_btn.on_click(lambda _: _copy_code(include_imports=True))

    # Initial state
    _reset_param_for_kind()
    refresh_preview()

    # ---------- UI ----------
    gen_box = VBox(
        [
            HBox([start, end, resolution], layout=Layout(gap="10px")),
            HBox([steps, param], layout=Layout(gap="10px")),
        ],
        layout=Layout(gap="10px"),
    )
    man_box = VBox(
        [manual_unit, dates_text, amounts_text],
        layout=Layout(gap="8px", width="100%"),
    )

    # Keep steps.max synced to end for nicer defaults
    def _sync_steps_max(_=None):
        # Never go below steps.min: with start == end the raw value would be 1,
        # and the slider rejects a max smaller than its min.
        new_max = max(abs(end.value - start.value) + 1, steps.min)
        if steps.max != new_max:
            steps.max = new_max

    _sync_steps_max()
    end.observe(_sync_steps_max, names="value")
    start.observe(_sync_steps_max, names="value")

    buttons_box = HBox(
        [copy_btn, copy_import_btn, status],
        layout=Layout(align_items="center", gap="10px"),
    )

    def _layout_children():
        if mode.value == "Generator":
            return [
                mode,
                kind,
                gen_box,
                buttons_box,
                plot_out,
            ]
        return [
            mode,
            man_box,
            buttons_box,
            plot_out,
        ]

    container = VBox(_layout_children(), layout=Layout(gap="12px", width="100%"))

    def _mode_refresh(_):
        container.children = _layout_children()
        if mode.value == "Generator":
            _sync_steps_max()
            _reset_param_for_kind()
        refresh_preview()

    mode.observe(_mode_refresh, names="value")
    return container
[docs]
def get_temporal_evolution_factor(
    temporal_evolution: dict,
    target_date: datetime,
) -> float:
    """Return the scaling factor for a date by linear interpolation between known points.

    Parameters
    ----------
    temporal_evolution : dict or None
        Dictionary mapping datetime keys to float scaling factors.
        If None or empty, 1.0 is returned (no scaling).
    target_date : datetime
        The calendar date to look up the factor for.

    Returns
    -------
    float
        The interpolated scaling factor. Dates outside the covered range get
        the value of the nearest boundary point; a single-entry mapping yields
        its only value for every date.
    """
    if not temporal_evolution:
        return 1.0

    knots = sorted(temporal_evolution)
    earliest, latest = knots[0], knots[-1]

    # A lone point applies everywhere; otherwise clamp below the first point.
    if len(knots) == 1 or target_date <= earliest:
        return temporal_evolution[earliest]
    # Clamp above the last point.
    if target_date >= latest:
        return temporal_evolution[latest]

    # Walk neighbouring points and interpolate inside the first enclosing pair.
    for lower, upper in zip(knots, knots[1:]):
        if lower <= target_date <= upper:
            elapsed = (target_date - lower).total_seconds()
            interval = (upper - lower).total_seconds()
            weight = elapsed / interval
            return (
                temporal_evolution[lower] * (1 - weight)
                + temporal_evolution[upper] * weight
            )

    return 1.0  # fallback