Module vexpresso.utils#
View Source
from __future__ import annotations
import inspect
import os
from collections.abc import Iterable
from dataclasses import dataclass, field
from functools import reduce, wraps
from typing import Any, Callable, Dict, List, Optional, Tuple
import daft
import numpy as np
ResourceRequest = daft.resource_request.ResourceRequest
DataType = daft.datatype.DataType
def get_batch_size(embeddings: Iterable[Any]) -> int:
    """Return how many items a batch of embeddings holds.

    A one-dimensional numpy array counts as a batch of one; anything else
    reports ``len(embeddings)``.
    """
    is_single_vector = isinstance(embeddings, np.ndarray) and embeddings.ndim == 1
    return 1 if is_single_vector else len(embeddings)
# LANGCHAIN
@dataclass
class Document:
    """Interface for interacting with a document."""

    # Required string content of the document.
    page_content: str
    # Optional metadata mapping; default_factory gives each instance its own dict.
    metadata: dict = field(default_factory=dict)
def batchify_args(args, batch_size):
    """Expand or validate ``args`` so there is one value per batch element.

    Args:
        args: Either a single value (strings count as single values), which
            is repeated ``batch_size`` times, or a non-string iterable that
            must already contain exactly ``batch_size`` items.
        batch_size: Number of elements in the batch.

    Returns:
        The iterable itself when it already has the right length (iterables
        without ``len()``, such as generators, are first materialized into a
        list), or a new list holding ``batch_size`` references to ``args``.

    Raises:
        ValueError: If an iterable ``args`` does not hold ``batch_size`` items.
    """
    if isinstance(args, str) or not isinstance(args, Iterable):
        return [args] * batch_size
    if not hasattr(args, "__len__"):
        # One-shot iterables have no len(); materialize them so they can be
        # length-checked instead of failing with TypeError.
        args = list(args)
    if len(args) != batch_size:
        raise ValueError("ARG needs to be size batch size")
    return args
def lazy(default: bool = True):
    """Build a decorator that adds a ``lazy`` keyword to the wrapped function.

    The wrapped function is called without the ``lazy`` keyword. When ``lazy``
    is falsy, ``.execute()`` is called on the result, or on every element if
    the result is a list; otherwise the result is returned untouched.
    """

    def dec(func):
        @wraps(func)
        def wrapper(*args, lazy=default, **kwargs):
            result = func(*args, **kwargs)
            if lazy:
                return result
            if isinstance(result, list):
                return [item.execute() for item in result]
            return result.execute()

        return wrapper

    return dec
def convert_args(*args):
    """Return the positional arguments as a list, replacing every daft Series
    with the result of its ``to_pylist()``; other values pass through."""
    converted = []
    for arg in args:
        if isinstance(arg, daft.series.Series):
            arg = arg.to_pylist()
        converted.append(arg)
    return converted
def convert_kwargs(**kwargs):
    """Return the keyword arguments as a dict, replacing every daft Series
    value with the result of its ``to_pylist()``; other values pass through."""
    converted = {}
    for name, value in kwargs.items():
        if isinstance(value, daft.series.Series):
            value = value.to_pylist()
        converted[name] = value
    return converted
