Source code for bw_temporalis.convolution

import numpy as np
import numpy.typing as npt
from scipy.sparse import csr_array

# Shift added to every index before it is used as a sparse-matrix column in
# ``consolidate``, so that negative values (e.g. from timedelta arithmetic)
# become valid, non-negative column indices.
# One million 365-day years expressed in seconds:
# 1_000_000 * 60 * 60 * 24 * 365 = 31536000000000
OFFSET = 31_536_000_000_000

# The module works at one-second resolution for both points in time and
# durations.
datetime_type = np.dtype("datetime64[s]")
timedelta_type = np.dtype("timedelta64[s]")

# Both temporal dtypes, for membership tests.
time_types = {timedelta_type, datetime_type}
def consolidate(
    *, indices: npt.NDArray[np.int64], amounts: npt.NDArray[np.float64]
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.float64]]:
    """Sum all values in ``amounts`` which have the same index in ``indices``.

    Parameters
    ----------
    indices
        Integer positions, one per entry of ``amounts``. Values may be
        negative, down to ``-OFFSET``.
    amounts
        Values to accumulate; entries sharing an index are added together.

    Returns
    -------
    tuple
        ``(unique_indices, summed_amounts)``. Entries whose summed amount is
        exactly zero are dropped. Empty input yields two empty arrays.
    """
    # ``indices.max()`` below has no identity for zero-size arrays and would
    # raise ``ValueError``; there is nothing to consolidate, so return early.
    if indices.size == 0:
        return (
            np.empty(0, dtype=np.int64),
            np.empty(0, dtype=np.asarray(amounts).dtype),
        )

    # Sparse matrices don't allow for negative indices, but this can easily
    # happen with two timedelta64 arrays. Instead of checking we just always
    # offset (still fast).
    # Choose one million years as a reasonable offset (native resolution is
    # seconds): 1_000_000 * 60 * 60 * 24 * 365 = 31536000000000
    matrix = csr_array(
        (amounts, (np.zeros_like(indices), indices + OFFSET)),
        shape=(1, indices.max() + OFFSET + 1),
    )
    # Building the single-row matrix from coordinates adds up duplicate
    # columns; going back to COO exposes the surviving (column, value) pairs.
    coo = matrix.tocoo()
    mask = coo.data != 0
    return (coo.col[mask] - OFFSET), coo.data[mask]
def convolve(
    *,
    first_date: npt.NDArray,
    first_amount: npt.NDArray[np.float64],
    second_date: npt.NDArray,
    second_amount: npt.NDArray[np.float64],
    return_dtype: npt.DTypeLike | str,
) -> tuple[npt.NDArray, npt.NDArray[np.float64]]:
    """Convolve two (date, amount) series.

    Every pairing of an entry from the first series with an entry from the
    second contributes the sum of their dates and the product of their
    amounts. Contributions that land on the same resulting date are then
    summed by ``consolidate``.

    Parameters
    ----------
    first_date, second_date
        Date-like arrays that can be added to each other.
    first_amount, second_amount
        Amounts matching ``first_date`` and ``second_date`` respectively.
    return_dtype
        Dtype of the returned date array.

    Returns
    -------
    tuple
        ``(dates, amounts)`` with ``dates`` of dtype ``return_dtype`` and
        ``amounts`` of dtype ``float64``. Empty input yields empty arrays.
    """
    # Outer sum / outer product via broadcasting, flattened to 1-D.
    date = (first_date.reshape((-1, 1)) + second_date.reshape((1, -1))).ravel()
    amount = (first_amount.reshape((-1, 1)) * second_amount.reshape((1, -1))).ravel()

    # Bring the dates to the target dtype *before* taking the integer view.
    # Otherwise inputs in another time unit (e.g. days) would have their raw
    # counts re-read in the unit of ``return_dtype`` on the way out, silently
    # producing wrong dates. A no-op when the units already match.
    date = date.astype(return_dtype)

    # Nothing to consolidate; avoids reducing over a zero-size array.
    if date.size == 0:
        return date, amount.astype(np.float64)

    date, amount = consolidate(indices=date.astype(np.int64), amounts=amount)
    return date.astype(return_dtype), amount.astype(np.float64)
def temporal_convolution_datetime_timedelta(
    *,
    first_date: npt.NDArray[datetime_type],
    first_amount: npt.NDArray[np.float64],
    second_date: npt.NDArray[timedelta_type],
    second_amount: npt.NDArray[np.float64],
) -> tuple[npt.NDArray[datetime_type], npt.NDArray[np.float64]]:
    """Convolve an absolute-time series with a relative-time series.

    Validates the dtypes of both date arrays and delegates to ``convolve``,
    asking for ``datetime64[s]`` dates in the result.

    Raises
    ------
    ValueError
        If ``first_date`` is not ``datetime64[s]`` or ``second_date`` is not
        ``timedelta64[s]``.
    """
    first_dtype = first_date.dtype
    if first_dtype != datetime_type:
        raise ValueError(
            f"`first_date` must have dtype `datetime64[s]`, but got `{first_dtype}`"
        )

    second_dtype = second_date.dtype
    if second_dtype != timedelta_type:
        raise ValueError(
            f"`second_date` must have dtype `timedelta64[s]`, but got `{second_dtype}`"
        )

    # datetime + timedelta -> datetime
    result = convolve(
        first_date=first_date,
        first_amount=first_amount,
        second_date=second_date,
        second_amount=second_amount,
        return_dtype=datetime_type,
    )
    return result
def temporal_convolution_timedelta_timedelta(
    *,
    first_date: npt.NDArray[timedelta_type],
    first_amount: npt.NDArray[np.float64],
    second_date: npt.NDArray[timedelta_type],
    second_amount: npt.NDArray[np.float64],
) -> tuple[npt.NDArray[timedelta_type], npt.NDArray[np.float64]]:
    """Convolve two relative-time series.

    Validates the dtypes of both date arrays and delegates to ``convolve``,
    asking for ``timedelta64[s]`` dates in the result.

    Raises
    ------
    ValueError
        If either ``first_date`` or ``second_date`` is not ``timedelta64[s]``.
    """
    first_dtype = first_date.dtype
    if first_dtype != timedelta_type:
        raise ValueError(
            f"`first_date` must have dtype `timedelta64[s]`, but got {first_dtype}"
        )

    second_dtype = second_date.dtype
    if second_dtype != timedelta_type:
        raise ValueError(
            f"`second_date` must have dtype `timedelta64[s]`, but got {second_dtype}"
        )

    # timedelta + timedelta -> timedelta
    result = convolve(
        first_date=first_date,
        first_amount=first_amount,
        second_date=second_date,
        second_amount=second_amount,
        return_dtype=timedelta_type,
    )
    return result