Skip to content

Module vexpresso.utils#

View Source
from __future__ import annotations

import inspect

import os

from collections.abc import Iterable

from dataclasses import dataclass, field

from functools import reduce, wraps

from typing import Any, Callable, Dict, List, Optional, Tuple

import daft

import numpy as np

# Module-level aliases for Daft classes. DataType is used below as the
# annotation/default for the UDF return dtype of wrapped transformations.
ResourceRequest = daft.resource_request.ResourceRequest

DataType = daft.datatype.DataType

def get_batch_size(embeddings: Iterable[Any]) -> int:
    """Return the number of items held by ``embeddings``.

    A one-dimensional NumPy array is treated as a single item and yields 1;
    every other input is measured with ``len``.
    """
    is_flat_array = isinstance(embeddings, np.ndarray) and embeddings.ndim == 1
    return 1 if is_flat_array else len(embeddings)

# LANGCHAIN

@dataclass
class Document:
    """A piece of text bundled with its metadata."""

    # Text content of the document.
    page_content: str
    # Free-form metadata; the factory gives every instance its own dict.
    metadata: dict = field(default_factory=dict)

def batchify_args(args, batch_size):
    """Broadcast or validate ``args`` so that it has one entry per batch item.

    Args:
        args: Either a single value (strings count as single values), which is
            repeated ``batch_size`` times, or an iterable that must contain
            exactly ``batch_size`` items.
        batch_size: Number of items expected in the batch.

    Returns:
        ``args`` itself when it is a sized iterable of the right length, a
        list built from it when it is an iterable without ``len`` (such as a
        generator), or a new list holding ``args`` ``batch_size`` times.

    Raises:
        ValueError: If an iterable ``args`` does not hold ``batch_size`` items.
    """
    if isinstance(args, Iterable) and not isinstance(args, str):
        if not hasattr(args, "__len__"):
            # Generators and other lazy iterables cannot be measured with
            # ``len``; materialise them so the size check below works.
            args = list(args)
        if len(args) != batch_size:
            raise ValueError("ARG needs to be size batch size")
        return args
    # Scalar (or string): repeat the same object once per batch item.
    return [args] * batch_size

def lazy(default: bool = True):
    """Decorator factory that adds a ``lazy`` keyword to the wrapped function.

    The wrapper forwards all other arguments to the wrapped function. When it
    is called with a falsy ``lazy``, ``.execute()`` is invoked on the result,
    or on every element if the result is a list.

    Args:
        default: Value of ``lazy`` when the caller does not pass it.
    """

    def dec(func):
        @wraps(func)
        def wrapper(*args, lazy=default, **kwargs):
            result = func(*args, **kwargs)
            if lazy:
                # Hand the result back untouched.
                return result
            if isinstance(result, list):
                return [item.execute() for item in result]
            return result.execute()

        return wrapper

    return dec

def convert_args(*args):
    """Return the positional arguments as a list, with Daft Series unpacked.

    Every ``daft.series.Series`` is replaced by the result of its
    ``to_pylist()``; all other values are passed through unchanged.
    """
    converted = []
    for arg in args:
        if isinstance(arg, daft.series.Series):
            arg = arg.to_pylist()
        converted.append(arg)
    return converted

def convert_kwargs(**kwargs):
    """Return the keyword arguments as a dict, with Daft Series unpacked.

    Every ``daft.series.Series`` value is replaced by the result of its
    ``to_pylist()``; all other values are passed through unchanged.
    """
    converted = {}
    for name, value in kwargs.items():
        if isinstance(value, daft.series.Series):
            value = value.to_pylist()
        converted[name] = value
    return converted

# TODO: CHANGE TO ENUM