# TODO: CHANGE TO ENUM
def transform_wrapper(
    original_transform: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs: Dict[str, Any] = {},
    function: str = "__call__",
):
    """Wrap a plain function, a class, or an instance as a daft UDF.

    Series arguments are converted to Python lists (via ``convert_args`` /
    ``convert_kwargs``) before the wrapped callable runs. The returned UDF is
    tagged with ``__vexpresso_transform = True``.

    Args:
        original_transform: A plain function, a class, or an instance.
        datatype: Passed to ``daft.udf`` as ``return_dtype``.
        init_kwargs: Keyword arguments used to instantiate
            ``original_transform`` when it is a class.
        function: Name of the method to invoke on a class or instance.
    """
    if inspect.isfunction(original_transform):
        target = original_transform

        @wraps(target)
        def wrapped(*args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            return target(*args, **kwargs)

        # Expose the wrapped function's signature on the wrapper.
        wrapped.__signature__ = inspect.signature(target)
        udf = daft.udf(return_dtype=datatype)(wrapped)
        udf.__vexpresso_transform = True
        return udf

    is_class = isinstance(original_transform, type)
    owner = original_transform if is_class else original_transform.__class__
    sig = inspect.signature(getattr(owner, function))

    class _Transformation:
        def __init__(self):
            if is_class:
                # A class is only instantiated when _Transformation itself is.
                self._transform = original_transform(**init_kwargs)
            else:
                self._transform = original_transform

        def __call__(self, *args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            method = getattr(self._transform, function)
            return method(*args, **kwargs)

    _Transformation.__call__.__signature__ = sig
    udf = daft.udf(return_dtype=datatype)(_Transformation)
    udf.__vexpresso_transform = True
    return udf
def transformation(
    original_function: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs={},
    function: str = "__call__",
):
    """Return ``original_function`` wrapped by ``transform_wrapper``.

    An object that already carries a non-None ``__vexpresso_transform``
    attribute is returned unchanged.
    """
    marker = getattr(original_function, "__vexpresso_transform", None)
    if marker is not None:
        return original_function
    return transform_wrapper(
        original_function,
        datatype=datatype,
        init_kwargs=init_kwargs,
        function=function,
    )
def get_field_name_and_key(
    field: str, column_names: List[str] = []
) -> Tuple[str, Optional[str]]:
    """Split a dotted field reference into a column name and a nested key path.

    Args:
        field: A column name, optionally followed by ``.``-separated keys.
        column_names: Known column names. If ``field`` matches one exactly it
            is returned whole, even if it contains dots.

    Returns:
        ``(field_name, keys)``. ``keys`` is everything after the first ``.``,
        or ``None`` when ``field`` is a known column or contains no ``.``.
    """
    if field in column_names:
        return field, None
    field_name, separator, remainder = field.partition(".")
    # An empty separator means there was no "." at all, so there is no key path.
    return field_name, (remainder if separator else None)
def deep_get(dictionary, keys=None, default=None):
    """Look up a ``.``-separated key path inside nested dicts.

    Returns ``dictionary`` unchanged when ``keys`` is None or when
    ``dictionary`` is not a dict. Otherwise each key is resolved in turn; a
    missing key, or a non-dict value part-way down the path, yields ``default``.
    """
    if keys is None or not isinstance(dictionary, dict):
        return dictionary
    current = dictionary
    for key in keys.split("."):
        if isinstance(current, dict):
            current = current.get(key, default)
        else:
            current = default
    return current
class HFHubHelper:
    """Thin helper around ``huggingface_hub`` for dataset repositories.

    Every call uses ``repo_type="dataset"``. When no token is passed, the
    ``HUGGINGFACEHUB_API_TOKEN`` environment variable is used.
    """

    def __init__(self):
        """Import ``huggingface_hub`` and create an ``HfApi`` client.

        Raises:
            ImportError: If ``huggingface_hub`` is not installed.
        """
        try:
            import huggingface_hub  # noqa
        except ImportError as err:
            # Keep the original failure as the cause for easier debugging.
            raise ImportError(
                "Could not import huggingface_hub python package. "
                "Please install it with `pip install huggingface_hub`."
            ) from err
        self._hf_hub = huggingface_hub
        self.api = self._hf_hub.HfApi()

    @staticmethod
    def _resolve_token(token: Optional[str]) -> Optional[str]:
        """Return ``token``, or the environment token when it is None."""
        if token is None:
            return os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        return token

    def create_repo(
        self, repo_id: str, token: Optional[str] = None, *args, **kwargs
    ) -> str:
        """Call ``HfApi.create_repo`` for a dataset repo with ``exist_ok=True``.

        Extra positional and keyword arguments are forwarded unchanged.
        """
        return self.api.create_repo(
            repo_id=repo_id,
            token=self._resolve_token(token),
            exist_ok=True,
            repo_type="dataset",
            *args,
            **kwargs,
        )

    def upload(
        self,
        repo_id: str,
        folder_path: str,
        token: Optional[str] = None,
        private: bool = True,
        *args,
        **kwargs,
    ) -> str:
        """Create the dataset repo, then upload ``folder_path`` to it.

        ``private`` is passed only to ``create_repo``; extra arguments are
        forwarded only to ``HfApi.upload_folder``.
        """
        token = self._resolve_token(token)
        self.create_repo(repo_id, token, private=private)
        return self.api.upload_folder(
            repo_id=repo_id,
            folder_path=folder_path,
            token=token,
            repo_type="dataset",
            *args,
            **kwargs,
        )

    def download(
        self,
        repo_id: str,
        token: Optional[str] = None,
        local_dir: Optional[str] = None,
        *args,
        **kwargs,
    ) -> str:
        """Call ``huggingface_hub.snapshot_download`` for a dataset repo.

        Extra positional and keyword arguments are forwarded unchanged.
        """
        return self._hf_hub.snapshot_download(
            repo_id=repo_id,
            token=self._resolve_token(token),
            local_dir=local_dir,
            repo_type="dataset",
            *args,
            **kwargs,
        )
Transformation = Callable[[List[Any], Any], List[Any]]
Variables#
Transformation
Functions#
batchify_args#
def batchify_args(
args,
batch_size
)
View Source
def batchify_args(args, batch_size):
    """Expand or validate ``args`` so there is one value per batch element.

    Args:
        args: Either a single value (strings count as single values), which
            is repeated ``batch_size`` times, or a non-string iterable that
            must already contain exactly ``batch_size`` items.
        batch_size: Number of elements in the batch.

    Returns:
        The iterable itself when it already has the right length (iterables
        without ``len()``, such as generators, are first materialized into a
        list), or a new list holding ``batch_size`` references to ``args``.

    Raises:
        ValueError: If an iterable ``args`` does not hold ``batch_size`` items.
    """
    if isinstance(args, str) or not isinstance(args, Iterable):
        return [args] * batch_size
    if not hasattr(args, "__len__"):
        # One-shot iterables have no len(); materialize them so they can be
        # length-checked instead of failing with TypeError.
        args = list(args)
    if len(args) != batch_size:
        raise ValueError("ARG needs to be size batch size")
    return args
convert_args#
def convert_args(
*args
)
View Source
def convert_args(*args):
    """Return the positional arguments as a list, replacing every daft Series
    with the result of its ``to_pylist()``; other values pass through."""
    converted = []
    for arg in args:
        if isinstance(arg, daft.series.Series):
            arg = arg.to_pylist()
        converted.append(arg)
    return converted
convert_kwargs#
def convert_kwargs(
**kwargs
)
View Source
def convert_kwargs(**kwargs):
    """Return the keyword arguments as a dict, replacing every daft Series
    value with the result of its ``to_pylist()``; other values pass through."""
    converted = {}
    for name, value in kwargs.items():
        if isinstance(value, daft.series.Series):
            value = value.to_pylist()
        converted[name] = value
    return converted
deep_get#
def deep_get(
dictionary,
keys=None,
default=None
)
View Source
def deep_get(dictionary, keys=None, default=None):
    """Look up a ``.``-separated key path inside nested dicts.

    Returns ``dictionary`` unchanged when ``keys`` is None or when
    ``dictionary`` is not a dict. Otherwise each key is resolved in turn; a
    missing key, or a non-dict value part-way down the path, yields ``default``.
    """
    if keys is None or not isinstance(dictionary, dict):
        return dictionary
    current = dictionary
    for key in keys.split("."):
        if isinstance(current, dict):
            current = current.get(key, default)
        else:
            current = default
    return current
get_batch_size#
def get_batch_size(
embeddings: 'Iterable[Any]'
) -> 'int'
View Source
def get_batch_size(embeddings: Iterable[Any]) -> int:
    """Return how many items a batch of embeddings holds.

    A one-dimensional numpy array counts as a batch of one; anything else
    reports ``len(embeddings)``.
    """
    is_single_vector = isinstance(embeddings, np.ndarray) and embeddings.ndim == 1
    return 1 if is_single_vector else len(embeddings)
get_field_name_and_key#
def get_field_name_and_key(
field: 'str',
column_names: 'List[str]' = []
) -> 'Tuple[str, str]'
View Source
def get_field_name_and_key(
    field: str, column_names: List[str] = []
) -> Tuple[str, Optional[str]]:
    """Split a dotted field reference into a column name and a nested key path.

    Args:
        field: A column name, optionally followed by ``.``-separated keys.
        column_names: Known column names. If ``field`` matches one exactly it
            is returned whole, even if it contains dots.

    Returns:
        ``(field_name, keys)``. ``keys`` is everything after the first ``.``,
        or ``None`` when ``field`` is a known column or contains no ``.``.
    """
    if field in column_names:
        return field, None
    field_name, separator, remainder = field.partition(".")
    # An empty separator means there was no "." at all, so there is no key path.
    return field_name, (remainder if separator else None)
lazy#
def lazy(
default: 'bool' = True
)
View Source
def lazy(default: bool = True):
    """Build a decorator that adds a ``lazy`` keyword to the wrapped function.

    The wrapped function is called without the ``lazy`` keyword. When ``lazy``
    is falsy, ``.execute()`` is called on the result, or on every element if
    the result is a list; otherwise the result is returned untouched.
    """

    def dec(func):
        @wraps(func)
        def wrapper(*args, lazy=default, **kwargs):
            result = func(*args, **kwargs)
            if lazy:
                return result
            if isinstance(result, list):
                return [item.execute() for item in result]
            return result.execute()

        return wrapper

    return dec
transform_wrapper#
def transform_wrapper(
original_transform: 'Transformation' = None,
datatype: 'DataType' = Python,
init_kwargs: 'Dict[str, Any]' = {},
function: 'str' = '__call__'
)
View Source
def transform_wrapper(
    original_transform: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs: Dict[str, Any] = {},
    function: str = "__call__",
):
    """Wrap a plain function, a class, or an instance as a daft UDF.

    Series arguments are converted to Python lists (via ``convert_args`` /
    ``convert_kwargs``) before the wrapped callable runs. The returned UDF is
    tagged with ``__vexpresso_transform = True``.

    Args:
        original_transform: A plain function, a class, or an instance.
        datatype: Passed to ``daft.udf`` as ``return_dtype``.
        init_kwargs: Keyword arguments used to instantiate
            ``original_transform`` when it is a class.
        function: Name of the method to invoke on a class or instance.
    """
    if inspect.isfunction(original_transform):
        target = original_transform

        @wraps(target)
        def wrapped(*args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            return target(*args, **kwargs)

        # Expose the wrapped function's signature on the wrapper.
        wrapped.__signature__ = inspect.signature(target)
        udf = daft.udf(return_dtype=datatype)(wrapped)
        udf.__vexpresso_transform = True
        return udf

    is_class = isinstance(original_transform, type)
    owner = original_transform if is_class else original_transform.__class__
    sig = inspect.signature(getattr(owner, function))

    class _Transformation:
        def __init__(self):
            if is_class:
                # A class is only instantiated when _Transformation itself is.
                self._transform = original_transform(**init_kwargs)
            else:
                self._transform = original_transform

        def __call__(self, *args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            method = getattr(self._transform, function)
            return method(*args, **kwargs)

    _Transformation.__call__.__signature__ = sig
    udf = daft.udf(return_dtype=datatype)(_Transformation)
    udf.__vexpresso_transform = True
    return udf
transformation#
def transformation(
original_function: 'Transformation' = None,
datatype: 'DataType' = Python,
init_kwargs={},
function: 'str' = '__call__'
)
View Source
def transformation(
    original_function: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs={},
    function: str = "__call__",
):
    """Return ``original_function`` wrapped by ``transform_wrapper``.

    An object that already carries a non-None ``__vexpresso_transform``
    attribute is returned unchanged.
    """
    marker = getattr(original_function, "__vexpresso_transform", None)
    if marker is not None:
        return original_function
    return transform_wrapper(
        original_function,
        datatype=datatype,
        init_kwargs=init_kwargs,
        function=function,
    )
Classes#
DataType#
class DataType(
)
A Daft DataType defines the type of all the values in an Expression or DataFrame column
View Source
class DataType:
    """A Daft DataType defines the type of all the values in an Expression or DataFrame column"""

    # Wrapped PyDataType; set by _from_pydatatype and __setstate__.
    _dtype: PyDataType

    def __init__(self) -> None:
        raise NotImplementedError(
            "We do not support creating a DataType via __init__ "
            "use a creator method like DataType.int32() or use DataType.from_arrow_type(pa_type)"
        )

    @staticmethod
    def _from_pydatatype(pydt: PyDataType) -> DataType:
        """Wrap ``pydt`` in a DataType without going through ``__init__``."""
        dt = DataType.__new__(DataType)
        dt._dtype = pydt
        return dt

    @classmethod
    def int8(cls) -> DataType:
        """Create an 8-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int8())

    @classmethod
    def int16(cls) -> DataType:
        """Create a 16-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int16())

    @classmethod
    def int32(cls) -> DataType:
        """Create a 32-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int32())

    @classmethod
    def int64(cls) -> DataType:
        """Create a 64-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int64())

    @classmethod
    def uint8(cls) -> DataType:
        """Create an unsigned 8-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint8())

    @classmethod
    def uint16(cls) -> DataType:
        """Create an unsigned 16-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint16())

    @classmethod
    def uint32(cls) -> DataType:
        """Create an unsigned 32-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint32())

    @classmethod
    def uint64(cls) -> DataType:
        """Create an unsigned 64-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint64())

    @classmethod
    def float32(cls) -> DataType:
        """Create a 32-bit float DataType"""
        return cls._from_pydatatype(PyDataType.float32())

    @classmethod
    def float64(cls) -> DataType:
        """Create a 64-bit float DataType"""
        return cls._from_pydatatype(PyDataType.float64())

    @classmethod
    def string(cls) -> DataType:
        """Create a String DataType: A string of UTF8 characters"""
        return cls._from_pydatatype(PyDataType.string())

    @classmethod
    def bool(cls) -> DataType:
        """Create the Boolean DataType: Either ``True`` or ``False``"""
        return cls._from_pydatatype(PyDataType.bool())

    @classmethod
    def binary(cls) -> DataType:
        """Create a Binary DataType: A string of bytes"""
        return cls._from_pydatatype(PyDataType.binary())

    @classmethod
    def null(cls) -> DataType:
        """Creates the Null DataType: Always the ``Null`` value"""
        return cls._from_pydatatype(PyDataType.null())

    @classmethod
    def date(cls) -> DataType:
        """Create a Date DataType: A date with a year, month and day"""
        return cls._from_pydatatype(PyDataType.date())

    @classmethod
    def list(cls, name: str, dtype: DataType) -> DataType:
        """Create a List DataType: Variable-length list, where each element in the list has type ``dtype``

        Args:
            name: Name forwarded to ``PyDataType.list`` (``from_arrow_type`` passes the Arrow value field's name)
            dtype: DataType of each element in the list
        """
        return cls._from_pydatatype(PyDataType.list(name, dtype._dtype))

    @classmethod
    def fixed_size_list(cls, name: str, dtype: DataType, size: int) -> DataType:
        """Create a FixedSizeList DataType: Fixed-size list, where each element in the list has type ``dtype``
        and each list has length ``size``.

        Args:
            name: Name forwarded to ``PyDataType.fixed_size_list``
            dtype: DataType of each element in the list
            size: length of each list

        Raises:
            ValueError: If ``size`` is not a positive integer
        """
        if not isinstance(size, int) or size <= 0:
            # Format the value into the message; a second exception argument
            # would make the error render as a tuple.
            raise ValueError(f"The size for a fixed-size list must be a positive integer, but got: {size}")
        return cls._from_pydatatype(PyDataType.fixed_size_list(name, dtype._dtype, size))

    @classmethod
    def struct(cls, fields: dict[str, DataType]) -> DataType:
        """Create a Struct DataType: a nested type which has names mapped to child types

        Args:
            fields: Nested fields of the Struct
        """
        return cls._from_pydatatype(PyDataType.struct({name: datatype._dtype for name, datatype in fields.items()}))

    @classmethod
    def extension(cls, name: str, storage_dtype: DataType, metadata: str | None = None) -> DataType:
        """Create an Extension DataType from a name, a storage DataType and optional metadata"""
        return cls._from_pydatatype(PyDataType.extension(name, storage_dtype._dtype, metadata))

    @classmethod
    def embedding(cls, name: str, dtype: DataType, size: int) -> DataType:
        """Create an Embedding DataType: embeddings are fixed size arrays, where each element
        in the array has a **numeric** ``dtype`` and each array has a fixed length of ``size``.

        Args:
            name: Name forwarded to ``PyDataType.embedding``
            dtype: DataType of each element in the list (must be numeric)
            size: length of each list

        Raises:
            ValueError: If ``size`` is not a positive integer
        """
        if not isinstance(size, int) or size <= 0:
            raise ValueError(f"The size for a embedding must be a positive integer, but got: {size}")
        return cls._from_pydatatype(PyDataType.embedding(name, dtype._dtype, size))

    @classmethod
    def image(
        cls, mode: str | ImageMode | None = None, height: int | None = None, width: int | None = None
    ) -> DataType:
        """Create an Image DataType.

        Args:
            mode: An ImageMode, or a mode string converted with ``ImageMode.from_mode_string``
            height: Image height; must be given together with ``width``
            width: Image width; must be given together with ``height``

        Raises:
            ValueError: If only one of ``height``/``width`` is given, or either is not a positive integer
        """
        if isinstance(mode, str):
            mode = ImageMode.from_mode_string(mode)
        if height is not None and width is not None:
            if not isinstance(height, int) or height <= 0:
                raise ValueError(f"Image height must be a positive integer, but got: {height}")
            if not isinstance(width, int) or width <= 0:
                raise ValueError(f"Image width must be a positive integer, but got: {width}")
        elif height is not None or width is not None:
            raise ValueError(
                f"Image height and width must either both be specified, or both not be specified, but got height={height}, width={width}"
            )
        return cls._from_pydatatype(PyDataType.image(mode, height, width))

    @classmethod
    def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
        """Maps a PyArrow DataType to a Daft DataType"""
        if pa.types.is_int8(arrow_type):
            return cls.int8()
        elif pa.types.is_int16(arrow_type):
            return cls.int16()
        elif pa.types.is_int32(arrow_type):
            return cls.int32()
        elif pa.types.is_int64(arrow_type):
            return cls.int64()
        elif pa.types.is_uint8(arrow_type):
            return cls.uint8()
        elif pa.types.is_uint16(arrow_type):
            return cls.uint16()
        elif pa.types.is_uint32(arrow_type):
            return cls.uint32()
        elif pa.types.is_uint64(arrow_type):
            return cls.uint64()
        elif pa.types.is_float32(arrow_type):
            return cls.float32()
        elif pa.types.is_float64(arrow_type):
            return cls.float64()
        elif pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
            return cls.string()
        elif pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type):
            return cls.binary()
        elif pa.types.is_boolean(arrow_type):
            return cls.bool()
        elif pa.types.is_null(arrow_type):
            return cls.null()
        elif pa.types.is_date32(arrow_type):
            return cls.date()
        elif pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
            assert isinstance(arrow_type, (pa.ListType, pa.LargeListType))
            field = arrow_type.value_field
            return cls.list(field.name, cls.from_arrow_type(field.type))
        elif pa.types.is_fixed_size_list(arrow_type):
            assert isinstance(arrow_type, pa.FixedSizeListType)
            field = arrow_type.value_field
            return cls.fixed_size_list(field.name, cls.from_arrow_type(field.type), arrow_type.list_size)
        elif pa.types.is_struct(arrow_type):
            assert isinstance(arrow_type, pa.StructType)
            fields = [arrow_type[i] for i in range(arrow_type.num_fields)]
            return cls.struct({field.name: cls.from_arrow_type(field.type) for field in fields})
        elif _RAY_DATA_EXTENSIONS_AVAILABLE and isinstance(arrow_type, tuple(_TENSOR_EXTENSION_TYPES)):
            # TODO(Clark): Add a native cross-lang extension type representation for Ray's tensor extension types.
            return cls.python()
        elif isinstance(arrow_type, pa.PyExtensionType):
            # TODO(Clark): Add a native cross-lang extension type representation for PyExtensionTypes.
            raise ValueError(
                "pyarrow extension types that subclass pa.PyExtensionType can't be used in Daft, since they can't be "
                f"used in non-Python Arrow implementations and Daft uses the Rust Arrow2 implementation: {arrow_type}"
            )
        elif isinstance(arrow_type, pa.BaseExtensionType):
            name = arrow_type.extension_name

            if (get_context().runner_config.name == "ray") and (
                type(arrow_type).__reduce__ == pa.BaseExtensionType.__reduce__
            ):
                # Sentences are separated explicitly; adjacent string literals
                # are concatenated with nothing in between.
                raise ValueError(
                    f"You are attempting to use a Extension Type: {arrow_type} with the default pyarrow `__reduce__` which breaks pickling for Extensions. "
                    "To fix this, implement your own `__reduce__` on your extension type. "
                    "For more details see this issue: "
                    "https://github.com/apache/arrow/issues/35599"
                )
            try:
                metadata = arrow_type.__arrow_ext_serialize__().decode()
            except AttributeError:
                metadata = None

            return cls.extension(
                name,
                cls.from_arrow_type(arrow_type.storage_type),
                metadata,
            )
        else:
            # Fall back to a Python object type.
            # TODO(Clark): Add native support for remaining Arrow types.
            return cls.python()

    @classmethod
    def from_numpy_dtype(cls, np_type) -> DataType:
        """Maps a Numpy datatype to a Daft DataType"""
        arrow_type = pa.from_numpy_dtype(np_type)
        return cls.from_arrow_type(arrow_type)

    @classmethod
    def python(cls) -> DataType:
        """Create a Python DataType: a type which refers to an arbitrary Python object"""
        return cls._from_pydatatype(PyDataType.python())

    def _is_python_type(self) -> builtins.bool:
        # NOTE: This is currently used in a few places still. We can get rid of it once these are refactored away. To be discussed.
        # 1. Visualizations - we can get rid of it if we do all our repr and repr_html logic in a Series instead of in Python
        # 2. Hypothesis test data generation - we can get rid of it if we allow for creation of Series from a Python list and DataType
        return self == DataType.python()

    def __repr__(self) -> str:
        return self._dtype.__repr__()

    def __eq__(self, other: object) -> builtins.bool:
        return isinstance(other, DataType) and self._dtype.is_equal(other._dtype)

    def __getstate__(self) -> bytes:
        return self._dtype.__getstate__()

    def __setstate__(self, state: bytes) -> None:
        self._dtype = PyDataType.__new__(PyDataType)
        self._dtype.__setstate__(state)

    def __hash__(self) -> int:
        return self._dtype.__hash__()
