# -*- coding: utf-8 -*-
"""
This module contains some helpers to convert `nympy.ndarrays` to/from Apache `Arrow` `Table`.
We use `pyarrow.Table` objects to save/retrieve data into/from `parquet` format files.
We use a `metadata` section in the `pyarrow.Table` (and the `parquet` files) to be able
to recognize what type of data was serialized. Specific and generic codes exist.
The metadata object is a `dict` object that looks like this:
`{"object": "vector", "type": "generic"}`. `object` can be `vector` (`ndim == 1`) or `matrix` (`ndim == 2`),
`type` can be:
- `indices` (`dtype` is `INDICES_DTYPE`);
- `distributions` (`dtype` is `UNCERTAINTY_DTYPE`);
- `generic` (`dtype` is a common type);
"""
import numpy as np
import pyarrow as pa
from bw_processing import INDICES_DTYPE, UNCERTAINTY_DTYPE
###########
# VECTORS #
###########
[docs]
def numpy_generic_vector_to_pyarrow_generic_vector_table(arr: np.ndarray) -> pa.Table:
"""
Convert a generic (numpy) vector to a (arrow) table.
Args:
arr (ndarray): A numpy array that corresponds to a vector, i.e. its dimension is 1.
See:
`pyarrow_generic_vector_table_to_numpy_generic_vector`.
Returns:
The corresponding `pyarrow.Table` object.
"""
assert isinstance(arr, np.ndarray)
assert arr.ndim == 1
generic_schema = pa.schema(
[pa.field(str(0), pa.from_numpy_dtype(arr.dtype))],
metadata={"object": "vector", "type": "generic"},
)
table = pa.table({str(0): arr}, schema=generic_schema)
return table
[docs]
def pyarrow_generic_vector_table_to_numpy_generic_vector(table: pa.Table) -> np.ndarray:
"""
Convert a generic (arrow) vector table to a (numpy) array.
Args:
table (pa.Table): A `pyarrow` table that corresponds to a vector.
See:
`numpy_generic_vector_to_pyarrow_generic_vector_table`.
Returns:
The corresponding `np.ndarray` object.
"""
assert isinstance(table, pa.Table)
assert len(table.schema) == 1
assert table.schema.metadata[b"object"] == b"vector"
assert table.schema.metadata[b"type"] == b"generic"
# find numpy dtype
pa_field = table.schema[0] # pa.Field(), only one common type for all elements
numpy_dtype = pa_field.type.to_pandas_dtype() # numpy type
arr = np.array(table[str(0)], dtype=numpy_dtype) # col name is "0"
return arr
# specific arrow schema for indices vectors
[docs]
INDICES_SCHEMA = pa.schema(
[
pa.field(INDICES_DTYPE[0][0], pa.from_numpy_dtype(INDICES_DTYPE[0][1])),
pa.field(INDICES_DTYPE[1][0], pa.from_numpy_dtype(INDICES_DTYPE[1][1])),
],
metadata={"object": "vector", "type": "indices"},
)
[docs]
def numpy_indices_vector_to_pyarrow_indices_vector_table(arr: np.ndarray) -> pa.Table:
"""
Convert a specific indices (numpy) vector to a (arrow) table.
Args:
arr (ndarray): A numpy array that corresponds to an indices vector, i.e. its dimension is 1 and its
`dtype` is `INDICES_DTYPE`.
See:
`pyarrow_indices_vector_table_to_numpy_indices_vector`.
Returns:
The corresponding `pyarrow.Table` object.
"""
assert isinstance(arr, np.ndarray)
assert arr.ndim == 1
assert arr.dtype == INDICES_DTYPE
row_indices = []
col_indices = []
for row, col in arr:
row_indices.append(row)
col_indices.append(col)
table = pa.table(
{
INDICES_DTYPE[0][0]: row_indices, # col name is "row"
INDICES_DTYPE[1][0]: col_indices,
}, # col name is "col"
schema=INDICES_SCHEMA,
)
return table
[docs]
def pyarrow_indices_vector_table_to_numpy_indices_vector(table: pa.Table) -> np.ndarray:
"""
Convert a specific indices (arrow) vector table to a (numpy) array.
Args:
table (pa.Table): A `pyarrow` table that corresponds to an indices vector.
See:
`numpy_indices_vector_to_pyarrow_indices_vector_table`.
Returns:
The corresponding `np.ndarray` object.
"""
assert isinstance(table, pa.Table)
assert len(table.schema) == 2
assert table.schema.metadata[b"object"] == b"vector"
assert table.schema.metadata[b"type"] == b"indices"
indices_array = []
for row, col in zip(table["row"], table["col"]):
indices_array.append((row.as_py(), col.as_py()))
arr = np.array(indices_array, dtype=INDICES_DTYPE)
return arr
# create UNCERTAINTY schema
[docs]
NBR_UNCERTAINTY_FIELDS = len(UNCERTAINTY_DTYPE)
[docs]
PA_UNCERTAINTY_FIELDS = [
pa.field(UNCERTAINTY_DTYPE[i][0], pa.from_numpy_dtype(UNCERTAINTY_DTYPE[i][1]))
for i in range(NBR_UNCERTAINTY_FIELDS)
]
[docs]
UNCERTAINTY_SCHEMA = pa.schema(
PA_UNCERTAINTY_FIELDS, metadata={"object": "vector", "type": "distributions"}
)
[docs]
UNCERTAINTY_FIELDS_NAMES = [UNCERTAINTY_DTYPE[i][0] for i in range(NBR_UNCERTAINTY_FIELDS)]
[docs]
def numpy_distributions_vector_to_pyarrow_distributions_vector_table(
arr: np.ndarray,
) -> pa.Table:
"""
Convert a specific distributions (numpy) vector to a (arrow) table.
Args:
arr (np.ndarray): A numpy array that corresponds to a distributions vector, i.e. its dimension is 1 and its
`dtype` is `UNCERTAINTY_DTYPE`.
See:
`pyarrow_distributions_vector_table_to_numpy_distributions_vector`
Returns:
The corresponding `pyarrow.Table` object.
"""
assert isinstance(arr, np.ndarray)
assert arr.ndim == 1
assert arr.dtype == UNCERTAINTY_DTYPE
from_dict = {}
for i in range(NBR_UNCERTAINTY_FIELDS):
from_dict[UNCERTAINTY_FIELDS_NAMES[i]] = []
for arr_el in arr:
for i, el in enumerate(arr_el):
from_dict[UNCERTAINTY_FIELDS_NAMES[i]].append(el)
table = pa.table(from_dict, schema=UNCERTAINTY_SCHEMA)
return table
[docs]
def pyarrow_distributions_vector_table_to_numpy_distributions_vector(
table: pa.Table,
) -> np.ndarray:
"""
Convert a specific distributions (arrow) vector table to a (numpy) array.
Args:
table (pa.Table): A `pyarrow` table that corresponds to a distributions vector.
See:
`numpy_distributions_vector_to_pyarrow_distributions_vector_table`.
Returns:
The corresponding `np.ndarray` object.
"""
assert isinstance(table, pa.Table)
assert len(table.schema) == NBR_UNCERTAINTY_FIELDS
assert table.schema.metadata[b"object"] == b"vector"
assert table.schema.metadata[b"type"] == b"distributions"
distributions_arrays_list = [
table[UNCERTAINTY_FIELDS_NAMES[i]] for i in range(NBR_UNCERTAINTY_FIELDS)
]
distributions_array = []
for el in zip(*distributions_arrays_list):
distributions_array.append(tuple(el[i].as_py() for i in range(NBR_UNCERTAINTY_FIELDS)))
arr = np.array(distributions_array, dtype=UNCERTAINTY_DTYPE)
return arr
############
# MATRICES #
############
[docs]
def numpy_generic_matrix_to_pyarrow_generic_matrix_table(arr: np.ndarray) -> pa.Table:
"""
Convert a generic (numpy) matrix to a (arrow) table.
Args:
arr (ndarray): A numpy array that corresponds to a generic matrix, i.e. its dimension is 2.
See:
`pyarrow_generic_matrix_table_to_numpy_generic_matrix`.
Returns:
The corresponding `pyarrow.Table` object.
"""
assert isinstance(arr, np.ndarray)
assert arr.ndim == 2
arr_dtype = arr.dtype
metadata = {"object": "matrix", "type": "generic"}
nbr_rows, nbr_cols = arr.shape
arrays = [pa.array(arr[:, j], type=pa.from_numpy_dtype(arr_dtype)) for j in range(nbr_cols)]
table = pa.Table.from_arrays(
arrays=arrays,
names=[str(j) for j in range(nbr_cols)], # give names to each column
metadata=metadata,
)
return table
[docs]
def pyarrow_generic_matrix_table_to_numpy_generic_matrix(table: pa.Table) -> np.ndarray:
"""
Convert a generic (arrow) matrix table to a (numpy) array.
Args:
table (pa.Table): A `pyarrow` table that corresponds to a generic matrix.
See:
`numpy_generic_matrix_to_pyarrow_generic_matrix_table`.
Returns:
The corresponding `np.ndarray` object.
"""
assert isinstance(table, pa.Table)
assert table.schema.metadata[b"object"] == b"matrix"
assert table.schema.metadata[b"type"] == b"generic"
arr = table.to_pandas().to_numpy()
return arr