def transform_wrapper(
    original_transform: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs: Dict[str, Any] = {},
    function: str = "__call__",
):
    """Wrap a function, class or instance as a Daft UDF.

    Daft Series passed to the UDF are converted to Python lists (see
    ``convert_args`` / ``convert_kwargs``) before the wrapped callable runs.
    The returned UDF carries a ``__vexpresso_transform`` attribute set to
    ``True``.

    Args:
        original_transform: A plain function, a class (instantiated with
            ``init_kwargs`` when the UDF object is created), or an existing
            instance. When ``None``, a decorator is returned so the wrapper
            can be used as ``@transform_wrapper(datatype=...)``.
        datatype: Return dtype handed to ``daft.udf``.
        init_kwargs: Keyword arguments used to instantiate a class transform.
            It is only read here, never mutated.
        function: Name of the method invoked on class/instance transforms.

    Returns:
        The object produced by ``daft.udf``, or a decorator producing it when
        ``original_transform`` is ``None``.
    """
    if original_transform is None:
        # Configuration-only call: previously this crashed with an
        # AttributeError on ``NoneType.__call__``. Defer wrapping until the
        # actual transform is supplied.
        def _configured(target: Transformation):
            return transform_wrapper(
                target,
                datatype=datatype,
                init_kwargs=init_kwargs,
                function=function,
            )

        return _configured

    if inspect.isfunction(original_transform):

        @wraps(original_transform)
        def wrapped(*args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            return original_transform(*args, **kwargs)

        # Expose the original signature on the wrapper, which itself only
        # declares *args/**kwargs.
        wrapped.__signature__ = inspect.signature(original_transform)
        _udf = daft.udf(return_dtype=datatype)(wrapped)
        _udf.__vexpresso_transform = True
        return _udf

    # Class or instance: take the signature of the target method from the class.
    if isinstance(original_transform, type):
        sig = inspect.signature(getattr(original_transform, function))
    else:
        sig = inspect.signature(getattr(original_transform.__class__, function))

    class _Transformation:
        def __init__(self):
            if isinstance(original_transform, type):
                # hasn't been initialized yet
                self._transform = original_transform(**init_kwargs)
            else:
                self._transform = original_transform

        def __call__(self, *args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            return getattr(self._transform, function)(*args, **kwargs)

    _Transformation.__call__.__signature__ = sig
    _udf = daft.udf(return_dtype=datatype)(_Transformation)
    _udf.__vexpresso_transform = True
    return _udf

def transformation(
    original_function: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs={},
    function: str = "__call__",
):
    """Wrap ``original_function`` with ``transform_wrapper`` unless already wrapped.

    An object whose ``__vexpresso_transform`` attribute is present and not
    ``None`` is returned unchanged; anything else is passed to
    ``transform_wrapper`` together with ``datatype``, ``init_kwargs`` and
    ``function``.
    """
    marker = getattr(original_function, "__vexpresso_transform", None)
    if marker is not None:
        # Already carries the marker: hand it back as-is.
        return original_function
    return transform_wrapper(
        original_function,
        datatype=datatype,
        init_kwargs=init_kwargs,
        function=function,
    )

def get_field_name_and_key(field: str, column_names: List[str] = []) -> Tuple[str, str]:
    """Split a dotted field path into a field name and the remaining key path.

    If ``field`` is itself listed in ``column_names`` it is returned whole
    with ``None`` as the key. Otherwise the text before the first ``.`` is
    the field name and everything after it is the key path (``None`` when
    ``field`` contains no dot).
    """
    if field in column_names:
        return field, None
    head, dot, rest = field.partition(".")
    return head, (rest if dot else None)

def deep_get(dictionary, keys=None, default=None):
    """Look up a dot-separated key path inside nested dictionaries.

    Args:
        dictionary: Object to search. It is returned unchanged when ``keys``
            is ``None`` or when it is not a ``dict``.
        keys: Dot-separated path such as ``"a.b.c"``.
        default: Value substituted whenever a key is missing or an
            intermediate value is not a ``dict``.
    """
    if keys is None or not isinstance(dictionary, dict):
        return dictionary
    current = dictionary
    # Walk every path segment; no early exit, since ``default`` may itself
    # be a dict that later segments are looked up in.
    for key in keys.split("."):
        if isinstance(current, dict):
            current = current.get(key, default)
        else:
            current = default
    return current

class HFHubHelper:
    """Helper around ``huggingface_hub`` for dataset repositories.

    Every method falls back to the ``HUGGINGFACEHUB_API_TOKEN`` environment
    variable when no token is passed, and always uses ``repo_type="dataset"``.
    """

    def __init__(self):
        """Import ``huggingface_hub`` lazily and create an ``HfApi`` client.

        Raises:
            ImportError: If ``huggingface_hub`` is not installed.
        """
        try:
            import huggingface_hub  # noqa
        except ImportError as err:
            # The two sentences were previously concatenated without a space.
            raise ImportError(
                "Could not import huggingface_hub python package. "
                "Please install it with `pip install huggingface_hub`."
            ) from err
        self._hf_hub = huggingface_hub
        self.api = self._hf_hub.HfApi()

    @staticmethod
    def _resolve_token(token: Optional[str]) -> Optional[str]:
        """Return ``token``, or the HUGGINGFACEHUB_API_TOKEN env var when it is None."""
        if token is None:
            return os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        return token

    def create_repo(
        self, repo_id: str, token: Optional[str] = None, *args, **kwargs
    ) -> str:
        """Create the dataset repo ``repo_id``; an existing repo is accepted.

        Extra ``*args`` / ``**kwargs`` are forwarded to ``HfApi.create_repo``
        and its return value is passed back to the caller.
        """
        return self.api.create_repo(
            *args,
            repo_id=repo_id,
            token=self._resolve_token(token),
            exist_ok=True,
            repo_type="dataset",
            **kwargs,
        )

    def upload(
        self,
        repo_id: str,
        folder_path: str,
        token: Optional[str] = None,
        private: bool = True,
        *args,
        **kwargs,
    ) -> str:
        """Ensure the dataset repo exists, then upload ``folder_path`` to it.

        ``private`` is only passed to the repo-creation call. Extra ``*args``
        / ``**kwargs`` are forwarded to ``HfApi.upload_folder``, whose return
        value is passed back to the caller.
        """
        token = self._resolve_token(token)
        self.create_repo(repo_id, token, private=private)
        return self.api.upload_folder(
            *args,
            repo_id=repo_id,
            folder_path=folder_path,
            token=token,
            repo_type="dataset",
            **kwargs,
        )

    def download(
        self,
        repo_id: str,
        token: Optional[str] = None,
        local_dir: Optional[str] = None,
        *args,
        **kwargs,
    ) -> str:
        """Download a snapshot of the dataset repo ``repo_id``.

        Extra ``*args`` / ``**kwargs`` are forwarded to
        ``huggingface_hub.snapshot_download``, whose return value is passed
        back to the caller.
        """
        return self._hf_hub.snapshot_download(
            *args,
            repo_id=repo_id,
            token=self._resolve_token(token),
            local_dir=local_dir,
            repo_type="dataset",
            **kwargs,
        )

# Type alias used in the annotations of transform_wrapper/transformation:
# a callable taking a list plus one further argument and returning a list.
# It is defined after its first use, which works because annotations are
# lazy under ``from __future__ import annotations``.
Transformation = Callable[[List[Any], Any], List[Any]]

Variables#

Transformation

Functions#

batchify_args#

def batchify_args(
    args,
    batch_size
)
View Source
def batchify_args(args, batch_size):
    """Broadcast or validate ``args`` so that it has one entry per batch item.

    Args:
        args: Either a single value (strings count as single values), which is
            repeated ``batch_size`` times, or an iterable that must contain
            exactly ``batch_size`` items.
        batch_size: Number of items expected in the batch.

    Returns:
        ``args`` itself when it is a sized iterable of the right length, a
        list built from it when it is an iterable without ``len`` (such as a
        generator), or a new list holding ``args`` ``batch_size`` times.

    Raises:
        ValueError: If an iterable ``args`` does not hold ``batch_size`` items.
    """
    if isinstance(args, Iterable) and not isinstance(args, str):
        if not hasattr(args, "__len__"):
            # Generators and other lazy iterables cannot be measured with
            # ``len``; materialise them so the size check below works.
            args = list(args)
        if len(args) != batch_size:
            raise ValueError("ARG needs to be size batch size")
        return args
    # Scalar (or string): repeat the same object once per batch item.
    return [args] * batch_size

convert_args#

def convert_args(
    *args
)
View Source
def convert_args(*args):
    """Return the positional arguments as a list, with Daft Series unpacked.

    Every ``daft.series.Series`` is replaced by the result of its
    ``to_pylist()``; all other values are passed through unchanged.
    """
    converted = []
    for arg in args:
        if isinstance(arg, daft.series.Series):
            arg = arg.to_pylist()
        converted.append(arg)
    return converted

convert_kwargs#

def convert_kwargs(
    **kwargs
)
View Source
def convert_kwargs(**kwargs):
    """Return the keyword arguments as a dict, with Daft Series unpacked.

    Every ``daft.series.Series`` value is replaced by the result of its
    ``to_pylist()``; all other values are passed through unchanged.
    """
    converted = {}
    for name, value in kwargs.items():
        if isinstance(value, daft.series.Series):
            value = value.to_pylist()
        converted[name] = value
    return converted

deep_get#

def deep_get(
    dictionary,
    keys=None,
    default=None
)
View Source
def deep_get(dictionary, keys=None, default=None):
    """Look up a dot-separated key path inside nested dictionaries.

    Args:
        dictionary: Object to search. It is returned unchanged when ``keys``
            is ``None`` or when it is not a ``dict``.
        keys: Dot-separated path such as ``"a.b.c"``.
        default: Value substituted whenever a key is missing or an
            intermediate value is not a ``dict``.
    """
    if keys is None or not isinstance(dictionary, dict):
        return dictionary
    current = dictionary
    # Walk every path segment; no early exit, since ``default`` may itself
    # be a dict that later segments are looked up in.
    for key in keys.split("."):
        if isinstance(current, dict):
            current = current.get(key, default)
        else:
            current = default
    return current

get_batch_size#

def get_batch_size(
    embeddings: 'Iterable[Any]'
) -> 'int'
View Source
def get_batch_size(embeddings: Iterable[Any]) -> int:
    """Return the number of items held by ``embeddings``.

    A one-dimensional NumPy array is treated as a single item and yields 1;
    every other input is measured with ``len``.
    """
    is_flat_array = isinstance(embeddings, np.ndarray) and embeddings.ndim == 1
    return 1 if is_flat_array else len(embeddings)

get_field_name_and_key#

def get_field_name_and_key(
    field: 'str',
    column_names: 'List[str]' = []
) -> 'Tuple[str, str]'
View Source
def get_field_name_and_key(field: str, column_names: List[str] = []) -> Tuple[str, str]:
    """Split a dotted field path into a field name and the remaining key path.

    If ``field`` is itself listed in ``column_names`` it is returned whole
    with ``None`` as the key. Otherwise the text before the first ``.`` is
    the field name and everything after it is the key path (``None`` when
    ``field`` contains no dot).
    """
    if field in column_names:
        return field, None
    head, dot, rest = field.partition(".")
    return head, (rest if dot else None)

lazy#

def lazy(
    default: 'bool' = True
)
View Source
def lazy(default: bool = True):
    """Decorator factory that adds a ``lazy`` keyword to the wrapped function.

    The wrapper forwards all other arguments to the wrapped function. When it
    is called with a falsy ``lazy``, ``.execute()`` is invoked on the result,
    or on every element if the result is a list.

    Args:
        default: Value of ``lazy`` when the caller does not pass it.
    """

    def dec(func):
        @wraps(func)
        def wrapper(*args, lazy=default, **kwargs):
            result = func(*args, **kwargs)
            if lazy:
                # Hand the result back untouched.
                return result
            if isinstance(result, list):
                return [item.execute() for item in result]
            return result.execute()

        return wrapper

    return dec

transform_wrapper#

def transform_wrapper(
    original_transform: 'Transformation' = None,
    datatype: 'DataType' = Python,
    init_kwargs: 'Dict[str, Any]' = {},
    function: 'str' = '__call__'
)
View Source
def transform_wrapper(
    original_transform: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs: Dict[str, Any] = {},
    function: str = "__call__",
):
    """Wrap a function, class or instance as a Daft UDF.

    Daft Series passed to the UDF are converted to Python lists (see
    ``convert_args`` / ``convert_kwargs``) before the wrapped callable runs.
    The returned UDF carries a ``__vexpresso_transform`` attribute set to
    ``True``.

    Args:
        original_transform: A plain function, a class (instantiated with
            ``init_kwargs`` when the UDF object is created), or an existing
            instance. When ``None``, a decorator is returned so the wrapper
            can be used as ``@transform_wrapper(datatype=...)``.
        datatype: Return dtype handed to ``daft.udf``.
        init_kwargs: Keyword arguments used to instantiate a class transform.
            It is only read here, never mutated.
        function: Name of the method invoked on class/instance transforms.

    Returns:
        The object produced by ``daft.udf``, or a decorator producing it when
        ``original_transform`` is ``None``.
    """
    if original_transform is None:
        # Configuration-only call: previously this crashed with an
        # AttributeError on ``NoneType.__call__``. Defer wrapping until the
        # actual transform is supplied.
        def _configured(target: Transformation):
            return transform_wrapper(
                target,
                datatype=datatype,
                init_kwargs=init_kwargs,
                function=function,
            )

        return _configured

    if inspect.isfunction(original_transform):

        @wraps(original_transform)
        def wrapped(*args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            return original_transform(*args, **kwargs)

        # Expose the original signature on the wrapper, which itself only
        # declares *args/**kwargs.
        wrapped.__signature__ = inspect.signature(original_transform)
        _udf = daft.udf(return_dtype=datatype)(wrapped)
        _udf.__vexpresso_transform = True
        return _udf

    # Class or instance: take the signature of the target method from the class.
    if isinstance(original_transform, type):
        sig = inspect.signature(getattr(original_transform, function))
    else:
        sig = inspect.signature(getattr(original_transform.__class__, function))

    class _Transformation:
        def __init__(self):
            if isinstance(original_transform, type):
                # hasn't been initialized yet
                self._transform = original_transform(**init_kwargs)
            else:
                self._transform = original_transform

        def __call__(self, *args, **kwargs):
            args = convert_args(*args)
            kwargs = convert_kwargs(**kwargs)
            return getattr(self._transform, function)(*args, **kwargs)

    _Transformation.__call__.__signature__ = sig
    _udf = daft.udf(return_dtype=datatype)(_Transformation)
    _udf.__vexpresso_transform = True
    return _udf

transformation#

def transformation(
    original_function: 'Transformation' = None,
    datatype: 'DataType' = Python,
    init_kwargs={},
    function: 'str' = '__call__'
)
View Source
def transformation(
    original_function: Transformation = None,
    datatype: DataType = DataType.python(),
    init_kwargs={},
    function: str = "__call__",
):
    """Wrap ``original_function`` with ``transform_wrapper`` unless already wrapped.

    An object whose ``__vexpresso_transform`` attribute is present and not
    ``None`` is returned unchanged; anything else is passed to
    ``transform_wrapper`` together with ``datatype``, ``init_kwargs`` and
    ``function``.
    """
    marker = getattr(original_function, "__vexpresso_transform", None)
    if marker is not None:
        # Already carries the marker: hand it back as-is.
        return original_function
    return transform_wrapper(
        original_function,
        datatype=datatype,
        init_kwargs=init_kwargs,
        function=function,
    )

Classes#

DataType#

class DataType(

)

A Daft DataType defines the type of all the values in an Expression or DataFrame column

View Source
class DataType:
    """A Daft DataType defines the type of all the values in an Expression or DataFrame column"""

    # Underlying native datatype object that every method delegates to.
    _dtype: PyDataType

    def __init__(self) -> None:
        raise NotImplementedError(
            "We do not support creating a DataType via __init__ "
            "use a creator method like DataType.int32() or use DataType.from_arrow_type(pa_type)"
        )

    @staticmethod
    def _from_pydatatype(pydt: PyDataType) -> DataType:
        """Wrap a ``PyDataType`` in a ``DataType`` without calling ``__init__``."""
        dt = DataType.__new__(DataType)
        dt._dtype = pydt
        return dt

    @classmethod
    def int8(cls) -> DataType:
        """Create an 8-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int8())

    @classmethod
    def int16(cls) -> DataType:
        """Create a 16-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int16())

    @classmethod
    def int32(cls) -> DataType:
        """Create a 32-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int32())

    @classmethod
    def int64(cls) -> DataType:
        """Create a 64-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.int64())

    @classmethod
    def uint8(cls) -> DataType:
        """Create an unsigned 8-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint8())

    @classmethod
    def uint16(cls) -> DataType:
        """Create an unsigned 16-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint16())

    @classmethod
    def uint32(cls) -> DataType:
        """Create an unsigned 32-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint32())

    @classmethod
    def uint64(cls) -> DataType:
        """Create an unsigned 64-bit integer DataType"""
        return cls._from_pydatatype(PyDataType.uint64())

    @classmethod
    def float32(cls) -> DataType:
        """Create a 32-bit float DataType"""
        return cls._from_pydatatype(PyDataType.float32())

    @classmethod
    def float64(cls) -> DataType:
        """Create a 64-bit float DataType"""
        return cls._from_pydatatype(PyDataType.float64())

    @classmethod
    def string(cls) -> DataType:
        """Create a String DataType: A string of UTF8 characters"""
        return cls._from_pydatatype(PyDataType.string())

    @classmethod
    def bool(cls) -> DataType:
        """Create the Boolean DataType: Either ``True`` or ``False``"""
        return cls._from_pydatatype(PyDataType.bool())

    @classmethod
    def binary(cls) -> DataType:
        """Create a Binary DataType: A string of bytes"""
        return cls._from_pydatatype(PyDataType.binary())

    @classmethod
    def null(cls) -> DataType:
        """Creates the Null DataType: Always the ``Null`` value"""
        return cls._from_pydatatype(PyDataType.null())

    @classmethod
    def date(cls) -> DataType:
        """Create a Date DataType: A date with a year, month and day"""
        return cls._from_pydatatype(PyDataType.date())

    @classmethod
    def list(cls, name: str, dtype: DataType) -> DataType:
        """Create a List DataType: Variable-length list, where each element in the list has type ``dtype``

        Args:
            name: name of the element field (``from_arrow_type`` passes the Arrow value field's name)
            dtype: DataType of each element in the list
        """
        return cls._from_pydatatype(PyDataType.list(name, dtype._dtype))

    @classmethod
    def fixed_size_list(cls, name: str, dtype: DataType, size: int) -> DataType:
        """Create a FixedSizeList DataType: Fixed-size list, where each element in the list has type ``dtype``
        and each list has length ``size``.

        Args:
            name: name of the element field (``from_arrow_type`` passes the Arrow value field's name)
            dtype: DataType of each element in the list
            size: length of each list

        Raises:
            ValueError: if ``size`` is not a positive integer
        """
        if not isinstance(size, int) or size <= 0:
            # Interpolate the value: passing it as a second exception argument
            # made the message render as a tuple.
            raise ValueError(f"The size for a fixed-size list must be a positive integer, but got: {size}")
        return cls._from_pydatatype(PyDataType.fixed_size_list(name, dtype._dtype, size))

    @classmethod
    def struct(cls, fields: dict[str, DataType]) -> DataType:
        """Create a Struct DataType: a nested type which has names mapped to child types

        Args:
            fields: Nested fields of the Struct
        """
        return cls._from_pydatatype(PyDataType.struct({name: datatype._dtype for name, datatype in fields.items()}))

    @classmethod
    def extension(cls, name: str, storage_dtype: DataType, metadata: str | None = None) -> DataType:
        """Create an Extension DataType from a name, a storage DataType and optional metadata"""
        return cls._from_pydatatype(PyDataType.extension(name, storage_dtype._dtype, metadata))

    @classmethod
    def embedding(cls, name: str, dtype: DataType, size: int) -> DataType:
        """Create an Embedding DataType: embeddings are fixed size arrays, where each element
        in the array has a **numeric** ``dtype`` and each array has a fixed length of ``size``.

        Args:
            name: name forwarded to the underlying embedding type
            dtype: DataType of each element in the list (must be numeric)
            size: length of each list

        Raises:
            ValueError: if ``size`` is not a positive integer
        """
        if not isinstance(size, int) or size <= 0:
            raise ValueError(f"The size for a embedding must be a positive integer, but got: {size}")
        return cls._from_pydatatype(PyDataType.embedding(name, dtype._dtype, size))

    @classmethod
    def image(
        cls, mode: str | ImageMode | None = None, height: int | None = None, width: int | None = None
    ) -> DataType:
        """Create an Image DataType.

        Args:
            mode: image mode, either an ``ImageMode`` or a mode string that is converted to one
            height: image height; must be given together with ``width``
            width: image width; must be given together with ``height``

        Raises:
            ValueError: if only one of ``height``/``width`` is given, or either is not a positive integer
        """
        if isinstance(mode, str):
            mode = ImageMode.from_mode_string(mode)
        if height is not None and width is not None:
            if not isinstance(height, int) or height <= 0:
                raise ValueError(f"Image height must be a positive integer, but got: {height}")
            if not isinstance(width, int) or width <= 0:
                raise ValueError(f"Image width must be a positive integer, but got: {width}")
        elif height is not None or width is not None:
            raise ValueError(
                f"Image height and width must either both be specified, or both not be specified, but got height={height}, width={width}"
            )
        return cls._from_pydatatype(PyDataType.image(mode, height, width))

    @classmethod
    def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
        """Maps a PyArrow DataType to a Daft DataType"""
        if pa.types.is_int8(arrow_type):
            return cls.int8()
        elif pa.types.is_int16(arrow_type):
            return cls.int16()
        elif pa.types.is_int32(arrow_type):
            return cls.int32()
        elif pa.types.is_int64(arrow_type):
            return cls.int64()
        elif pa.types.is_uint8(arrow_type):
            return cls.uint8()
        elif pa.types.is_uint16(arrow_type):
            return cls.uint16()
        elif pa.types.is_uint32(arrow_type):
            return cls.uint32()
        elif pa.types.is_uint64(arrow_type):
            return cls.uint64()
        elif pa.types.is_float32(arrow_type):
            return cls.float32()
        elif pa.types.is_float64(arrow_type):
            return cls.float64()
        elif pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
            return cls.string()
        elif pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type):
            return cls.binary()
        elif pa.types.is_boolean(arrow_type):
            return cls.bool()
        elif pa.types.is_null(arrow_type):
            return cls.null()
        elif pa.types.is_date32(arrow_type):
            return cls.date()
        elif pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
            assert isinstance(arrow_type, (pa.ListType, pa.LargeListType))
            field = arrow_type.value_field
            return cls.list(field.name, cls.from_arrow_type(field.type))
        elif pa.types.is_fixed_size_list(arrow_type):
            assert isinstance(arrow_type, pa.FixedSizeListType)
            field = arrow_type.value_field
            return cls.fixed_size_list(field.name, cls.from_arrow_type(field.type), arrow_type.list_size)
        elif pa.types.is_struct(arrow_type):
            assert isinstance(arrow_type, pa.StructType)
            fields = [arrow_type[i] for i in range(arrow_type.num_fields)]
            return cls.struct({field.name: cls.from_arrow_type(field.type) for field in fields})
        elif _RAY_DATA_EXTENSIONS_AVAILABLE and isinstance(arrow_type, tuple(_TENSOR_EXTENSION_TYPES)):
            # TODO(Clark): Add a native cross-lang extension type representation for Ray's tensor extension types.
            return cls.python()
        elif isinstance(arrow_type, pa.PyExtensionType):
            # TODO(Clark): Add a native cross-lang extension type representation for PyExtensionTypes.
            raise ValueError(
                "pyarrow extension types that subclass pa.PyExtensionType can't be used in Daft, since they can't be "
                f"used in non-Python Arrow implementations and Daft uses the Rust Arrow2 implementation: {arrow_type}"
            )
        elif isinstance(arrow_type, pa.BaseExtensionType):
            name = arrow_type.extension_name

            if (get_context().runner_config.name == "ray") and (
                type(arrow_type).__reduce__ == pa.BaseExtensionType.__reduce__
            ):
                # Sentences are now separated; they used to run together.
                raise ValueError(
                    f"You are attempting to use a Extension Type: {arrow_type} with the default pyarrow `__reduce__` which breaks pickling for Extensions. "
                    "To fix this, implement your own `__reduce__` on your extension type. "
                    "For more details see this issue: "
                    "https://github.com/apache/arrow/issues/35599"
                )
            try:
                metadata = arrow_type.__arrow_ext_serialize__().decode()
            except AttributeError:
                metadata = None

            return cls.extension(
                name,
                cls.from_arrow_type(arrow_type.storage_type),
                metadata,
            )
        else:
            # Fall back to a Python object type.
            # TODO(Clark): Add native support for remaining Arrow types.
            return cls.python()

    @classmethod
    def from_numpy_dtype(cls, np_type) -> DataType:
        """Maps a Numpy datatype to a Daft DataType"""
        arrow_type = pa.from_numpy_dtype(np_type)
        return cls.from_arrow_type(arrow_type)

    @classmethod
    def python(cls) -> DataType:
        """Create a Python DataType: a type which refers to an arbitrary Python object"""
        return cls._from_pydatatype(PyDataType.python())

    def _is_python_type(self) -> builtins.bool:
        # NOTE: This is currently used in a few places still. We can get rid of it once these are refactored away. To be discussed.
        # 1. Visualizations - we can get rid of it if we do all our repr and repr_html logic in a Series instead of in Python
        # 2. Hypothesis test data generation - we can get rid of it if we allow for creation of Series from a Python list and DataType
        return self == DataType.python()

    def __repr__(self) -> str:
        return self._dtype.__repr__()

    def __eq__(self, other: object) -> builtins.bool:
        return isinstance(other, DataType) and self._dtype.is_equal(other._dtype)

    def __getstate__(self) -> bytes:
        return self._dtype.__getstate__()

    def __setstate__(self, state: bytes) -> None:
        self._dtype = PyDataType.__new__(PyDataType)
        self._dtype.__setstate__(state)

    def __hash__(self) -> int:
        return self._dtype.__hash__()

Static methods#

binary#

def binary(

) -> 'DataType'

Create a Binary DataType: A string of bytes

View Source
    @classmethod
    def binary(cls) -> DataType:
        """Build the Binary DataType, whose values are strings of bytes."""
        native = PyDataType.binary()
        return cls._from_pydatatype(native)

bool#

def bool(

) -> 'DataType'

Create the Boolean DataType: Either True or False

View Source
    @classmethod
    def bool(cls) -> DataType:
        """Build the Boolean DataType, whose values are ``True`` or ``False``."""
        native = PyDataType.bool()
        return cls._from_pydatatype(native)

date#

def date(

) -> 'DataType'

Create a Date DataType: A date with a year, month and day

View Source
    @classmethod
    def date(cls) -> DataType:
        """Build the Date DataType: a date made of a year, month and day."""
        native = PyDataType.date()
        return cls._from_pydatatype(native)

embedding#

def embedding(
    name: 'str',
    dtype: 'DataType',
    size: 'int'
) -> 'DataType'

Create an Embedding DataType: embeddings are fixed size arrays, where each element

in the array has a numeric dtype and each array has a fixed length of size.

Parameters:

Name Type Description Default
dtype None DataType of each element in the list (must be numeric) None
size None length of each list None
View Source
    @classmethod

    def embedding(cls, name: str, dtype: DataType, size: int) -> DataType:

        """Create an Embedding DataType: embeddings are fixed size arrays, where each element

        in the array has a **numeric** ``dtype`` and each array has a fixed length of ``size``.

        Args:

            dtype: DataType of each element in the list (must be numeric)

            size: length of each list

        """

        if not isinstance(size, int) or size <= 0:

            raise ValueError("The size for a embedding must be a positive integer, but got: ", size)

        return cls._from_pydatatype(PyDataType.embedding(name, dtype._dtype, size))

extension#

def extension(
    name: 'str',
    storage_dtype: 'DataType',
    metadata: 'str | None' = None
) -> 'DataType'
View Source
    @classmethod
    def extension(cls, name: str, storage_dtype: DataType, metadata: str | None = None) -> DataType:
        """Build an Extension DataType from a name, a storage DataType and optional metadata."""
        native = PyDataType.extension(name, storage_dtype._dtype, metadata)
        return cls._from_pydatatype(native)

fixed_size_list#

def fixed_size_list(
    name: 'str',
    dtype: 'DataType',
    size: 'int'
) -> 'DataType'

Create a FixedSizeList DataType: Fixed-size list, where each element in the list has type dtype

and each list has length size.

Parameters:

Name Type Description Default
dtype None DataType of each element in the list None
size None length of each list None
View Source
    @classmethod

    def fixed_size_list(cls, name: str, dtype: DataType, size: int) -> DataType:

        """Create a FixedSizeList DataType: Fixed-size list, where each element in the list has type ``dtype``

        and each list has length ``size``.

        Args:

            dtype: DataType of each element in the list

            size: length of each list

        """

        if not isinstance(size, int) or size <= 0:

            raise ValueError("The size for a fixed-size list must be a positive integer, but got: ", size)

        return cls._from_pydatatype(PyDataType.fixed_size_list(name, dtype._dtype, size))

float32#

def float32(

) -> 'DataType'

Create a 32-bit float DataType

View Source
    @classmethod
    def float32(cls) -> DataType:
        """Build the 32-bit float DataType."""
        native = PyDataType.float32()
        return cls._from_pydatatype(native)

float64#

def float64(

) -> 'DataType'

Create a 64-bit float DataType

View Source
    @classmethod
    def float64(cls) -> DataType:
        """Build the 64-bit float DataType."""
        native = PyDataType.float64()
        return cls._from_pydatatype(native)

from_arrow_type#

def from_arrow_type(
    arrow_type: 'pa.lib.DataType'
) -> 'DataType'

Maps a PyArrow DataType to a Daft DataType

View Source
    @classmethod
    def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
        """Maps a PyArrow DataType to a Daft DataType

        Primitive, string, binary, boolean, null and date32 types map to the
        matching Daft type; list, fixed-size list and struct types are mapped
        recursively. Extension types become Daft extension types, and any
        type not handled explicitly falls back to the Python object type.

        Raises:
            ValueError: for ``pa.PyExtensionType`` subclasses, and for
                extension types using the default pyarrow ``__reduce__``
                when the runner is ``"ray"``.
        """
        if pa.types.is_int8(arrow_type):
            return cls.int8()
        elif pa.types.is_int16(arrow_type):
            return cls.int16()
        elif pa.types.is_int32(arrow_type):
            return cls.int32()
        elif pa.types.is_int64(arrow_type):
            return cls.int64()
        elif pa.types.is_uint8(arrow_type):
            return cls.uint8()
        elif pa.types.is_uint16(arrow_type):
            return cls.uint16()
        elif pa.types.is_uint32(arrow_type):
            return cls.uint32()
        elif pa.types.is_uint64(arrow_type):
            return cls.uint64()
        elif pa.types.is_float32(arrow_type):
            return cls.float32()
        elif pa.types.is_float64(arrow_type):
            return cls.float64()
        elif pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
            return cls.string()
        elif pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type):
            return cls.binary()
        elif pa.types.is_boolean(arrow_type):
            return cls.bool()
        elif pa.types.is_null(arrow_type):
            return cls.null()
        elif pa.types.is_date32(arrow_type):
            return cls.date()
        elif pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
            assert isinstance(arrow_type, (pa.ListType, pa.LargeListType))
            field = arrow_type.value_field
            return cls.list(field.name, cls.from_arrow_type(field.type))
        elif pa.types.is_fixed_size_list(arrow_type):
            assert isinstance(arrow_type, pa.FixedSizeListType)
            field = arrow_type.value_field
            return cls.fixed_size_list(field.name, cls.from_arrow_type(field.type), arrow_type.list_size)
        elif pa.types.is_struct(arrow_type):
            assert isinstance(arrow_type, pa.StructType)
            fields = [arrow_type[i] for i in range(arrow_type.num_fields)]
            return cls.struct({field.name: cls.from_arrow_type(field.type) for field in fields})
        elif _RAY_DATA_EXTENSIONS_AVAILABLE and isinstance(arrow_type, tuple(_TENSOR_EXTENSION_TYPES)):
            # TODO(Clark): Add a native cross-lang extension type representation for Ray's tensor extension types.
            return cls.python()
        elif isinstance(arrow_type, pa.PyExtensionType):
            # TODO(Clark): Add a native cross-lang extension type representation for PyExtensionTypes.
            raise ValueError(
                "pyarrow extension types that subclass pa.PyExtensionType can't be used in Daft, since they can't be "
                f"used in non-Python Arrow implementations and Daft uses the Rust Arrow2 implementation: {arrow_type}"
            )
        elif isinstance(arrow_type, pa.BaseExtensionType):
            name = arrow_type.extension_name

            if (get_context().runner_config.name == "ray") and (
                type(arrow_type).__reduce__ == pa.BaseExtensionType.__reduce__
            ):
                # Sentences are now separated; they used to run together.
                raise ValueError(
                    f"You are attempting to use a Extension Type: {arrow_type} with the default pyarrow `__reduce__` which breaks pickling for Extensions. "
                    "To fix this, implement your own `__reduce__` on your extension type. "
                    "For more details see this issue: "
                    "https://github.com/apache/arrow/issues/35599"
                )
            try:
                metadata = arrow_type.__arrow_ext_serialize__().decode()
            except AttributeError:
                # No serialisation hook on this type: store no metadata.
                metadata = None

            return cls.extension(
                name,
                cls.from_arrow_type(arrow_type.storage_type),
                metadata,
            )
        else:
            # Fall back to a Python object type.
            # TODO(Clark): Add native support for remaining Arrow types.
            return cls.python()

from_numpy_dtype#

def from_numpy_dtype(
    np_type
) -> 'DataType'

Maps a Numpy datatype to a Daft DataType

View Source
    @classmethod
    def from_numpy_dtype(cls, np_type) -> DataType:
        """Convert a NumPy dtype to a Daft DataType by way of its PyArrow equivalent."""
        return cls.from_arrow_type(pa.from_numpy_dtype(np_type))

image#

def image(
    mode: 'str | ImageMode | None' = None,
    height: 'int | None' = None,
    width: 'int | None' = None
) -> 'DataType'
View Source
    @classmethod
    def image(
        cls, mode: str | ImageMode | None = None, height: int | None = None, width: int | None = None
    ) -> DataType:
        """Create an Image DataType, optionally constrained to a mode and a fixed shape.

        Args:
            mode: Image mode, either an ``ImageMode`` or a mode string that is
                converted with ``ImageMode.from_mode_string``.
            height: Fixed image height; must be given together with ``width``.
            width: Fixed image width; must be given together with ``height``.

        Raises:
            ValueError: If only one of ``height`` / ``width`` is given, or if
                either of them is not a positive integer.
        """
        if isinstance(mode, str):
            mode = ImageMode.from_mode_string(mode)

        if height is not None and width is not None:
            # Format the offending value into the message itself: passing it as
            # a second exception argument makes the error render as a tuple.
            if not isinstance(height, int) or height <= 0:
                raise ValueError(f"Image height must be a positive integer, but got: {height}")
            if not isinstance(width, int) or width <= 0:
                raise ValueError(f"Image width must be a positive integer, but got: {width}")
        elif height is not None or width is not None:
            # Exactly one of the two dimensions was supplied.
            raise ValueError(
                f"Image height and width must either both be specified, or both not be specified, but got height={height}, width={width}"
            )

        return cls._from_pydatatype(PyDataType.image(mode, height, width))

int16#

def int16(

) -> 'DataType'

Create a 16-bit integer DataType

View Source
    @classmethod
    def int16(cls) -> DataType:
        """Create a 16-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.int16()``.
        """
        py_dtype = PyDataType.int16()
        return cls._from_pydatatype(py_dtype)

int32#

def int32(

) -> 'DataType'

Create a 32-bit integer DataType

View Source
    @classmethod
    def int32(cls) -> DataType:
        """Create a 32-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.int32()``.
        """
        py_dtype = PyDataType.int32()
        return cls._from_pydatatype(py_dtype)

int64#

def int64(

) -> 'DataType'

Create a 64-bit integer DataType

View Source
    @classmethod
    def int64(cls) -> DataType:
        """Create a 64-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.int64()``.
        """
        py_dtype = PyDataType.int64()
        return cls._from_pydatatype(py_dtype)

int8#

def int8(

) -> 'DataType'

Create an 8-bit integer DataType

View Source
    @classmethod
    def int8(cls) -> DataType:
        """Create an 8-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.int8()``.
        """
        py_dtype = PyDataType.int8()
        return cls._from_pydatatype(py_dtype)

list#

def list(
    name: 'str',
    dtype: 'DataType'
) -> 'DataType'

Create a List DataType: Variable-length list, where each element in the list has type dtype

Parameters:

Name Type Description Default
dtype None DataType of each element in the list None
View Source
    @classmethod
    def list(cls, name: str, dtype: DataType) -> DataType:
        """Create a List DataType: a variable-length list whose elements all have type ``dtype``

        Args:
            name: Name forwarded to ``PyDataType.list`` together with the
                element type (presumably the name of the list's child
                field -- TODO confirm)
            dtype: DataType of each element in the list
        """
        # Unwrap the element type before handing it to PyDataType.
        element_dtype = dtype._dtype
        return cls._from_pydatatype(PyDataType.list(name, element_dtype))

null#

def null(

) -> 'DataType'

Creates the Null DataType: Always the Null value

View Source
    @classmethod
    def null(cls) -> DataType:
        """Create the Null DataType, whose only value is ``Null``.

        Returns:
            A DataType wrapping ``PyDataType.null()``.
        """
        py_dtype = PyDataType.null()
        return cls._from_pydatatype(py_dtype)

python#

def python(

) -> 'DataType'

Create a Python DataType: a type which refers to an arbitrary Python object

View Source
    @classmethod
    def python(cls) -> DataType:
        """Create a Python DataType: a type that refers to an arbitrary Python object.

        Returns:
            A DataType wrapping ``PyDataType.python()``.
        """
        py_dtype = PyDataType.python()
        return cls._from_pydatatype(py_dtype)

string#

def string(

) -> 'DataType'

Create a String DataType: A string of UTF8 characters

View Source
    @classmethod
    def string(cls) -> DataType:
        """Create a String DataType: a string of UTF8 characters.

        Returns:
            A DataType wrapping ``PyDataType.string()``.
        """
        py_dtype = PyDataType.string()
        return cls._from_pydatatype(py_dtype)

struct#

def struct(
    fields: 'dict[str, DataType]'
) -> 'DataType'

Create a Struct DataType: a nested type which has names mapped to child types

Parameters:

Name Type Description Default
fields None Nested fields of the Struct None
View Source
    @classmethod
    def struct(cls, fields: dict[str, DataType]) -> DataType:
        """Create a Struct DataType: a nested type that maps field names to child types

        Args:
            fields: Nested fields of the Struct, keyed by field name
        """
        # Unwrap every child DataType before handing the mapping to PyDataType.
        py_fields = {}
        for field_name, field_dtype in fields.items():
            py_fields[field_name] = field_dtype._dtype
        return cls._from_pydatatype(PyDataType.struct(py_fields))

uint16#

def uint16(

) -> 'DataType'

Create an unsigned 16-bit integer DataType

View Source
    @classmethod
    def uint16(cls) -> DataType:
        """Create an unsigned 16-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.uint16()``.
        """
        py_dtype = PyDataType.uint16()
        return cls._from_pydatatype(py_dtype)

uint32#

def uint32(

) -> 'DataType'

Create an unsigned 32-bit integer DataType

View Source
    @classmethod
    def uint32(cls) -> DataType:
        """Create an unsigned 32-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.uint32()``.
        """
        py_dtype = PyDataType.uint32()
        return cls._from_pydatatype(py_dtype)

uint64#

def uint64(

) -> 'DataType'

Create an unsigned 64-bit integer DataType

View Source
    @classmethod
    def uint64(cls) -> DataType:
        """Create an unsigned 64-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.uint64()``.
        """
        py_dtype = PyDataType.uint64()
        return cls._from_pydatatype(py_dtype)

uint8#

def uint8(

) -> 'DataType'

Create an unsigned 8-bit integer DataType

View Source
    @classmethod
    def uint8(cls) -> DataType:
        """Create an unsigned 8-bit integer DataType.

        Returns:
            A DataType wrapping ``PyDataType.uint8()``.
        """
        py_dtype = PyDataType.uint8()
        return cls._from_pydatatype(py_dtype)

Document#

class Document(
    page_content: 'str',
    metadata: 'dict' = <factory>
)

Interface for interacting with a document.

View Source
@dataclass

class Document:

    """Interface for interacting with a document.

    Pairs a piece of text with an optional metadata mapping.
    """

    # Text content of the document (required).
    page_content: str

    # Metadata attached to the document; ``default_factory=dict`` gives every
    # instance its own empty dict instead of a shared mutable default.
    metadata: dict = field(default_factory=dict)

HFHubHelper#

class HFHubHelper(

)
View Source
class HFHubHelper:
    """Thin wrapper around ``huggingface_hub`` for dataset repositories.

    Every call is made with ``repo_type="dataset"``. When a method is called
    without a token, the ``HUGGINGFACEHUB_API_TOKEN`` environment variable is
    used instead.
    """

    def __init__(self):
        """Import ``huggingface_hub`` lazily and create an ``HfApi`` client.

        Raises:
            ImportError: If the ``huggingface_hub`` package cannot be imported.
        """
        self._hf_hub = None
        try:
            import huggingface_hub  # noqa
        except ImportError as err:
            # Keep the two sentences separated and chain the original error so
            # the underlying import failure stays visible in the traceback.
            raise ImportError(
                "Could not import huggingface_hub python package. "
                "Please install it with `pip install huggingface_hub`."
            ) from err
        self._hf_hub = huggingface_hub
        self.api = self._hf_hub.HfApi()

    @staticmethod
    def _resolve_token(token: Optional[str]) -> Optional[str]:
        """Return ``token``, or the ``HUGGINGFACEHUB_API_TOKEN`` env var when it is None."""
        if token is None:
            return os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        return token

    def create_repo(
        self, repo_id: str, token: Optional[str] = None, *args, **kwargs
    ) -> str:
        """Create the dataset repository ``repo_id`` (``exist_ok=True`` is always passed).

        Args:
            repo_id: Identifier of the repository to create.
            token: Access token; falls back to ``HUGGINGFACEHUB_API_TOKEN``.
            *args: Extra positional arguments forwarded to ``HfApi.create_repo``.
            **kwargs: Extra keyword arguments forwarded to ``HfApi.create_repo``.

        Returns:
            Whatever ``HfApi.create_repo`` returns.
        """
        # NOTE(review): positional extras are bound before the keywords below,
        # so they may collide with ``repo_id`` -- confirm against the HfApi signature.
        return self.api.create_repo(
            *args,
            repo_id=repo_id,
            token=self._resolve_token(token),
            exist_ok=True,
            repo_type="dataset",
            **kwargs,
        )

    def upload(
        self,
        repo_id: str,
        folder_path: str,
        token: Optional[str] = None,
        private: bool = True,
        *args,
        **kwargs,
    ) -> str:
        """Ensure the dataset repository exists, then upload a local folder to it.

        Args:
            repo_id: Identifier of the target repository.
            folder_path: Local folder to upload.
            token: Access token; falls back to ``HUGGINGFACEHUB_API_TOKEN``.
            private: Passed as ``private`` to ``create_repo``.
            *args: Extra positional arguments forwarded to ``HfApi.upload_folder``.
            **kwargs: Extra keyword arguments forwarded to ``HfApi.upload_folder``.

        Returns:
            Whatever ``HfApi.upload_folder`` returns.
        """
        token = self._resolve_token(token)
        # Only ``private`` is forwarded to repo creation; the extra arguments
        # are reserved for the upload call.
        self.create_repo(repo_id, token, private=private)
        return self.api.upload_folder(
            *args,
            repo_id=repo_id,
            folder_path=folder_path,
            token=token,
            repo_type="dataset",
            **kwargs,
        )

    def download(
        self,
        repo_id: str,
        token: Optional[str] = None,
        local_dir: Optional[str] = None,
        *args,
        **kwargs,
    ) -> str:
        """Download a snapshot of the dataset repository ``repo_id``.

        Args:
            repo_id: Identifier of the repository to download.
            token: Access token; falls back to ``HUGGINGFACEHUB_API_TOKEN``.
            local_dir: Passed as ``local_dir`` to ``snapshot_download``.
            *args: Extra positional arguments forwarded to ``snapshot_download``.
            **kwargs: Extra keyword arguments forwarded to ``snapshot_download``.

        Returns:
            Whatever ``huggingface_hub.snapshot_download`` returns.
        """
        return self._hf_hub.snapshot_download(
            *args,
            repo_id=repo_id,
            token=self._resolve_token(token),
            local_dir=local_dir,
            repo_type="dataset",
            **kwargs,
        )

Methods#

create_repo#

def create_repo(
    self,
    repo_id: 'str',
    token: 'Optional[str]' = None,
    *args,
    **kwargs
) -> 'str'
View Source
    def create_repo(
        self, repo_id: str, token: Optional[str] = None, *args, **kwargs
    ) -> str:
        """Create the dataset repository ``repo_id`` through ``self.api``.

        ``exist_ok=True`` and ``repo_type="dataset"`` are always passed. When
        ``token`` is None, the ``HUGGINGFACEHUB_API_TOKEN`` environment
        variable is used instead. Extra positional and keyword arguments are
        forwarded to ``self.api.create_repo``.
        """
        auth_token = (
            os.environ.get("HUGGINGFACEHUB_API_TOKEN") if token is None else token
        )
        return self.api.create_repo(
            *args,
            repo_id=repo_id,
            token=auth_token,
            exist_ok=True,
            repo_type="dataset",
            **kwargs,
        )

download#

def download(
    self,
    repo_id: 'str',
    token: 'Optional[str]' = None,
    local_dir: 'Optional[str]' = None,
    *args,
    **kwargs
) -> 'str'
View Source
    def download(
        self,
        repo_id: str,
        token: Optional[str] = None,
        local_dir: Optional[str] = None,
        *args,
        **kwargs,
    ) -> str:
        """Download a snapshot of the dataset repository ``repo_id``.

        When ``token`` is None, the ``HUGGINGFACEHUB_API_TOKEN`` environment
        variable is used instead. ``local_dir`` and any extra positional or
        keyword arguments are forwarded to ``snapshot_download``, which is
        always called with ``repo_type="dataset"``.
        """
        auth_token = (
            os.environ.get("HUGGINGFACEHUB_API_TOKEN") if token is None else token
        )
        fetch_snapshot = self._hf_hub.snapshot_download
        return fetch_snapshot(
            *args,
            repo_id=repo_id,
            token=auth_token,
            local_dir=local_dir,
            repo_type="dataset",
            **kwargs,
        )

upload#

def upload(
    self,
    repo_id: 'str',
    folder_path: 'str',
    token: 'Optional[str]' = None,
    private: 'bool' = True,
    *args,
    **kwargs
) -> 'str'
View Source
    def upload(
        self,
        repo_id: str,
        folder_path: str,
        token: Optional[str] = None,
        private: bool = True,
        *args,
        **kwargs,
    ) -> str:
        """Ensure the dataset repository exists, then upload ``folder_path`` to it.

        When ``token`` is None, the ``HUGGINGFACEHUB_API_TOKEN`` environment
        variable is used instead. ``private`` is passed only to
        ``create_repo``; extra positional and keyword arguments are forwarded
        only to ``self.api.upload_folder``.
        """
        auth_token = (
            os.environ.get("HUGGINGFACEHUB_API_TOKEN") if token is None else token
        )
        # Create the repository first (create_repo passes exist_ok=True).
        self.create_repo(repo_id, auth_token, private=private)
        return self.api.upload_folder(
            *args,
            repo_id=repo_id,
            folder_path=folder_path,
            token=auth_token,
            repo_type="dataset",
            **kwargs,
        )

ResourceRequest#

class ResourceRequest(
    num_cpus: 'int | float | None' = None,
    num_gpus: 'int | float | None' = None,
    memory_bytes: 'int | float | None' = None
)

ResourceRequest(num_cpus: 'int | float | None' = None, num_gpus: 'int | float | None' = None, memory_bytes: 'int | float | None' = None)

View Source
@dataclasses.dataclass(frozen=True)
class ResourceRequest:
    """Immutable bundle of optional resource amounts.

    A field left as ``None`` yields to the other operand when two requests
    are combined with ``_max_for_each_resource``.
    """

    # Each amount may be an int, a float, or None.
    num_cpus: int | float | None = None
    num_gpus: int | float | None = None
    memory_bytes: int | float | None = None

    @staticmethod
    def max_resources(resource_requests: list[ResourceRequest]) -> ResourceRequest:
        """Gets the maximum of all resources in a list of ResourceRequests as a new ResourceRequest"""
        # Fold from an all-None request so an empty list yields an all-None result.
        combined = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None)
        for request in resource_requests:
            combined = combined._max_for_each_resource(request)
        return combined

    def _max_for_each_resource(self, other: ResourceRequest) -> ResourceRequest:
        """Get a new ResourceRequest that consists of the maximum requests for each resource"""
        merged = {}
        for spec in dataclasses.fields(ResourceRequest):
            mine = getattr(self, spec.name)
            theirs = getattr(other, spec.name)
            if mine is None:
                # Nothing requested on this side: take the other value as-is.
                merged[spec.name] = theirs
            elif theirs is None:
                merged[spec.name] = mine
            else:
                merged[spec.name] = max(mine, theirs)
        return ResourceRequest(**merged)

    def __add__(self, other: ResourceRequest) -> ResourceRequest:
        """Combine two requests field by field using ``add_optional_numeric``."""
        cpus = add_optional_numeric(self.num_cpus, other.num_cpus)
        gpus = add_optional_numeric(self.num_gpus, other.num_gpus)
        memory = add_optional_numeric(self.memory_bytes, other.memory_bytes)
        return ResourceRequest(num_cpus=cpus, num_gpus=gpus, memory_bytes=memory)

Class variables#

memory_bytes
num_cpus
num_gpus

Static methods#

max_resources#

def max_resources(
    resource_requests: 'list[ResourceRequest]'
) -> 'ResourceRequest'

Gets the maximum of all resources in a list of ResourceRequests as a new ResourceRequest

View Source
    @staticmethod

    def max_resources(resource_requests: list[ResourceRequest]) -> ResourceRequest:

        """Gets the maximum of all resources in a list of ResourceRequests as a new ResourceRequest"""

        return functools.reduce(

            lambda acc, req: acc._max_for_each_resource(req),

            resource_requests,

            ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),

        )