Static methods#
binary#
def binary(
) -> 'DataType'
Create a Binary DataType: A string of bytes
View Source
@classmethod
def binary(cls) -> DataType:
"""Create a Binary DataType: A string of bytes"""
return cls._from_pydatatype(PyDataType.binary())
bool#
def bool(
) -> 'DataType'
Create the Boolean DataType: Either True or False
View Source
@classmethod
def bool(cls) -> DataType:
"""Create the Boolean DataType: Either ``True`` or ``False``"""
return cls._from_pydatatype(PyDataType.bool())
date#
def date(
) -> 'DataType'
Create a Date DataType: A date with a year, month and day
View Source
@classmethod
def date(cls) -> DataType:
"""Create a Date DataType: A date with a year, month and day"""
return cls._from_pydatatype(PyDataType.date())
embedding#
def embedding(
name: 'str',
dtype: 'DataType',
size: 'int'
) -> 'DataType'
Create an Embedding DataType: embeddings are fixed size arrays, where each element
in the array has a numeric dtype and each array has a fixed length of size.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dtype | None | DataType of each element in the list (must be numeric) | None |
| size | None | length of each list | None |
View Source
@classmethod
def embedding(cls, name: str, dtype: DataType, size: int) -> DataType:
    """Create an Embedding DataType: embeddings are fixed size arrays, where each element
    in the array has a **numeric** ``dtype`` and each array has a fixed length of ``size``.

    Args:
        name: Name forwarded to ``PyDataType.embedding``
        dtype: DataType of each element in the list (must be numeric)
        size: length of each list

    Raises:
        ValueError: If ``size`` is not a positive integer
    """
    if not isinstance(size, int) or size <= 0:
        # Format the value into the message; a second exception argument
        # would make the error render as a tuple.
        raise ValueError(f"The size for a embedding must be a positive integer, but got: {size}")
    return cls._from_pydatatype(PyDataType.embedding(name, dtype._dtype, size))
