datasets#

This module provides specialized dataset implementations that extend PyTorch’s Dataset class with additional functionality.

SizedDataset#

class fkat.data.datasets.SizedDataset(*args, **kwargs)[source]#

Bases: Protocol[T_in, T_out]

A Dataset with a known size.
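
A minimal sketch of a type that satisfies this protocol, assuming it requires __len__ and index-based __getitem__ (the RangeDataset name below is illustrative, not part of the library):

from fkat.data.datasets import SizedDataset

class RangeDataset:
    """Illustrative dataset returning a dict for each integer index."""

    def __init__(self, n: int) -> None:
        self.n = n

    def __len__(self) -> int:
        return self.n

    def __getitem__(self, index: int) -> dict:
        return {"value": index}

ds: SizedDataset[int, dict] = RangeDataset(10)  # structurally satisfies the protocol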

MapDataset#

class fkat.data.datasets.MapDataset(dataset: SizedDataset[T_in, T_from], fn: Callable[[T_from], T_to])[source]#

Bases: SizedDataset[T_in, T_to]

A Dataset that transforms the samples from another Dataset using a function.
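
A short usage sketch, assuming MapDataset forwards the wrapped dataset's length and applies fn on each lookup (the source list below stands in for any object with __len__ and __getitem__):

from fkat.data.datasets import MapDataset

source = ["1", "2", "3"]  # illustrative source dataset
doubled = MapDataset(source, fn=lambda s: int(s) * 2)

print(len(doubled))  # 3, assuming the wrapped dataset's length is forwarded
print(doubled[1])    # 4, the transformed sample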

IterableMapDataset#

class fkat.data.datasets.IterableMapDataset(dataset: IterableDataset[T_from], fn: Callable[[T_from], T_to])[source]#

Bases: IterableDataset[T_to]

An IterableDataset that transforms the samples from another IterableDataset using a function.
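
An analogous sketch for the iterable case, assuming fn is applied to each sample as it is yielded (the Stream class below is illustrative):

from torch.utils.data import IterableDataset
from fkat.data.datasets import IterableMapDataset

class Stream(IterableDataset):
    """Illustrative IterableDataset yielding raw strings."""

    def __iter__(self):
        yield from ["1", "2", "3"]

mapped = IterableMapDataset(Stream(), fn=int)
print(list(mapped))  # [1, 2, 3], assuming each yielded sample passes through fn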

DictDataset#

class fkat.data.datasets.DictDataset(datasets: dict[str, fkat.data.datasets.sized.SizedDataset[Any, dict[str, Any]]], key: str = 'dataset')[source]#

Bases: SizedDataset[tuple[str, Any], dict[str, Any]]

A Dataset that retrieves samples from one of several named Datasets, using a mapping from name to Dataset.
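
A hedged usage sketch: given the base type SizedDataset[tuple[str, Any], dict[str, Any]], indexing is assumed to take a (dataset_name, index) pair and return that dataset's sample dict, with the originating name recorded under key; the inner datasets below are illustrative stand-ins:

from fkat.data.datasets import DictDataset

train_part = [{"text": "a"}, {"text": "b"}]  # any SizedDataset of dicts works here
eval_part = [{"text": "c"}]

combined = DictDataset({"train": train_part, "eval": eval_part}, key="dataset")
sample = combined[("train", 0)]  # assumed (name, index) lookup, per the base type
print(sample)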

JsonDataset#

class fkat.data.datasets.JsonDataset(uri: str | list[str], read_options: Optional[ReadOptions] = None, parse_options: Optional[ParseOptions] = None, memory_pool: Optional[MemoryPool] = None, replace_nan: bool = True, **s3wr_args: Any)[source]#

Bases: SizedDataset[int, dict[str, Any]]

Create a Dataset from JSON data at the specified URI.

Parameters:
  • uri (str | list[str]) – URI of JSON data.

  • read_options (pa.json.ReadOptions | None) – JSON read options.

  • parse_options (pa.json.ParseOptions | None) – JSON parse options.

  • memory_pool (pa.MemoryPool | None) – JSON processing memory pool configuration.

  • replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.

  • s3wr_args (Any) – Additional configuration passed to s3wr.s3.read_json; see https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_json.html
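
A minimal usage sketch (the S3 URI is a placeholder):

from fkat.data.datasets import JsonDataset

ds = JsonDataset("s3://bucket/path/to/data.json")
print(len(ds))  # number of rows
print(ds[0])    # one row as a dict of column name -> value, per the base type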

IterableJsonDataset#

class fkat.data.datasets.IterableJsonDataset(uri: str | list[str], read_options: Optional[ReadOptions] = None, parse_options: Optional[ParseOptions] = None, memory_pool: Optional[MemoryPool] = None, chunk_size: int = 10000, replace_nan: bool = True, **s3wr_args: Any)[source]#

Bases: IterableDataset[dict[str, Any]]

An IterableDataset backed by JSON data.

Parameters:
  • uri (str | list[str]) – URI of JSON data.

  • read_options (pyarrow.json.ReadOptions, optional) – Options for the JSON reader (see the ReadOptions constructor for defaults).

  • parse_options (pyarrow.json.ParseOptions, optional) – Options for the JSON parser (see the ParseOptions constructor for defaults).

  • memory_pool (pyarrow.MemoryPool, optional) – Pool to allocate Table memory from.

  • chunk_size (int) – Maximum number of rows per chunk; the data is read as an iterable of DataFrames with at most this many rows each. Defaults to 10000.

  • replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.

  • s3wr_args (dict) – Additional configuration passed to s3wr.s3.read_json; see https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_json.html
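
A usage sketch, assuming the data is streamed chunk by chunk and iterated one row dict at a time (the URI is a placeholder):

from fkat.data.datasets import IterableJsonDataset

ds = IterableJsonDataset("s3://bucket/path/to/data.json", chunk_size=1000)
for row in ds:
    print(row)  # dict[str, Any], per the base type
    break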

ParquetDataset#

class fkat.data.datasets.ParquetDataset(uri: str | list[str], columns: Optional[list[str]] = None, use_threads: bool = True, replace_nan: bool = True, **s3wr_args: Any)[source]#

Bases: SizedDataset[int, dict[str, Any]]

A Dataset backed by Parquet data.

Create a Dataset from Parquet data at the specified URI.

Note

If you want to keep the original column types when reading Parquet, set dtype_backend='pyarrow'.

Example config:

_target_: fkat.data.datasets.parquet.ParquetDataset
uri: s3://path/to/fkat.parquet
dtype_backend: pyarrow
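
A config in this form is typically consumed with an instantiation helper such as hydra.utils.instantiate; whether fkat uses Hydra for this step is an assumption here:

from hydra.utils import instantiate

cfg = {
    "_target_": "fkat.data.datasets.parquet.ParquetDataset",
    "uri": "s3://path/to/fkat.parquet",
    "dtype_backend": "pyarrow",
}
dataset = instantiate(cfg)  # builds the ParquetDataset described by the config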

Difference between dtype_backend='pyarrow' and dtype_backend='numpy_nullable' (the default):

import numpy as np

from fkat.data.datasets.parquet import ParquetDataset
from fkat.utils.s3_utils import fs_save_prediction_output_parquet

uri = "s3://path/to/fkat.parquet"

saved_data = {
    "purchased_items": [
        [
            {"product_id": "PROD001", "item_index": 12345, "quantity": "1"},
            {"product_id": "PROD002", "item_index": None, "quantity": "1"},
        ],
        [{"product_id": "PROD001", "item_index": 12345, "quantity": "1"}],
    ],
    "ground_truth": [[1, 2, 3], [1, 2]],
    "embeddings": [np.random.randn(128), np.random.randn(128)],
}
fs_save_prediction_output_parquet()(saved_data, uri)

dataset = ParquetDataset(uri)  # dtype_backend: numpy_nullable
print(type(dataset[0]["embeddings"]))  # type: numpy.ndarray
print(type(dataset[0]["purchased_items"]))  # type: numpy.ndarray of object
print(type(dataset[0]["ground_truth"]))  # type: numpy.ndarray of object

pyarrow_dataset = ParquetDataset(uri, dtype_backend="pyarrow")  # dtype_backend: pyarrow
print(type(pyarrow_dataset[0]["embeddings"]))  # type: list
print(type(pyarrow_dataset[0]["purchased_items"]))  # type: list of dictionary
print(type(pyarrow_dataset[0]["ground_truth"]))  # type: list of int
Parameters:
  • uri (str | list[str]) – URI of Parquet data.

  • columns (list[str], optional) – Columns to load. Defaults to None.

  • use_threads (bool) – Use multi-threaded processing. Defaults to True.

  • replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.

  • s3wr_args (Any) – Additional configuration passed to s3wr.s3.read_parquet; see https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_parquet.html

IterableParquetDataset#

class fkat.data.datasets.IterableParquetDataset(uri: str | list[str], columns: Optional[list[str]] = None, use_threads: bool = True, chunk_size: int = 10000, replace_nan: bool = True, **s3wr_args: Any)[source]#

Bases: IterableDataset[dict[str, Any]]

An IterableDataset backed by Parquet data.

Note

If you want to keep the original column types when reading Parquet, set dtype_backend='pyarrow'.

Example config:

_target_: fkat.data.datasets.parquet.IterableParquetDataset
uri: s3://path/to/fkat.parquet
dtype_backend: pyarrow

Parameters:
  • uri (str | list[str]) – URI of Parquet data.

  • columns (list[str], optional) – Columns to load. Defaults to None.

  • use_threads (bool) – Use multi-threaded processing. Defaults to True.

  • chunk_size (int) – Maximum number of rows per chunk; the data is read as an iterable of DataFrames with at most this many rows each. Defaults to 10000.

  • replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.

  • s3wr_args (dict) – Additional configuration passed to s3wr.s3.read_parquet; see https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_parquet.html
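
A usage sketch pairing the iterable dataset with a DataLoader (the URI and column names are illustrative; the default collate_fn assumes column values PyTorch knows how to batch):

from torch.utils.data import DataLoader
from fkat.data.datasets import IterableParquetDataset

ds = IterableParquetDataset(
    "s3://bucket/path/to/data.parquet",
    columns=["user_id", "score"],  # illustrative column names
    chunk_size=5000,
)

loader = DataLoader(ds, batch_size=64)
for batch in loader:
    # batch is a dict of column name -> batched values under default collation
    break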