datasets#
This module provides specialized dataset implementations that extend PyTorch’s Dataset class with additional functionality.
SizedDataset#
MapDataset#
- class fkat.data.datasets.MapDataset(dataset: SizedDataset[T_in, T_from], fn: Callable[[T_from], T_to])[source]#
Bases: SizedDataset[T_in, T_to]
A Dataset that transforms the samples from another Dataset using a function.
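Example (a minimal sketch; the S3 path and the "text" field are placeholders, and the lambda stands in for any transform you need):

    from fkat.data.datasets import JsonDataset, MapDataset

    # Wrap an existing SizedDataset and apply a per-sample transform.
    base = JsonDataset("s3://bucket/records.json")
    texts = MapDataset(base, fn=lambda sample: sample["text"])

    print(len(texts))  # same length as the wrapped dataset
    print(texts[0])    # transformed sample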
IterableMapDataset#
DictDataset#
- class fkat.data.datasets.DictDataset(datasets: dict[str, fkat.data.datasets.sized.SizedDataset[Any, dict[str, Any]]], key: str = 'dataset')[source]#
Bases: SizedDataset[tuple[str, Any], dict[str, Any]]
A Dataset that can get samples from one of several Datasets using a mapping.
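Example (a hedged sketch; based on the SizedDataset[tuple[str, Any], dict[str, Any]] base it assumes samples are addressed with a (name, index) tuple and that the source dataset name is recorded under key; paths are placeholders):

    from fkat.data.datasets import DictDataset, JsonDataset

    # Route lookups to one of several named datasets.
    combined = DictDataset(
        datasets={
            "clicks": JsonDataset("s3://bucket/clicks.json"),
            "purchases": JsonDataset("s3://bucket/purchases.json"),
        },
        key="dataset",
    )
    sample = combined[("clicks", 0)]  # assumed: a dict sample drawn from the "clicks" dataset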
JsonDataset#
- class fkat.data.datasets.JsonDataset(uri: str | list[str], read_options: Optional[ReadOptions] = None, parse_options: Optional[ParseOptions] = None, memory_pool: Optional[MemoryPool] = None, replace_nan: bool = True, **s3wr_args: Any)[source]#
Bases: SizedDataset[int, dict[str, Any]]
Create a Dataset from JSON data at the specified URI.
- Parameters:
uri (str | list[str]) – URI of JSON data.
read_options (pa.json.ReadOptions | None) – JSON read options.
parse_options (pa.json.ParseOptions | None) – JSON parse options.
memory_pool (pa.MemoryPool | None) – JSON processing memory pool configuration.
replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.
s3wr_args (Any) – Config for s3wr.s3.read_json; refer to https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_json.html
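Example (a minimal sketch; the path is a placeholder and the pyarrow options are optional):

    import pyarrow.json as pa_json

    from fkat.data.datasets import JsonDataset

    ds = JsonDataset(
        "s3://bucket/records.json",
        read_options=pa_json.ReadOptions(use_threads=True),
    )
    print(len(ds))  # number of rows
    print(ds[0])    # one row as dict[str, Any]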
IterableJsonDataset#
- class fkat.data.datasets.IterableJsonDataset(uri: str | list[str], read_options: Optional[ReadOptions] = None, parse_options: Optional[ParseOptions] = None, memory_pool: Optional[MemoryPool] = None, chunk_size: int = 10000, replace_nan: bool = True, **s3wr_args: Any)[source]#
Bases: IterableDataset[dict[str, Any]]
An IterableDataset backed by JSON data.
- Parameters:
uri (str | list[str]) – URI of JSON data.
read_options (pyarrow.json.ReadOptions | None) – Options for the JSON reader (see the ReadOptions constructor for defaults).
parse_options (pyarrow.json.ParseOptions | None) – Options for the JSON parser (see the ParseOptions constructor for defaults).
memory_pool (pyarrow.MemoryPool | None) – Pool to allocate Table memory from.
chunk_size (int) – Maximum number of rows per chunk; the data is read as an iterable of DataFrames with at most this many rows each. Defaults to 10000.
replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.
s3wr_args (dict) – Config for s3wr.s3.read_json; refer to https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_json.html
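Example (a minimal sketch; the path is a placeholder):

    from fkat.data.datasets import IterableJsonDataset

    # Stream rows in chunks instead of materializing the whole file.
    stream = IterableJsonDataset("s3://bucket/records.json", chunk_size=1000)
    for sample in stream:
        print(sample)  # dict[str, Any]
        break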
ParquetDataset#
- class fkat.data.datasets.ParquetDataset(uri: str | list[str], columns: Optional[list[str]] = None, use_threads: bool = True, replace_nan: bool = True, **s3wr_args: Any)[source]#
Bases: SizedDataset[int, dict[str, Any]]
A Dataset backed by Parquet data.
Create a Dataset from Parquet data at the specified URI.

Note
If you want to keep the original types when reading Parquet, you should set dtype_backend='pyarrow'.

Example config:

    _target_: fkat.data.datasets.parquet.ParquetDataset
    uri: s3://path/to/fkat.parquet
    dtype_backend: pyarrow

Difference for dtype_backend between pyarrow and numpy_nullable:

    import numpy as np

    from fkat.data.datasets.parquet import ParquetDataset
    from fkat.utils.s3_utils import fs_save_prediction_output_parquet

    uri = "s3://path/to/fkat.parquet"
    saved_data = {
        "purchased_items": [
            [
                {"product_id": "PROD001", "item_index": 12345, "quantity": "1"},
                {"product_id": "PROD002", "item_index": None, "quantity": "1"},
            ],
            [{"product_id": "PROD001", "item_index": 12345, "quantity": "1"}],
        ],
        "ground_truth": [[1, 2, 3], [1, 2]],
        "embeddings": [np.random.randn(128), np.random.randn(128)],
    }
    fs_save_prediction_output_parquet()(saved_data, uri)

    dataset = ParquetDataset(uri)  # dtype_backend: numpy_nullable
    print(type(dataset[0]["embeddings"]))       # type: numpy.ndarray
    print(type(dataset[0]["purchased_items"]))  # type: numpy.ndarray of object
    print(type(dataset[0]["ground_truth"]))     # type: numpy.ndarray of object

    pyarrow_dataset = ParquetDataset(uri, dtype_backend="pyarrow")  # dtype_backend: pyarrow
    print(type(pyarrow_dataset[0]["embeddings"]))       # type: list
    print(type(pyarrow_dataset[0]["purchased_items"]))  # type: list of dictionary
    print(type(pyarrow_dataset[0]["ground_truth"]))     # type: list of int
- Parameters:
uri (str | list[str]) – URI of Parquet data.
columns (list[str] | None) – Columns to load.
use_threads (bool) – Use multi-threaded processing. Defaults to True.
replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.
s3wr_args (dict) – Config for s3wr.s3.read_parquet; refer to https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_parquet.html
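Example (a minimal sketch; the path and column names are placeholders):

    from fkat.data.datasets import ParquetDataset

    # Load only the columns you need; each index returns one row as a dict.
    ds = ParquetDataset(
        "s3://path/to/fkat.parquet",
        columns=["purchased_items", "ground_truth"],
    )
    print(len(ds))
    print(ds[0]["ground_truth"])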
IterableParquetDataset#
- class fkat.data.datasets.IterableParquetDataset(uri: str | list[str], columns: Optional[list[str]] = None, use_threads: bool = True, chunk_size: int = 10000, replace_nan: bool = True, **s3wr_args: Any)[source]#
Bases: IterableDataset[dict[str, Any]]
An IterableDataset backed by Parquet data.

Note
If you want to keep the original types when reading Parquet, you should set dtype_backend='pyarrow'.

Example config:

    _target_: fkat.data.datasets.parquet.IterableParquetDataset
    uri: s3://path/to/fkat.parquet
    dtype_backend: pyarrow
- Parameters:
uri (str or list[str]) – URI of Parquet data.
columns (list[str] | None) – Columns to load. Defaults to None.
use_threads (bool) – Use multi-threaded processing. Defaults to True.
chunk_size (int) – Maximum number of rows per chunk; the data is read as an iterable of DataFrames with at most this many rows each. Defaults to 10000.
replace_nan (bool) – Whether to replace np.nan with None. Defaults to True.
s3wr_args (dict) – Config for s3wr.s3.read_parquet; refer to https://aws-sdk-pandas.readthedocs.io/en/3.5.1/stubs/awswrangler.s3.read_parquet.html
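Example (a minimal sketch; the path and column names are placeholders, and wrapping in a DataLoader is one common way to consume an IterableDataset):

    from torch.utils.data import DataLoader

    from fkat.data.datasets import IterableParquetDataset

    stream = IterableParquetDataset(
        "s3://path/to/fkat.parquet",
        columns=["purchased_items", "ground_truth"],
        chunk_size=5000,
    )
    # batch_size=None disables automatic batching and yields raw dict samples.
    loader = DataLoader(stream, batch_size=None)
    for sample in loader:
        print(sample)
        break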