extension#
def extension(
name: 'str',
storage_dtype: 'DataType',
metadata: 'str | None' = None
) -> 'DataType'
View Source
@classmethod
def extension(cls, name: str, storage_dtype: DataType, metadata: str | None = None) -> DataType:
return cls._from_pydatatype(PyDataType.extension(name, storage_dtype._dtype, metadata))
fixed_size_list#
def fixed_size_list(
name: 'str',
dtype: 'DataType',
size: 'int'
) -> 'DataType'
Create a FixedSizeList DataType: Fixed-size list, where each element in the list has type dtype
and each list has length size.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dtype | None | DataType of each element in the list | None |
| size | None | length of each list | None |
View Source
@classmethod
def fixed_size_list(cls, name: str, dtype: DataType, size: int) -> DataType:
    """Create a FixedSizeList DataType: Fixed-size list, where each element in the list has type ``dtype``
    and each list has length ``size``.

    Args:
        name: Name forwarded to ``PyDataType.fixed_size_list``
        dtype: DataType of each element in the list
        size: length of each list

    Raises:
        ValueError: If ``size`` is not a positive integer
    """
    if not isinstance(size, int) or size <= 0:
        # Format the value into the message; a second exception argument
        # would make the error render as a tuple.
        raise ValueError(f"The size for a fixed-size list must be a positive integer, but got: {size}")
    return cls._from_pydatatype(PyDataType.fixed_size_list(name, dtype._dtype, size))
float32#
def float32(
) -> 'DataType'
Create a 32-bit float DataType
View Source
@classmethod
def float32(cls) -> DataType:
"""Create a 32-bit float DataType"""
return cls._from_pydatatype(PyDataType.float32())
float64#
def float64(
) -> 'DataType'
Create a 64-bit float DataType
View Source
@classmethod
def float64(cls) -> DataType:
"""Create a 64-bit float DataType"""
return cls._from_pydatatype(PyDataType.float64())
from_arrow_type#
def from_arrow_type(
arrow_type: 'pa.lib.DataType'
) -> 'DataType'
Maps a PyArrow DataType to a Daft DataType
View Source
@classmethod
def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
"""Maps a PyArrow DataType to a Daft DataType"""
if pa.types.is_int8(arrow_type):
return cls.int8()
elif pa.types.is_int16(arrow_type):
return cls.int16()
elif pa.types.is_int32(arrow_type):
return cls.int32()
elif pa.types.is_int64(arrow_type):
return cls.int64()
elif pa.types.is_uint8(arrow_type):
return cls.uint8()
elif pa.types.is_uint16(arrow_type):
return cls.uint16()
elif pa.types.is_uint32(arrow_type):
return cls.uint32()
elif pa.types.is_uint64(arrow_type):
return cls.uint64()
elif pa.types.is_float32(arrow_type):
return cls.float32()
elif pa.types.is_float64(arrow_type):
return cls.float64()
elif pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
return cls.string()
elif pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type):
return cls.binary()
elif pa.types.is_boolean(arrow_type):
return cls.bool()
elif pa.types.is_null(arrow_type):
return cls.null()
elif pa.types.is_date32(arrow_type):
return cls.date()
elif pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
assert isinstance(arrow_type, (pa.ListType, pa.LargeListType))
field = arrow_type.value_field
return cls.list(field.name, cls.from_arrow_type(field.type))
elif pa.types.is_fixed_size_list(arrow_type):
assert isinstance(arrow_type, pa.FixedSizeListType)
field = arrow_type.value_field
return cls.fixed_size_list(field.name, cls.from_arrow_type(field.type), arrow_type.list_size)
elif pa.types.is_struct(arrow_type):
assert isinstance(arrow_type, pa.StructType)
fields = [arrow_type[i] for i in range(arrow_type.num_fields)]
return cls.struct({field.name: cls.from_arrow_type(field.type) for field in fields})
elif _RAY_DATA_EXTENSIONS_AVAILABLE and isinstance(arrow_type, tuple(_TENSOR_EXTENSION_TYPES)):
# TODO(Clark): Add a native cross-lang extension type representation for Ray's tensor extension types.
return cls.python()
elif isinstance(arrow_type, pa.PyExtensionType):
# TODO(Clark): Add a native cross-lang extension type representation for PyExtensionTypes.
raise ValueError(
"pyarrow extension types that subclass pa.PyExtensionType can't be used in Daft, since they can't be "
f"used in non-Python Arrow implementations and Daft uses the Rust Arrow2 implementation: {arrow_type}"
)
elif isinstance(arrow_type, pa.BaseExtensionType):
name = arrow_type.extension_name
if (get_context().runner_config.name == "ray") and (
type(arrow_type).__reduce__ == pa.BaseExtensionType.__reduce__
):
raise ValueError(
f"You are attempting to use a Extension Type: {arrow_type} with the default pyarrow `__reduce__` which breaks pickling for Extensions"
"To fix this, implement your own `__reduce__` on your extension type"
"For more details see this issue: "
"https://github.com/apache/arrow/issues/35599"
)
try:
metadata = arrow_type.__arrow_ext_serialize__().decode()
except AttributeError:
metadata = None
return cls.extension(
name,
cls.from_arrow_type(arrow_type.storage_type),
metadata,
)
else:
# Fall back to a Python object type.
# TODO(Clark): Add native support for remaining Arrow types.
return cls.python()
from_numpy_dtype#
def from_numpy_dtype(
np_type
) -> 'DataType'
Maps a Numpy datatype to a Daft DataType
View Source
@classmethod
def from_numpy_dtype(cls, np_type) -> DataType:
"""Maps a Numpy datatype to a Daft DataType"""
arrow_type = pa.from_numpy_dtype(np_type)
return cls.from_arrow_type(arrow_type)
image#
def image(
mode: 'str | ImageMode | None' = None,
height: 'int | None' = None,
width: 'int | None' = None
) -> 'DataType'
View Source
@classmethod
def image(
    cls, mode: str | ImageMode | None = None, height: int | None = None, width: int | None = None
) -> DataType:
    """Create an Image DataType.

    Args:
        mode: An ImageMode, or a mode string converted with ``ImageMode.from_mode_string``
        height: Image height; must be given together with ``width``
        width: Image width; must be given together with ``height``

    Raises:
        ValueError: If only one of ``height``/``width`` is given, or either is not a positive integer
    """
    if isinstance(mode, str):
        mode = ImageMode.from_mode_string(mode)
    if height is not None and width is not None:
        # Values are formatted into the message; a second exception argument
        # would make the error render as a tuple.
        if not isinstance(height, int) or height <= 0:
            raise ValueError(f"Image height must be a positive integer, but got: {height}")
        if not isinstance(width, int) or width <= 0:
            raise ValueError(f"Image width must be a positive integer, but got: {width}")
    elif height is not None or width is not None:
        raise ValueError(
            f"Image height and width must either both be specified, or both not be specified, but got height={height}, width={width}"
        )
    return cls._from_pydatatype(PyDataType.image(mode, height, width))
int16#
def int16(
) -> 'DataType'
Create a 16-bit integer DataType
View Source
@classmethod
def int16(cls) -> DataType:
    """Create a 16-bit integer DataType"""
    return cls._from_pydatatype(PyDataType.int16())
int32#
def int32(
) -> 'DataType'
Create a 32-bit integer DataType
View Source
@classmethod
def int32(cls) -> DataType:
    """Create a 32-bit integer DataType"""
    return cls._from_pydatatype(PyDataType.int32())
int64#
def int64(
) -> 'DataType'
Create a 64-bit integer DataType
View Source
@classmethod
def int64(cls) -> DataType:
    """Create a 64-bit integer DataType"""
    return cls._from_pydatatype(PyDataType.int64())
int8#
def int8(
) -> 'DataType'
Create an 8-bit integer DataType
View Source
@classmethod
def int8(cls) -> DataType:
"""Create an 8-bit integer DataType"""
return cls._from_pydatatype(PyDataType.int8())
list#
def list(
name: 'str',
dtype: 'DataType'
) -> 'DataType'
Create a List DataType: Variable-length list, where each element in the list has type dtype
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dtype | None | DataType of each element in the list | None |
View Source
@classmethod
def list(cls, name: str, dtype: DataType) -> DataType:
    """Create a List DataType: Variable-length list, where each element in the list has type ``dtype``

    Args:
        name: Name forwarded to ``PyDataType.list`` alongside the element type
        dtype: DataType of each element in the list
    """
    return cls._from_pydatatype(PyDataType.list(name, dtype._dtype))
null#
def null(
) -> 'DataType'
Creates the Null DataType: Always the Null value
View Source
@classmethod
def null(cls) -> DataType:
"""Creates the Null DataType: Always the ``Null`` value"""
return cls._from_pydatatype(PyDataType.null())
python#
def python(
) -> 'DataType'
Create a Python DataType: a type which refers to an arbitrary Python object
View Source
@classmethod
def python(cls) -> DataType:
"""Create a Python DataType: a type which refers to an arbitrary Python object"""
return cls._from_pydatatype(PyDataType.python())
string#
def string(
) -> 'DataType'
Create a String DataType: A string of UTF8 characters
View Source
@classmethod
def string(cls) -> DataType:
"""Create a String DataType: A string of UTF8 characters"""
return cls._from_pydatatype(PyDataType.string())
struct#
def struct(
fields: 'dict[str, DataType]'
) -> 'DataType'
Create a Struct DataType: a nested type which has names mapped to child types
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fields | None | Nested fields of the Struct | None |
View Source
@classmethod
def struct(cls, fields: dict[str, DataType]) -> DataType:
"""Create a Struct DataType: a nested type which has names mapped to child types
Args:
fields: Nested fields of the Struct
"""
return cls._from_pydatatype(PyDataType.struct({name: datatype._dtype for name, datatype in fields.items()}))
uint16#
def uint16(
) -> 'DataType'
Create an unsigned 16-bit integer DataType
View Source
@classmethod
def uint16(cls) -> DataType:
"""Create an unsigned 16-bit integer DataType"""
return cls._from_pydatatype(PyDataType.uint16())
uint32#
def uint32(
) -> 'DataType'
Create an unsigned 32-bit integer DataType
View Source
@classmethod
def uint32(cls) -> DataType:
"""Create an unsigned 32-bit integer DataType"""
return cls._from_pydatatype(PyDataType.uint32())
uint64#
def uint64(
) -> 'DataType'
Create an unsigned 64-bit integer DataType
View Source
@classmethod
def uint64(cls) -> DataType:
"""Create an unsigned 64-bit integer DataType"""
return cls._from_pydatatype(PyDataType.uint64())
uint8#
def uint8(
) -> 'DataType'
Create an unsigned 8-bit integer DataType
View Source
@classmethod
def uint8(cls) -> DataType:
"""Create an unsigned 8-bit integer DataType"""
return cls._from_pydatatype(PyDataType.uint8())
Document#
class Document(
page_content: 'str',
metadata: 'dict' = <factory>
)
Interface for interacting with a document.
View Source
@dataclass
class Document:
    """Interface for interacting with a document."""

    # Required string content of the document.
    page_content: str
    # Optional metadata mapping; default_factory gives each instance its own dict.
    metadata: dict = field(default_factory=dict)
HFHubHelper#
class HFHubHelper(
)
View Source
class HFHubHelper:
    """Thin helper around ``huggingface_hub`` for dataset repositories.

    Every call uses ``repo_type="dataset"``. When no token is passed, the
    ``HUGGINGFACEHUB_API_TOKEN`` environment variable is used.
    """

    def __init__(self):
        """Import ``huggingface_hub`` and create an ``HfApi`` client.

        Raises:
            ImportError: If ``huggingface_hub`` is not installed.
        """
        try:
            import huggingface_hub  # noqa
        except ImportError as err:
            # Keep the original failure as the cause for easier debugging.
            raise ImportError(
                "Could not import huggingface_hub python package. "
                "Please install it with `pip install huggingface_hub`."
            ) from err
        self._hf_hub = huggingface_hub
        self.api = self._hf_hub.HfApi()

    @staticmethod
    def _resolve_token(token: Optional[str]) -> Optional[str]:
        """Return ``token``, or the environment token when it is None."""
        if token is None:
            return os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        return token

    def create_repo(
        self, repo_id: str, token: Optional[str] = None, *args, **kwargs
    ) -> str:
        """Call ``HfApi.create_repo`` for a dataset repo with ``exist_ok=True``.

        Extra positional and keyword arguments are forwarded unchanged.
        """
        return self.api.create_repo(
            repo_id=repo_id,
            token=self._resolve_token(token),
            exist_ok=True,
            repo_type="dataset",
            *args,
            **kwargs,
        )

    def upload(
        self,
        repo_id: str,
        folder_path: str,
        token: Optional[str] = None,
        private: bool = True,
        *args,
        **kwargs,
    ) -> str:
        """Create the dataset repo, then upload ``folder_path`` to it.

        ``private`` is passed only to ``create_repo``; extra arguments are
        forwarded only to ``HfApi.upload_folder``.
        """
        token = self._resolve_token(token)
        self.create_repo(repo_id, token, private=private)
        return self.api.upload_folder(
            repo_id=repo_id,
            folder_path=folder_path,
            token=token,
            repo_type="dataset",
            *args,
            **kwargs,
        )

    def download(
        self,
        repo_id: str,
        token: Optional[str] = None,
        local_dir: Optional[str] = None,
        *args,
        **kwargs,
    ) -> str:
        """Call ``huggingface_hub.snapshot_download`` for a dataset repo.

        Extra positional and keyword arguments are forwarded unchanged.
        """
        return self._hf_hub.snapshot_download(
            repo_id=repo_id,
            token=self._resolve_token(token),
            local_dir=local_dir,
            repo_type="dataset",
            *args,
            **kwargs,
        )
Methods#
create_repo#
def create_repo(
self,
repo_id: 'str',
token: 'Optional[str]' = None,
*args,
**kwargs
) -> 'str'
View Source
def create_repo(
    self, repo_id: str, token: Optional[str] = None, *args, **kwargs
) -> str:
    """Call ``self.api.create_repo`` for a dataset repo with ``exist_ok=True``.

    A missing token falls back to the ``HUGGINGFACEHUB_API_TOKEN`` environment
    variable; extra arguments are forwarded unchanged.
    """
    auth_token = os.environ.get("HUGGINGFACEHUB_API_TOKEN") if token is None else token
    return self.api.create_repo(
        *args,
        repo_id=repo_id,
        token=auth_token,
        exist_ok=True,
        repo_type="dataset",
        **kwargs,
    )
download#
def download(
self,
repo_id: 'str',
token: 'Optional[str]' = None,
local_dir: 'Optional[str]' = None,
*args,
**kwargs
) -> 'str'
View Source
def download(
    self,
    repo_id: str,
    token: Optional[str] = None,
    local_dir: Optional[str] = None,
    *args,
    **kwargs,
) -> str:
    """Call ``self._hf_hub.snapshot_download`` for a dataset repo.

    A missing token falls back to the ``HUGGINGFACEHUB_API_TOKEN`` environment
    variable; extra arguments are forwarded unchanged.
    """
    auth_token = os.environ.get("HUGGINGFACEHUB_API_TOKEN") if token is None else token
    return self._hf_hub.snapshot_download(
        *args,
        repo_id=repo_id,
        token=auth_token,
        local_dir=local_dir,
        repo_type="dataset",
        **kwargs,
    )
upload#
def upload(
self,
repo_id: 'str',
folder_path: 'str',
token: 'Optional[str]' = None,
private: 'bool' = True,
*args,
**kwargs
) -> 'str'
View Source
def upload(
    self,
    repo_id: str,
    folder_path: str,
    token: Optional[str] = None,
    private: bool = True,
    *args,
    **kwargs,
) -> str:
    """Create the dataset repo via ``self.create_repo``, then upload a folder.

    ``private`` goes only to ``create_repo``; extra arguments go only to
    ``self.api.upload_folder``. A missing token falls back to the
    ``HUGGINGFACEHUB_API_TOKEN`` environment variable.
    """
    auth_token = os.environ.get("HUGGINGFACEHUB_API_TOKEN") if token is None else token
    self.create_repo(repo_id, auth_token, private=private)
    return self.api.upload_folder(
        *args,
        repo_id=repo_id,
        folder_path=folder_path,
        token=auth_token,
        repo_type="dataset",
        **kwargs,
    )
ResourceRequest#
class ResourceRequest(
num_cpus: 'int | float | None' = None,
num_gpus: 'int | float | None' = None,
memory_bytes: 'int | float | None' = None
)
ResourceRequest(num_cpus: 'int | float | None' = None, num_gpus: 'int | float | None' = None, memory_bytes: 'int | float | None' = None)
View Source
@dataclasses.dataclass(frozen=True)
class ResourceRequest:
    """Immutable set of optional CPU, GPU and memory requests.

    A ``None`` field is treated as "no request" when taking maxima.
    """

    num_cpus: int | float | None = None
    num_gpus: int | float | None = None
    memory_bytes: int | float | None = None

    @staticmethod
    def max_resources(resource_requests: list[ResourceRequest]) -> ResourceRequest:
        """Gets the maximum of all resources in a list of ResourceRequests as a new ResourceRequest"""
        combined = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None)
        for request in resource_requests:
            combined = combined._max_for_each_resource(request)
        return combined

    def _max_for_each_resource(self, other: ResourceRequest) -> ResourceRequest:
        """Get a new ResourceRequest that consists of the maximum requests for each resource"""
        merged = {}
        for spec in dataclasses.fields(ResourceRequest):
            mine = getattr(self, spec.name)
            theirs = getattr(other, spec.name)
            if mine is None:
                merged[spec.name] = theirs
            elif theirs is None:
                merged[spec.name] = mine
            else:
                merged[spec.name] = max(mine, theirs)
        return ResourceRequest(**merged)

    def __add__(self, other: ResourceRequest) -> ResourceRequest:
        """Add two requests field by field using ``add_optional_numeric``."""
        cpus = add_optional_numeric(self.num_cpus, other.num_cpus)
        gpus = add_optional_numeric(self.num_gpus, other.num_gpus)
        memory = add_optional_numeric(self.memory_bytes, other.memory_bytes)
        return ResourceRequest(num_cpus=cpus, num_gpus=gpus, memory_bytes=memory)
Class variables#
memory_bytes
num_cpus
num_gpus
Static methods#
max_resources#
def max_resources(
resource_requests: 'list[ResourceRequest]'
) -> 'ResourceRequest'
Gets the maximum of all resources in a list of ResourceRequests as a new ResourceRequest
View Source
@staticmethod
def max_resources(resource_requests: list[ResourceRequest]) -> ResourceRequest:
    """Gets the maximum of all resources in a list of ResourceRequests as a new ResourceRequest"""
    combined = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None)
    for request in resource_requests:
        combined = combined._max_for_each_resource(request)
    return combined