Skip to content

Module vexpresso.daft.collection#

View Source
from __future__ import annotations

import json

import os

from typing import Any, Dict, Iterable, List, Optional, Union

import daft

import pandas as pd

import pyarrow.parquet as pq

from daft import col

from vexpresso.collection import Collection

from vexpresso.daft.filter import FilterHelper

from vexpresso.daft.utils import Wrapper, indices, retrieve

from vexpresso.embedding_functions import get_embedding_fn

from vexpresso.retrievers import BaseRetriever, Retriever

from vexpresso.utils import (

    DataType,

    Document,

    ResourceRequest,

    Transformation,

    lazy,

    transformation,

)

class DaftCollection(Collection):
    """Collection backed by a daft DataFrame.

    The wrapped frame is stored in ``daft_df``. Methods that derive a new
    collection do so through ``from_daft_df`` / ``from_data``, which pass on
    this collection's retriever and ``embedding_functions`` mapping.
    """

    def __init__(
        self,
        data: Optional[
            Union[str, pd.DataFrame, Dict[str, Any], List[Dict[str, Any]]]
        ] = None,
        retriever: BaseRetriever = Retriever(),
        embeddings: Optional[List[Any]] = None,
        embedding_functions: Optional[Dict[str, Any]] = None,
        daft_df: Optional[daft.DataFrame] = None,
        lazy: bool = False,
    ):
        """Build a collection from raw data or from an existing daft DataFrame.

        Args:
            data: Path to a ``.json`` file, a pandas DataFrame, a dict of
                columns, or a list of row dicts. Not used to build the frame
                when ``daft_df`` is given.
            retriever: Retriever used by the query methods when none is passed.
            embeddings: Optional values stored as an ``"embeddings"`` column;
                only used when the frame is built from ``data``.
            embedding_functions: Mapping from column name to embedding
                function. A new dict is created when omitted.
            daft_df: Existing daft DataFrame to wrap as-is.
            lazy: When False, a frame built from ``data`` is collected
                immediately.
        """
        self.daft_df = daft_df
        self.retriever = retriever
        # A literal ``{}`` default would be a single dict shared by every
        # collection created without this argument, so registering a function
        # on one collection would leak into all the others.
        self.embedding_functions = (
            {} if embedding_functions is None else embedding_functions
        )

        _metadata = {}
        if isinstance(data, str):
            # NOTE: string paths that do not end in ".json" are ignored.
            if data.endswith(".json"):
                with open(data, "r") as f:
                    # Normalise to a dict of plain lists, exactly like the
                    # DataFrame branch below.
                    _metadata = pd.DataFrame(json.load(f)).to_dict("list")
        elif isinstance(data, pd.DataFrame):
            _metadata = data.to_dict("list")
        elif data is not None:
            _metadata = data

        if daft_df is None and len(_metadata) > 0:
            if isinstance(_metadata, list):
                self.daft_df = daft.from_pylist(_metadata)
            else:
                self.daft_df = daft.from_pydict({**_metadata})
            if embeddings is not None:
                self.daft_df = self.add_column("embeddings", embeddings).daft_df
            if not lazy:
                self.daft_df = self.daft_df.collect()

    def __repr__(self) -> str:
        """Return the repr of the wrapped daft DataFrame."""
        return repr(self.daft_df)

    @property
    def on_df(self) -> Wrapper:
        """Return a ``Wrapper`` built around this collection."""
        return Wrapper(self)

    @property
    def df(self) -> daft.DataFrame:
        """Return the wrapped daft DataFrame."""
        return self.daft_df

    def __len__(self) -> int:
        """Return the row count reported by the daft DataFrame."""
        return self.daft_df.count_rows()

    def __getitem__(self, column: str) -> DaftCollection:
        """Return ``self.select(column)`` so ``collection["name"]`` works."""
        return self.select(column)

    def cast(
        self, column: str = None, datatype: DataType = DataType.python()
    ) -> DaftCollection:
        """Cast one column, or every column when ``column`` is None.

        Args:
            column: Name of the column to cast; None casts all columns.
            datatype: Target datatype.

        Returns:
            A new collection over the selected (cast) columns.
        """
        names = self.column_names
        if column is None:
            projected = [col(name).cast(datatype) for name in names]
        else:
            # Untouched columns are passed through by name.
            projected = [
                col(column).cast(datatype) if name == column else name
                for name in names
            ]
        return self.from_daft_df(self.daft_df.select(*projected))

    def add_rows(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Append rows and return a collection rebuilt from the combined data.

        Only keys that already exist as columns are read from each entry; a
        key missing from an entry is stored as None.
        """
        columns = self.to_dict()
        for name, values in columns.items():
            values.extend(entry.get(name, None) for entry in entries)
        return self.from_data(columns)

    def add(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Alias for ``add_rows``."""
        # ``add_rows`` is the row-appending method this class defines.
        return self.add_rows(entries)

    def set_embedding_function(self, column: str, embedding_function: Transformation):
        """Register ``embedding_function`` under ``column`` in the function map."""
        self.embedding_functions[column] = embedding_function

    @property
    def column_names(self) -> List[str]:
        """Return the column names of the wrapped daft DataFrame."""
        return self.daft_df.column_names

    def from_daft_df(self, df: daft.DataFrame) -> DaftCollection:
        """Wrap ``df`` in a new collection with this retriever and function map."""
        return DaftCollection(
            retriever=self.retriever,
            embedding_functions=self.embedding_functions,
            daft_df=df,
        )

    def from_data(self, data: Any) -> DaftCollection:
        """Build a new collection from ``data`` with this retriever and function map."""
        return DaftCollection(
            data=data,
            retriever=self.retriever,
            embedding_functions=self.embedding_functions,
        )

    def collect(self, in_place: bool = False):
        """Collect the daft DataFrame.

        Args:
            in_place: When True, store the collected frame on this collection
                and return ``self``; otherwise return a new collection.
        """
        collected = self.daft_df.collect(num_preview_rows=None)
        if not in_place:
            return self.from_daft_df(collected)
        self.daft_df = collected
        return self

    def execute(self) -> DaftCollection:
        """Return ``self.collect()``: a new collection over the collected frame."""
        return self.collect()

    def to_pandas(self) -> pd.DataFrame:
        """Execute the collection and convert its frame to a pandas DataFrame."""
        return self.execute().daft_df.to_pandas()

    def to_dict(self) -> Dict[str, List[Any]]:
        """Execute the collection and return its frame as a dict of columns."""
        return self.execute().daft_df.to_pydict()

    def to_list(self) -> List[Any]:
        """Execute the collection and return its column values.

        Returns:
            The single column's values when there is exactly one column,
            otherwise a list holding one entry per column.
        """
        columns = list(self.execute().daft_df.to_pydict().values())
        return columns[0] if len(columns) == 1 else columns

    def show(self, num_rows: Optional[int] = None):
        """Show ``num_rows`` rows, or ``len(self)`` rows when it is None."""
        limit = self.__len__() if num_rows is None else num_rows
        return self.daft_df.show(limit)

    @lazy(default=True)
    def iloc(self, idx: Union[int, Iterable[int]]) -> DaftCollection:
        """Select rows by position.

        A temporary ``_vexpresso_index`` column is added, filtered with
        ``isin`` against ``idx`` and then excluded again.

        Args:
            idx: A single position or an iterable of positions.
        """
        # for some reason this is super slow
        positions = [idx] if isinstance(idx, int) else idx
        indexed = self.on_df.with_column(
            "_vexpresso_index", indices(col(self.column_names[0]))
        )
        matched = indexed.filter({"_vexpresso_index": {"isin": positions}})
        return matched.exclude("_vexpresso_index")

    @lazy(default=True)
    def rename(self, columns: Dict[str, str]) -> DaftCollection:
        """Rename columns according to ``columns`` (old name -> new name).

        Columns not mentioned in the mapping are selected unchanged.
        """
        expressions = [
            col(name).alias(columns[name]) if name in columns else col(name)
            for name in self.column_names
        ]
        return self.on_df.select(*expressions)

    @lazy(default=True)
    def agg(self, *args, **kwargs) -> DaftCollection:
        """Forward to the daft DataFrame's ``agg`` and wrap the result."""
        # ``from_daft_df`` is the wrapping helper this class defines.
        return self.from_daft_df(self.daft_df.agg(*args, **kwargs))

    @lazy(default=True)
    def add_column(self, name: str, column: List[Any]) -> DaftCollection:
        """Add (or replace) a column holding the given values.

        The values are joined onto the existing rows through a temporary
        ``_vexpresso_index`` column.

        Args:
            name: Name of the new column; an existing column with this name
                is replaced.
            column: Values for the new column (paired with indices
                ``0 .. len(self) - 1``).
        """
        # Build the index from the first column *before* dropping ``name``:
        # when ``name`` is itself the first column, dropping it first would
        # leave the index expression pointing at a column that is gone.
        df = self.df.with_column(
            "_vexpresso_index", indices(col(self.column_names[0]))
        )
        if name in self.column_names:
            df = df.exclude(name)
        second_df = daft.from_pydict(
            {name: column, "_vexpresso_index": list(range(len(self)))}
        )
        df = df.join(second_df, on="_vexpresso_index").exclude("_vexpresso_index")
        return self.from_daft_df(df)

    @lazy(default=True)
    def sort(self, column, desc=True) -> DaftCollection:
        """Sort by ``column`` (``desc=True`` by default) and wrap the result."""
        ordered = self.daft_df.sort(col(column), desc=desc)
        return self.from_daft_df(ordered)

    def embed_query(
        self,
        query: Any,
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed a single query by delegating to ``embed_queries``.

        Args:
            query: The query to embed.
            embedding_column_name: Key into ``embedding_functions`` used when
                ``embedding_fn`` is None.
            embedding_fn: Embedding function, or the name of a registered one.
            resource_request: Passed through to ``embed_queries``.
            *args, **kwargs: Extra arguments for the embedding function.

        Returns:
            The first (only) element of the ``embed_queries`` result.
        """
        # Forward the named parameters positionally. Passing them as keywords
        # ahead of ``*args`` would make any extra positional argument collide
        # with ``queries`` ("got multiple values for argument").
        return self.embed_queries(
            [query],
            embedding_column_name,
            embedding_fn,
            resource_request,
            *args,
            **kwargs,
        )[0]

    def embed_queries(
        self,
        queries: List[Any],
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed a batch of queries.

        Args:
            queries: Queries to embed.
            embedding_column_name: Key into ``embedding_functions`` used when
                ``embedding_fn`` is None.
            embedding_fn: Embedding function, or a string key into
                ``embedding_functions``.
            resource_request: Passed to ``with_column``.
            *args, **kwargs: Extra arguments for the embedding function.

        Returns:
            The values of the computed ``query_embeddings`` column.

        Raises:
            ValueError: If both ``embedding_fn`` and ``embedding_column_name``
                are None.
        """
        if isinstance(embedding_fn, str):
            embedding_fn = self.embedding_functions[embedding_fn]
        elif embedding_fn is None:
            if embedding_column_name is None:
                raise ValueError("Column name must be provided if embedding_fn is None")
            embedding_fn = self.embedding_functions[embedding_column_name]

        query_df = daft.from_pydict({"queries": queries})
        embedded_df = query_df.with_column(
            "query_embeddings",
            embedding_fn(col("queries"), *args, **kwargs),
            resource_request=resource_request,
        )
        result = embedded_df.select("query_embeddings").collect().to_pydict()
        return result["query_embeddings"]

    @lazy(default=True)
    def query(
        self,
        column: str,
        query: Any = None,
        query_embedding: Any = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: Optional[int] = None,
        sort: bool = True,
        embedding_fn: Optional[Transformation] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> Collection:
        """Run a single query by delegating to ``batch_query``.

        ``query`` / ``query_embedding`` are wrapped in one-element lists and
        the first element of the ``batch_query`` result is returned. All other
        parameters are forwarded unchanged.
        """
        queries = None if query is None else [query]
        query_embeddings = None if query_embedding is None else [query_embedding]
        # Forwarded positionally (in ``batch_query`` parameter order) so that
        # extra ``*args`` land after the named parameters instead of colliding
        # with them.
        return self.batch_query(
            column,
            queries,
            query_embeddings,
            filter_conditions,
            k,
            sort,
            embedding_fn,
            return_scores,
            score_column_name,
            resource_request,
            retriever,
            *args,
            **kwargs,
        )[0]

    @lazy(default=True)
    def batch_query(
        self,
        column: str,
        queries: List[Any] = None,
        query_embeddings: List[Any] = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: int = None,
        sort: bool = True,
        embedding_fn: Optional[Union[Transformation, str]] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> List[Collection]:
        """Run several queries against ``column``.

        Args:
            column: Column passed to ``retrieve``; also the key used to look
                up / register the embedding function.
            queries: Raw queries; embedded with ``embed_queries`` when
                ``query_embeddings`` is None.
            query_embeddings: Pre-computed embeddings; take precedence over
                ``queries``.
            filter_conditions: When given, applied with ``FilterHelper.filter``
                to every frame returned by ``retrieve``.
            k: Passed to ``retrieve``; defaults to ``len(self)``.
            sort, return_scores, score_column_name, resource_request: Passed
                to ``retrieve``.
            embedding_fn: Embedding function, or a string key into
                ``embedding_functions``. A non-string function is registered
                under ``column``.
            retriever: Overrides ``self.retriever`` when given.
            *args, **kwargs: Extra arguments for the embedding function.

        Returns:
            One collection per frame returned by ``retrieve``.

        Raises:
            ValueError: If neither ``queries`` nor ``query_embeddings`` is given.
        """
        if queries is None and query_embeddings is None:
            # Fail with a clear message instead of ``len(None)``.
            raise ValueError("Either `queries` or `query_embeddings` must be provided")
        batch_size = len(queries) if query_embeddings is None else len(query_embeddings)

        if embedding_fn is not None:
            if isinstance(embedding_fn, str):
                embedding_fn = self.embedding_functions[embedding_fn]
            else:
                if (
                    column in self.embedding_functions
                    and embedding_fn != self.embedding_functions[column]
                ):
                    print(
                        "embedding_fn may not be the same as whats in map! Updating what's in map..."
                    )
                self.embedding_functions[column] = get_embedding_fn(embedding_fn)
                embedding_fn = self.embedding_functions[column]

        if query_embeddings is None:
            query_embeddings = self.embed_queries(
                queries,
                column,
                embedding_fn,
                resource_request,
                *args,
                **kwargs,
            )
        if retriever is None:
            retriever = self.retriever
        if k is None:
            k = self.__len__()

        dfs = retrieve(
            batch_size,
            self.daft_df,
            column,
            query_embeddings,
            retriever,
            k,
            sort,
            return_scores,
            score_column_name,
            resource_request,
        )
        # The filter is applied to the frames that ``retrieve`` returned.
        if filter_conditions is not None:
            dfs = [FilterHelper.filter(df, filter_conditions) for df in dfs]
        return [self.from_daft_df(df) for df in dfs]

    @lazy(default=True)
    def select(
        self,
        *args,
    ) -> DaftCollection:
        """Select columns via ``FilterHelper.select`` and wrap the result."""
        selected = FilterHelper.select(self.daft_df, *args)
        return self.from_daft_df(selected)

    @lazy(default=True)
    def exclude(
        self,
        *args,
    ) -> DaftCollection:
        """Exclude the given columns and wrap the result."""
        remaining = self.daft_df.exclude(*args)
        return self.from_daft_df(remaining)

    @lazy(default=True)
    def filter(
        self, filter_conditions: Dict[str, Dict[str, str]], *args, **kwargs
    ) -> DaftCollection:
        """Filter rows via ``FilterHelper.filter`` and wrap the result."""
        filtered = FilterHelper.filter(self.daft_df, filter_conditions, *args, **kwargs)
        return self.from_daft_df(filtered)

    @lazy(default=True)
    def apply(
        self,
        transform_fn: Transformation,
        column: DaftCollection,
        *args,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        function: str = "__call__",
        **kwargs,
    ) -> DaftCollection:
        """Apply a transformation to one or more columns and store the result.

        Args:
            transform_fn: Wrapped with ``transformation(...)`` before use.
            column: Single-column ``DaftCollection`` used as first argument.
            *args: Further arguments; ``DaftCollection`` values are replaced
                by a column expression, everything else is passed as-is.
            to: Name of the output column; defaults to
                ``"tranformed_<first column name>"``.
            resource_request: Passed to ``with_column``.
            datatype, init_kwargs, function: Passed to ``transformation``.
            **kwargs: Keyword arguments for the transformation; a
                ``DaftCollection`` value is replaced by the first entry of its
                frame's ``columns``.

        Raises:
            TypeError: If ``column`` is not a ``DaftCollection``.
            ValueError: If a ``DaftCollection`` argument has more than one column.
        """
        transform_fn = transformation(
            transform_fn, datatype=datatype, init_kwargs=init_kwargs, function=function
        )
        if not isinstance(column, DaftCollection):
            raise TypeError(
                "first args in apply must be a DaftCollection! use `collection['column_name']`"
            )

        target = self
        call_args = []
        for candidate in (column, *args):
            if not isinstance(candidate, DaftCollection):
                call_args.append(candidate)
                continue
            if len(candidate.column_names) > 1:
                raise ValueError(
                    "When passing in a Daft collection into `embed`, they must only have 1 column!"
                )
            column_name = candidate.column_names[0]
            if column_name not in target.column_names:
                # Bring a column that only exists in the other collection over.
                content = candidate.select(column_name).to_dict()[column_name]
                target = target.add_column(column_name, content)
            call_args.append(col(column_name))

        # only support first column
        call_kwargs = {
            key: value.daft_df.columns[0] if isinstance(value, DaftCollection) else value
            for key, value in kwargs.items()
        }

        if to is None:
            to = f"tranformed_{call_args[0].name()}"
        return target.on_df.with_column(
            to, transform_fn(*call_args, **call_kwargs), resource_request=resource_request
        )

    @lazy(default=True)
    def embed(
        self,
        column: Union[DaftCollection, List[Any], str],
        *args,
        embedding_fn: Optional[Transformation] = None,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        **kwargs,
    ) -> DaftCollection:
        """Embed a column and store the result via ``apply``.

        Args:
            column: Column name, single-column ``DaftCollection``, or raw
                values (added first as a ``content_<n>`` column).
            *args, **kwargs: Forwarded to ``apply``.
            embedding_fn: Function to register under ``to``; when None the
                function already registered under ``to`` is used.
            to: Output column name; defaults to ``"embeddings_<column name>"``.
            resource_request: Forwarded to ``apply``.
            datatype, init_kwargs: Passed to ``get_embedding_fn``.

        Raises:
            KeyError: If ``embedding_fn`` is None and nothing is registered
                under ``to``.
        """
        collection = self
        if isinstance(column, str):
            column_name = column
        elif isinstance(column, DaftCollection):
            column_name = column.column_names[0]
        else:
            # raw content
            column_name = f"content_{len(collection.column_names)}"
            collection = collection.add_column(column_name, column)

        if to is None:
            to = f"embeddings_{column_name}"
        if embedding_fn is not None:
            self.embedding_functions[to] = embedding_fn
        self.embedding_functions[to] = get_embedding_fn(
            self.embedding_functions[to], datatype=datatype, init_kwargs=init_kwargs
        )
        return collection.apply(
            self.embedding_functions[to],
            collection[column_name],
            *args,
            to=to,
            resource_request=resource_request,
            **kwargs,
        )

    def save_local(self, directory: str) -> str:
        """Write the collection to ``<directory>/content.parquet``.

        Returns:
            ``directory``, which is the value ``from_local_dir`` expects.
        """
        os.makedirs(directory, exist_ok=True)
        table = self.daft_df.to_arrow()
        pq.write_table(table, os.path.join(directory, "content.parquet"))
        return directory

    @classmethod
    def from_local_dir(cls, local_dir: str, *args, **kwargs) -> DaftCollection:
        """Load a collection from ``<local_dir>/content.parquet``."""
        parquet_path = os.path.join(local_dir, "content.parquet")
        return DaftCollection(*args, daft_df=daft.read_parquet(parquet_path), **kwargs)

    @classmethod
    def connect(
        cls, address: str = None, cluster_kwargs: Dict[str, Any] = {}, *args, **kwargs
    ) -> DaftCollection:
        """Initialise ray, point daft at it, and return a new collection.

        Args:
            address: Passed to ``ray.init`` when not None.
            cluster_kwargs: Extra keyword arguments for ``ray.init``.
            *args, **kwargs: Forwarded to the ``DaftCollection`` constructor.
        """
        import ray

        if address is not None:
            addy = ray.init(address=address, **cluster_kwargs)
        else:
            addy = ray.init(**cluster_kwargs)
        ray_address = addy.address_info["address"]
        daft.context.set_runner_ray(address=ray_address)
        return DaftCollection(*args, **kwargs)

    def to_langchain(self, document_column: str, embeddings_column: str):
        """Wrap this collection in a LangChain ``VectorStore`` subclass.

        Args:
            document_column: Column holding the document text.
            embeddings_column: Column queried by ``similarity_search``.
        """
        from langchain.docstore.document import Document
        from langchain.vectorstores import VectorStore

        class VexpressoVectorStore(VectorStore):
            def __init__(self, collection: DaftCollection):
                self.collection = collection
                self.document_column = document_column
                self.embeddings_column = embeddings_column

            def add_texts(
                self,
                texts: Iterable[str],
                metadatas: Optional[List[dict]] = None,
                **kwargs: Any,
            ) -> List[str]:
                """Append ``texts`` (with optional metadata) as new rows."""
                # ``texts`` is only promised to be an Iterable; materialise it
                # so generators are accepted and can be paired with metadata.
                texts = list(texts)
                if metadatas is None:
                    metadatas = [{} for _ in texts]
                combined = [
                    {self.document_column: t, **m} for t, m in zip(texts, metadatas)
                ]
                self.collection = self.collection.add_rows(combined)
                # No ids are assigned to the added rows, so return an empty
                # list to honour the declared return type.
                return []

            def similarity_search(
                self, query: str, k: int = 4, **kwargs: Any
            ) -> List[Document]:
                """Query the embeddings column and return LangChain documents."""
                dictionary = self.collection.query(
                    self.embeddings_column, query=query, k=k, lazy=False, **kwargs
                ).to_dict()
                documents = dictionary[self.document_column]
                # Every other column becomes per-document metadata.
                metadatas = {
                    name: values
                    for name, values in dictionary.items()
                    if name != self.document_column
                }
                out = []
                for i, doc in enumerate(documents):
                    d = {name: values[i] for name, values in metadatas.items()}
                    out.append(Document(page_content=doc, metadata=d))
                return out

            @classmethod
            def from_texts(
                cls,
                *args,
                **kwargs: Any,
            ):
                """Not implemented for this wrapper: always returns None."""
                return None

        return VexpressoVectorStore(self)

    @classmethod
    def from_documents(
        cls, documents: List[Document], *args, **kwargs
    ) -> DaftCollection:
        """Build a collection from documents (for LangChain integration).

        Each document becomes a row with its ``page_content`` under ``"text"``
        plus one column per metadata key.
        """
        # for langchain integration
        raw = [{"text": d.page_content, **d.metadata} for d in documents]
        # ``data`` is the constructor's first positional parameter; passing it
        # as a keyword ahead of ``*args`` would make any extra positional
        # argument clash with it.
        return DaftCollection(raw, *args, **kwargs)

Classes#

DaftCollection#

class DaftCollection(
    data: 'Optional[Union[str, pd.DataFrame, Dict[str, Any]]]' = None,
    retriever: 'BaseRetriever' = Retriever(),
    embeddings: 'Optional[List[Any]]' = None,
    embedding_functions: 'Dict[str, Any]' = {},
    daft_df: 'Optional[daft.DataFrame]' = None,
    lazy: 'bool' = False
)
View Source
class DaftCollection(Collection):

    def __init__(
        self,
        data: Optional[
            Union[str, pd.DataFrame, Dict[str, Any], List[Dict[str, Any]]]
        ] = None,
        retriever: BaseRetriever = Retriever(),
        embeddings: Optional[List[Any]] = None,
        embedding_functions: Optional[Dict[str, Any]] = None,
        daft_df: Optional[daft.DataFrame] = None,
        lazy: bool = False,
    ):
        """Build a collection from raw data or from an existing daft DataFrame.

        Args:
            data: Path to a ``.json`` file, a pandas DataFrame, a dict of
                columns, or a list of row dicts. Not used to build the frame
                when ``daft_df`` is given.
            retriever: Retriever used by the query methods when none is passed.
            embeddings: Optional values stored as an ``"embeddings"`` column;
                only used when the frame is built from ``data``.
            embedding_functions: Mapping from column name to embedding
                function. A new dict is created when omitted.
            daft_df: Existing daft DataFrame to wrap as-is.
            lazy: When False, a frame built from ``data`` is collected
                immediately.
        """
        self.daft_df = daft_df
        self.retriever = retriever
        # A literal ``{}`` default would be a single dict shared by every
        # collection created without this argument, so registering a function
        # on one collection would leak into all the others.
        self.embedding_functions = (
            {} if embedding_functions is None else embedding_functions
        )

        _metadata = {}
        if isinstance(data, str):
            # NOTE: string paths that do not end in ".json" are ignored.
            if data.endswith(".json"):
                with open(data, "r") as f:
                    # Normalise to a dict of plain lists, exactly like the
                    # DataFrame branch below.
                    _metadata = pd.DataFrame(json.load(f)).to_dict("list")
        elif isinstance(data, pd.DataFrame):
            _metadata = data.to_dict("list")
        elif data is not None:
            _metadata = data

        if daft_df is None and len(_metadata) > 0:
            if isinstance(_metadata, list):
                self.daft_df = daft.from_pylist(_metadata)
            else:
                self.daft_df = daft.from_pydict({**_metadata})
            if embeddings is not None:
                self.daft_df = self.add_column("embeddings", embeddings).daft_df
            if not lazy:
                self.daft_df = self.daft_df.collect()

    def __repr__(self) -> str:
        """Return the repr of the wrapped daft DataFrame."""
        return repr(self.daft_df)

    @property

    def on_df(self) -> Wrapper:
        """Return a ``Wrapper`` built around this collection."""
        wrapper = Wrapper(self)
        return wrapper

    @property

    def df(self) -> daft.DataFrame:
        """Return the wrapped daft DataFrame."""
        frame = self.daft_df
        return frame

    def __len__(self) -> int:
        """Return the row count reported by the daft DataFrame."""
        row_count = self.daft_df.count_rows()
        return row_count

    def __getitem__(self, column: str) -> DaftCollection:
        """Return ``self.select(column)`` so ``collection["name"]`` works."""
        selected = self.select(column)
        return selected

    def cast(
        self, column: str = None, datatype: DataType = DataType.python()
    ) -> DaftCollection:
        """Cast one column, or every column when ``column`` is None.

        Args:
            column: Name of the column to cast; None casts all columns.
            datatype: Target datatype.

        Returns:
            A new collection over the selected (cast) columns.
        """
        if column is None:
            projected = [col(name).cast(datatype) for name in self.column_names]
        else:
            # Untouched columns are passed through by name.
            projected = [
                col(column).cast(datatype) if name == column else name
                for name in self.column_names
            ]
        return self.from_daft_df(self.daft_df.select(*projected))

    def add_rows(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Append rows and return a collection rebuilt from the combined data.

        Only keys that already exist as columns are read from each entry; a
        key missing from an entry is stored as None.
        """
        columns = self.to_dict()
        for name, values in columns.items():
            values.extend(entry.get(name, None) for entry in entries)
        return self.from_data(columns)

    def add(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Alias for ``add_rows``."""
        # ``add_rows`` is the row-appending method this class defines.
        return self.add_rows(entries)

    def set_embedding_function(self, column: str, embedding_function: Transformation):
        """Register ``embedding_function`` under ``column`` in the function map."""
        registry = self.embedding_functions
        registry[column] = embedding_function

    @property

    def column_names(self) -> List[str]:
        """Return the column names of the wrapped daft DataFrame."""
        names = self.daft_df.column_names
        return names

    def from_daft_df(self, df: daft.DataFrame) -> DaftCollection:
        """Wrap ``df`` in a new collection with this retriever and function map."""
        shared = {
            "retriever": self.retriever,
            "embedding_functions": self.embedding_functions,
        }
        return DaftCollection(daft_df=df, **shared)

    def from_data(self, data: Any) -> DaftCollection:
        """Build a new collection from ``data`` with this retriever and function map."""
        shared = {
            "retriever": self.retriever,
            "embedding_functions": self.embedding_functions,
        }
        return DaftCollection(data=data, **shared)

    def collect(self, in_place: bool = False):
        """Collect the daft DataFrame.

        Args:
            in_place: When True, store the collected frame on this collection
                and return ``self``; otherwise return a new collection.
        """
        collected = self.daft_df.collect(num_preview_rows=None)
        if not in_place:
            return self.from_daft_df(collected)
        self.daft_df = collected
        return self

    def execute(self) -> DaftCollection:
        """Return ``self.collect()``: a new collection over the collected frame."""
        executed = self.collect()
        return executed

    def to_pandas(self) -> pd.DataFrame:
        """Execute the collection and convert its frame to a pandas DataFrame."""
        return self.execute().daft_df.to_pandas()

    def to_dict(self) -> Dict[str, List[Any]]:
        """Execute the collection and return its frame as a dict of columns."""
        return self.execute().daft_df.to_pydict()

    def to_list(self) -> List[Any]:
        """Execute the collection and return its column values.

        Returns:
            The single column's values when there is exactly one column,
            otherwise a list holding one entry per column.
        """
        columns = list(self.execute().daft_df.to_pydict().values())
        return columns[0] if len(columns) == 1 else columns

    def show(self, num_rows: Optional[int] = None):
        """Show ``num_rows`` rows, or ``len(self)`` rows when it is None."""
        limit = self.__len__() if num_rows is None else num_rows
        return self.daft_df.show(limit)

    @lazy(default=True)

    def iloc(self, idx: Union[int, Iterable[int]]) -> DaftCollection:
        """Select rows by position.

        A temporary ``_vexpresso_index`` column is added, filtered with
        ``isin`` against ``idx`` and then excluded again.

        Args:
            idx: A single position or an iterable of positions.
        """
        # for some reason this is super slow
        positions = [idx] if isinstance(idx, int) else idx
        indexed = self.on_df.with_column(
            "_vexpresso_index", indices(col(self.column_names[0]))
        )
        matched = indexed.filter({"_vexpresso_index": {"isin": positions}})
        return matched.exclude("_vexpresso_index")

    @lazy(default=True)

    def rename(self, columns: Dict[str, str]) -> DaftCollection:
        """Rename columns according to ``columns`` (old name -> new name).

        Columns not mentioned in the mapping are selected unchanged.
        """
        expressions = [
            col(name).alias(columns[name]) if name in columns else col(name)
            for name in self.column_names
        ]
        return self.on_df.select(*expressions)

    @lazy(default=True)

    def agg(self, *args, **kwargs) -> DaftCollection:
        """Forward to the daft DataFrame's ``agg`` and wrap the result."""
        # ``from_daft_df`` is the wrapping helper this class defines.
        return self.from_daft_df(self.daft_df.agg(*args, **kwargs))

    @lazy(default=True)

    def add_column(self, name: str, column: List[Any]) -> DaftCollection:
        """Add (or replace) a column holding the given values.

        The values are joined onto the existing rows through a temporary
        ``_vexpresso_index`` column.

        Args:
            name: Name of the new column; an existing column with this name
                is replaced.
            column: Values for the new column (paired with indices
                ``0 .. len(self) - 1``).
        """
        # Build the index from the first column *before* dropping ``name``:
        # when ``name`` is itself the first column, dropping it first would
        # leave the index expression pointing at a column that is gone.
        df = self.df.with_column(
            "_vexpresso_index", indices(col(self.column_names[0]))
        )
        if name in self.column_names:
            df = df.exclude(name)
        second_df = daft.from_pydict(
            {name: column, "_vexpresso_index": list(range(len(self)))}
        )
        df = df.join(second_df, on="_vexpresso_index").exclude("_vexpresso_index")
        return self.from_daft_df(df)

    @lazy(default=True)

    def sort(self, column, desc=True) -> DaftCollection:
        """Sort by ``column`` (``desc=True`` by default) and wrap the result."""
        ordered = self.daft_df.sort(col(column), desc=desc)
        return self.from_daft_df(ordered)

    def embed_query(
        self,
        query: Any,
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed a single query by delegating to ``embed_queries``.

        Args:
            query: The query to embed.
            embedding_column_name: Key into ``embedding_functions`` used when
                ``embedding_fn`` is None.
            embedding_fn: Embedding function, or the name of a registered one.
            resource_request: Passed through to ``embed_queries``.
            *args, **kwargs: Extra arguments for the embedding function.

        Returns:
            The first (only) element of the ``embed_queries`` result.
        """
        # Forward the named parameters positionally. Passing them as keywords
        # ahead of ``*args`` would make any extra positional argument collide
        # with ``queries`` ("got multiple values for argument").
        return self.embed_queries(
            [query],
            embedding_column_name,
            embedding_fn,
            resource_request,
            *args,
            **kwargs,
        )[0]

    def embed_queries(
        self,
        queries: List[Any],
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed a batch of queries.

        Args:
            queries: Queries to embed.
            embedding_column_name: Key into ``embedding_functions`` used when
                ``embedding_fn`` is None.
            embedding_fn: Embedding function, or a string key into
                ``embedding_functions``.
            resource_request: Passed to ``with_column``.
            *args, **kwargs: Extra arguments for the embedding function.

        Returns:
            The values of the computed ``query_embeddings`` column.

        Raises:
            ValueError: If both ``embedding_fn`` and ``embedding_column_name``
                are None.
        """
        if isinstance(embedding_fn, str):
            embedding_fn = self.embedding_functions[embedding_fn]
        elif embedding_fn is None:
            if embedding_column_name is None:
                raise ValueError("Column name must be provided if embedding_fn is None")
            embedding_fn = self.embedding_functions[embedding_column_name]

        query_df = daft.from_pydict({"queries": queries})
        embedded_df = query_df.with_column(
            "query_embeddings",
            embedding_fn(col("queries"), *args, **kwargs),
            resource_request=resource_request,
        )
        result = embedded_df.select("query_embeddings").collect().to_pydict()
        return result["query_embeddings"]

    @lazy(default=True)

    def query(
        self,
        column: str,
        query: Any = None,
        query_embedding: Any = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: Optional[int] = None,
        sort: bool = True,
        embedding_fn: Optional[Transformation] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> Collection:
        """Run a single query by delegating to ``batch_query``.

        ``query`` / ``query_embedding`` are wrapped in one-element lists and
        the first element of the ``batch_query`` result is returned. All other
        parameters are forwarded unchanged.
        """
        queries = None if query is None else [query]
        query_embeddings = None if query_embedding is None else [query_embedding]
        # Forwarded positionally (in ``batch_query`` parameter order) so that
        # extra ``*args`` land after the named parameters instead of colliding
        # with them.
        return self.batch_query(
            column,
            queries,
            query_embeddings,
            filter_conditions,
            k,
            sort,
            embedding_fn,
            return_scores,
            score_column_name,
            resource_request,
            retriever,
            *args,
            **kwargs,
        )[0]

    @lazy(default=True)

    def batch_query(
        self,
        column: str,
        queries: List[Any] = None,
        query_embeddings: List[Any] = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: int = None,
        sort: bool = True,
        embedding_fn: Optional[Union[Transformation, str]] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> List[Collection]:
        """Run several queries against ``column``.

        Args:
            column: Column passed to ``retrieve``; also the key used to look
                up / register the embedding function.
            queries: Raw queries; embedded with ``embed_queries`` when
                ``query_embeddings`` is None.
            query_embeddings: Pre-computed embeddings; take precedence over
                ``queries``.
            filter_conditions: When given, applied with ``FilterHelper.filter``
                to every frame returned by ``retrieve``.
            k: Passed to ``retrieve``; defaults to ``len(self)``.
            sort, return_scores, score_column_name, resource_request: Passed
                to ``retrieve``.
            embedding_fn: Embedding function, or a string key into
                ``embedding_functions``. A non-string function is registered
                under ``column``.
            retriever: Overrides ``self.retriever`` when given.
            *args, **kwargs: Extra arguments for the embedding function.

        Returns:
            One collection per frame returned by ``retrieve``.

        Raises:
            ValueError: If neither ``queries`` nor ``query_embeddings`` is given.
        """
        if queries is None and query_embeddings is None:
            # Fail with a clear message instead of ``len(None)``.
            raise ValueError("Either `queries` or `query_embeddings` must be provided")
        batch_size = len(queries) if query_embeddings is None else len(query_embeddings)

        if embedding_fn is not None:
            if isinstance(embedding_fn, str):
                embedding_fn = self.embedding_functions[embedding_fn]
            else:
                if (
                    column in self.embedding_functions
                    and embedding_fn != self.embedding_functions[column]
                ):
                    print(
                        "embedding_fn may not be the same as whats in map! Updating what's in map..."
                    )
                self.embedding_functions[column] = get_embedding_fn(embedding_fn)
                embedding_fn = self.embedding_functions[column]

        if query_embeddings is None:
            query_embeddings = self.embed_queries(
                queries,
                column,
                embedding_fn,
                resource_request,
                *args,
                **kwargs,
            )
        if retriever is None:
            retriever = self.retriever
        if k is None:
            k = self.__len__()

        dfs = retrieve(
            batch_size,
            self.daft_df,
            column,
            query_embeddings,
            retriever,
            k,
            sort,
            return_scores,
            score_column_name,
            resource_request,
        )
        # The filter is applied to the frames that ``retrieve`` returned.
        if filter_conditions is not None:
            dfs = [FilterHelper.filter(df, filter_conditions) for df in dfs]
        return [self.from_daft_df(df) for df in dfs]

    @lazy(default=True)

    def select(
        self,
        *args,
    ) -> DaftCollection:
        """Select columns via ``FilterHelper.select`` and wrap the result."""
        selected = FilterHelper.select(self.daft_df, *args)
        return self.from_daft_df(selected)

    @lazy(default=True)

    def exclude(
        self,
        *args,
    ) -> DaftCollection:
        """Exclude the given columns and wrap the result."""
        remaining = self.daft_df.exclude(*args)
        return self.from_daft_df(remaining)

    @lazy(default=True)

    def filter(
        self, filter_conditions: Dict[str, Dict[str, str]], *args, **kwargs
    ) -> DaftCollection:
        """Filter rows via ``FilterHelper.filter`` and wrap the result."""
        filtered = FilterHelper.filter(self.daft_df, filter_conditions, *args, **kwargs)
        return self.from_daft_df(filtered)

    @lazy(default=True)

    def apply(
        self,
        transform_fn: Transformation,
        column: DaftCollection,
        *args,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        function: str = "__call__",
        **kwargs,
    ) -> DaftCollection:
        """Apply a transformation to one or more columns and store the result.

        Args:
            transform_fn: Wrapped with ``transformation(...)`` before use.
            column: Single-column ``DaftCollection`` used as first argument.
            *args: Further arguments; ``DaftCollection`` values are replaced
                by a column expression, everything else is passed as-is.
            to: Name of the output column; defaults to
                ``"tranformed_<first column name>"``.
            resource_request: Passed to ``with_column``.
            datatype, init_kwargs, function: Passed to ``transformation``.
            **kwargs: Keyword arguments for the transformation; a
                ``DaftCollection`` value is replaced by the first entry of its
                frame's ``columns``.

        Raises:
            TypeError: If ``column`` is not a ``DaftCollection``.
            ValueError: If a ``DaftCollection`` argument has more than one column.
        """
        transform_fn = transformation(
            transform_fn, datatype=datatype, init_kwargs=init_kwargs, function=function
        )
        if not isinstance(column, DaftCollection):
            raise TypeError(
                "first args in apply must be a DaftCollection! use `collection['column_name']`"
            )

        target = self
        call_args = []
        for candidate in (column, *args):
            if not isinstance(candidate, DaftCollection):
                call_args.append(candidate)
                continue
            if len(candidate.column_names) > 1:
                raise ValueError(
                    "When passing in a Daft collection into `embed`, they must only have 1 column!"
                )
            column_name = candidate.column_names[0]
            if column_name not in target.column_names:
                # Bring a column that only exists in the other collection over.
                content = candidate.select(column_name).to_dict()[column_name]
                target = target.add_column(column_name, content)
            call_args.append(col(column_name))

        # only support first column
        call_kwargs = {
            key: value.daft_df.columns[0] if isinstance(value, DaftCollection) else value
            for key, value in kwargs.items()
        }

        if to is None:
            to = f"tranformed_{call_args[0].name()}"
        return target.on_df.with_column(
            to, transform_fn(*call_args, **call_kwargs), resource_request=resource_request
        )

    @lazy(default=True)
    def embed(
        self,
        column: Union[DaftCollection, List[Any], str],
        *args,
        embedding_fn: Optional[Transformation] = None,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        **kwargs,
    ) -> DaftCollection:
        """Embed a column and store the embeddings in a new column.

        Args:
            column: A column name, a collection (its first column is used), or
                raw content that is first added as ``content_<n>``.
            *args: Extra positional arguments forwarded to ``apply``.
            embedding_fn: Embedding function to register under ``to``. When
                omitted, the function already registered under ``to`` is used.
            to: Output column name; defaults to ``embeddings_<column name>``.
            resource_request: Forwarded to ``apply``.
            datatype: Forwarded to ``get_embedding_fn``.
            init_kwargs: Forwarded to ``get_embedding_fn``.
            **kwargs: Extra keyword arguments forwarded to ``apply``.

        Returns:
            DaftCollection: Result of ``apply`` with the embedding function.

        Raises:
            KeyError: If ``embedding_fn`` is omitted and nothing is registered
                under ``to``.
        """
        target = self
        if isinstance(column, str):
            source_name = column
        elif isinstance(column, DaftCollection):
            source_name = column.column_names[0]
        else:
            # Raw content: attach it as a fresh column before embedding.
            source_name = f"content_{len(target.column_names)}"
            target = target.add_column(source_name, column)

        if to is None:
            to = f"embeddings_{source_name}"

        if embedding_fn is not None:
            self.embedding_functions[to] = embedding_fn
        else:
            embedding_fn = self.embedding_functions[to]

        self.embedding_functions[to] = get_embedding_fn(
            self.embedding_functions[to], datatype=datatype, init_kwargs=init_kwargs
        )
        registered_fn = self.embedding_functions[to]

        return target.apply(
            registered_fn,
            target[source_name],
            *args,
            to=to,
            resource_request=resource_request,
            **kwargs,
        )

    def save_local(self, directory: str) -> str:
        """Write the collection to ``<directory>/content.parquet``.

        The directory is created when missing.

        Args:
            directory: Target directory.

        Returns:
            str: The directory that was written to (previously the method
            returned ``None`` despite its ``str`` annotation).
        """
        os.makedirs(directory, exist_ok=True)
        table = self.daft_df.to_arrow()
        pq.write_table(table, os.path.join(directory, "content.parquet"))
        return directory

    @classmethod
    def from_local_dir(cls, local_dir: str, *args, **kwargs) -> DaftCollection:
        """Build a collection from ``<local_dir>/content.parquet``.

        Args:
            local_dir: Directory containing ``content.parquet``.
            *args: Extra positional arguments for the constructor.
            **kwargs: Extra keyword arguments for the constructor.

        Returns:
            DaftCollection: Instance of ``cls`` wrapping the parquet dataframe.
        """
        df = daft.read_parquet(os.path.join(local_dir, "content.parquet"))
        # Use cls so subclasses get an instance of their own type.
        return cls(daft_df=df, *args, **kwargs)

    @classmethod
    def connect(
        cls, address: str = None, cluster_kwargs: Dict[str, Any] = {}, *args, **kwargs
    ) -> DaftCollection:
        """Initialise Ray, point daft at it, and build a collection.

        Args:
            address: Ray address; when ``None``, ``ray.init`` is called
                without an address.
            cluster_kwargs: Keyword arguments forwarded to ``ray.init``.
                Only read, never mutated.
            *args: Positional arguments for the constructor.
            **kwargs: Keyword arguments for the constructor.

        Returns:
            DaftCollection: New instance of ``cls``.
        """
        import ray

        if address is None:
            ray_context = ray.init(**cluster_kwargs)
        else:
            ray_context = ray.init(address=address, **cluster_kwargs)

        daft.context.set_runner_ray(address=ray_context.address_info["address"])
        # Use cls so subclasses get an instance of their own type.
        return cls(*args, **kwargs)

    def to_langchain(self, document_column: str, embeddings_column: str):
        """Wrap this collection in a LangChain ``VectorStore`` adapter.

        Args:
            document_column: Column holding the document text.
            embeddings_column: Column queried by ``similarity_search``.

        Returns:
            A ``VexpressoVectorStore`` bound to this collection.
        """
        from langchain.docstore.document import Document
        from langchain.vectorstores import VectorStore

        class VexpressoVectorStore(VectorStore):
            def __init__(self, collection: DaftCollection):
                self.collection = collection
                self.document_column = document_column
                self.embeddings_column = embeddings_column

            def add_texts(
                self,
                texts: Iterable[str],
                metadatas: Optional[List[dict]] = None,
                **kwargs: Any,
            ) -> List[str]:
                """Append texts (and optional metadata) as new rows."""
                # Materialise once: `texts` may be a generator, which has no
                # len() and can only be consumed a single time.
                texts = list(texts)
                if metadatas is None:
                    metadatas = [{} for _ in texts]
                combined = [
                    {self.document_column: t, **m} for t, m in zip(texts, metadatas)
                ]
                self.collection = self.collection.add_rows(combined)
                # The collection assigns no row ids, so there is nothing to
                # report; return an empty list to honour the annotation.
                return []

            def similarity_search(
                self, query: str, k: int = 4, **kwargs: Any
            ) -> List[Document]:
                """Return the ``k`` nearest rows as LangChain documents."""
                dictionary = self.collection.query(
                    self.embeddings_column, query=query, k=k, lazy=False, **kwargs
                ).to_dict()
                documents = dictionary[self.document_column]
                # Every column other than the document text becomes metadata.
                metadata_columns = {
                    name: values
                    for name, values in dictionary.items()
                    if name != self.document_column
                }
                return [
                    Document(
                        page_content=doc,
                        metadata={
                            name: values[i]
                            for name, values in metadata_columns.items()
                        },
                    )
                    for i, doc in enumerate(documents)
                ]

            @classmethod
            def from_texts(
                cls,
                *args,
                **kwargs: Any,
            ):
                """Return VectorStore initialized from texts and embeddings."""
                # Not implemented: present only to satisfy the base class.
                return None

        return VexpressoVectorStore(self)

    @classmethod
    def from_documents(
        cls, documents: List[Document], *args, **kwargs
    ) -> DaftCollection:
        """Build a collection from LangChain-style documents.

        Each document becomes one row: ``page_content`` is stored under
        ``"text"`` and the metadata entries become further columns.

        Args:
            documents: Objects exposing ``page_content`` and ``metadata``.
            *args: Extra positional constructor arguments (after ``data``).
            **kwargs: Extra keyword constructor arguments.

        Returns:
            DaftCollection: New instance of ``cls``.
        """
        # for langchain integration
        raw = [{"text": d.page_content, **d.metadata} for d in documents]
        # Pass `raw` positionally: with `data=raw, *args` the first extra
        # positional argument would also bind to `data` and raise TypeError.
        return cls(raw, *args, **kwargs)

Ancestors (in MRO)#

  • vexpresso.collection.Collection

Static methods#

connect#

def connect(
    address: 'str' = None,
    cluster_kwargs: 'Dict[str, Any]' = {},
    *args,
    **kwargs
) -> 'DaftCollection'
View Source
    @classmethod
    def connect(
        cls, address: str = None, cluster_kwargs: Dict[str, Any] = {}, *args, **kwargs
    ) -> DaftCollection:
        """Initialise Ray, point daft at it, and build a collection.

        Args:
            address: Ray address; when ``None``, ``ray.init`` is called
                without an address.
            cluster_kwargs: Keyword arguments forwarded to ``ray.init``.
                Only read, never mutated.
            *args: Positional arguments for the constructor.
            **kwargs: Keyword arguments for the constructor.

        Returns:
            DaftCollection: New instance of ``cls``.
        """
        import ray

        if address is None:
            ray_context = ray.init(**cluster_kwargs)
        else:
            ray_context = ray.init(address=address, **cluster_kwargs)

        daft.context.set_runner_ray(address=ray_context.address_info["address"])
        # Use cls so subclasses get an instance of their own type.
        return cls(*args, **kwargs)

from_documents#

def from_documents(
    documents: 'List[Document]',
    *args,
    **kwargs
) -> 'DaftCollection'
View Source
    @classmethod
    def from_documents(
        cls, documents: List[Document], *args, **kwargs
    ) -> DaftCollection:
        """Build a collection from LangChain-style documents.

        Each document becomes one row: ``page_content`` is stored under
        ``"text"`` and the metadata entries become further columns.

        Args:
            documents: Objects exposing ``page_content`` and ``metadata``.
            *args: Extra positional constructor arguments (after ``data``).
            **kwargs: Extra keyword constructor arguments.

        Returns:
            DaftCollection: New instance of ``cls``.
        """
        # for langchain integration
        raw = [{"text": d.page_content, **d.metadata} for d in documents]
        # Pass `raw` positionally: with `data=raw, *args` the first extra
        # positional argument would also bind to `data` and raise TypeError.
        return cls(raw, *args, **kwargs)

from_local_dir#

def from_local_dir(
    local_dir: 'str',
    *args,
    **kwargs
) -> 'DaftCollection'
View Source
    @classmethod
    def from_local_dir(cls, local_dir: str, *args, **kwargs) -> DaftCollection:
        """Build a collection from ``<local_dir>/content.parquet``.

        Args:
            local_dir: Directory containing ``content.parquet``.
            *args: Extra positional arguments for the constructor.
            **kwargs: Extra keyword arguments for the constructor.

        Returns:
            DaftCollection: Instance of ``cls`` wrapping the parquet dataframe.
        """
        df = daft.read_parquet(os.path.join(local_dir, "content.parquet"))
        # Use cls so subclasses get an instance of their own type.
        return cls(daft_df=df, *args, **kwargs)

from_saved#

def from_saved(
    directory_or_repo_id: 'Optional[str]' = None,
    token: 'Optional[str]' = None,
    local_dir: 'Optional[str]' = None,
    to_tmpdir: 'bool' = False,
    hf_username: 'Optional[str]' = None,
    repo_name: 'Optional[str]' = None,
    hub_download_kwargs: 'Optional[Dict[str, Any]]' = {},
    *args,
    **kwargs
) -> 'Collection'
View Source
    @classmethod
    def from_saved(
        cls,
        directory_or_repo_id: Optional[str] = None,
        token: Optional[str] = None,
        local_dir: Optional[str] = None,
        to_tmpdir: bool = False,
        hf_username: Optional[str] = None,
        repo_name: Optional[str] = None,
        hub_download_kwargs: Optional[Dict[str, Any]] = {},
        *args,
        **kwargs,
    ) -> Collection:
        """Load a collection from a local directory or a Hugging Face repo.

        Args:
            directory_or_repo_id: Local directory, or a hub repo id when no
                such directory exists.
            token: Token forwarded to the hub download.
            local_dir: Download destination forwarded to the hub download.
            to_tmpdir: When true, download into a fresh temporary directory
                (overrides ``local_dir``).
            hf_username: Used with ``repo_name`` to build the repo id when
                ``directory_or_repo_id`` is omitted.
            repo_name: See ``hf_username``.
            hub_download_kwargs: Extra keyword arguments for the download;
                ``None`` is treated as empty.
            *args: Forwarded to ``from_local_dir``.
            **kwargs: Forwarded to ``from_local_dir``.

        Returns:
            Collection: Result of ``cls.from_local_dir`` on the resolved path.

        Raises:
            ValueError: If neither a directory / repo id nor both
                ``hf_username`` and ``repo_name`` are given.
        """
        if directory_or_repo_id is None:
            if hf_username is None or repo_name is None:
                raise ValueError(
                    "Please provide either a directory / repo id or your huggingface username + repo name"
                )
            directory_or_repo_id = f"{hf_username}/{repo_name}"

        saved_dir = directory_or_repo_id
        if not os.path.isdir(directory_or_repo_id):
            # from huggingface
            print(f"Retrieving from hf repo: {directory_or_repo_id}")
            if to_tmpdir:
                # mkdtemp rather than TemporaryDirectory: the download must
                # still exist when from_local_dir (and any later read of the
                # files) runs, so it must not be removed on leaving a
                # `with` block. The caller owns cleanup of this directory.
                local_dir = tempfile.mkdtemp()
            helper = HFHubHelper()
            saved_dir = helper.download(
                directory_or_repo_id,
                token=token,
                local_dir=local_dir,
                **(hub_download_kwargs or {}),
            )

        return cls.from_local_dir(saved_dir, *args, **kwargs)

load#

def load(
    *args,
    **kwargs
) -> 'Collection'
View Source
    @classmethod
    def load(cls, *args, **kwargs) -> Collection:
        """Alias for ``from_saved``; all arguments are forwarded unchanged.

        Returns:
            Collection: Whatever ``cls.from_saved`` returns.
        """
        loaded = cls.from_saved(*args, **kwargs)
        return loaded

Instance variables#

column_names
df
on_df

Methods#

add#

def add(
    self,
    entries: 'List[Dict[str, Any]]'
) -> 'DaftCollection'
View Source
    def add(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Append rows to the collection; alias for ``add_rows``.

        Args:
            entries: One dict per new row.

        Returns:
            DaftCollection: Whatever ``add_rows`` returns.
        """
        # The original called `add_row`, which this class does not define.
        return self.add_rows(entries)

add_column#

def add_column(
    self,
    name: 'str',
    column: 'List[Any]'
) -> 'DaftCollection'
View Source
    @lazy(default=True)
    def add_column(self, name: str, column: List[Any]) -> DaftCollection:
        """Add (or replace) a column with the given values.

        The values are attached by joining on a temporary
        ``_vexpresso_index`` column, which is dropped afterwards.

        Args:
            name: Name of the column to add; an existing column with this
                name is replaced.
            column: Values for the column.

        Returns:
            DaftCollection: Collection wrapping the joined dataframe.
        """
        df = self.df
        # Columns that survive the replacement; the index must be computed
        # from one of these, not from the column that is being dropped.
        surviving = [c for c in self.column_names if c != name]
        if name in self.column_names:
            df = df.exclude(name)
        if not surviving:
            # No other column to index on: the result is just the new column.
            return self.from_daft_df(daft.from_pydict({name: column}))

        df = df.with_column("_vexpresso_index", indices(col(surviving[0])))
        second_df = daft.from_pydict(
            {name: column, "_vexpresso_index": list(range(len(self)))}
        )
        df = df.join(second_df, on="_vexpresso_index").exclude("_vexpresso_index")
        return self.from_daft_df(df)

add_rows#

def add_rows(
    self,
    entries: 'List[Dict[str, Any]]'
) -> 'DaftCollection'
View Source
    def add_rows(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Append rows and return a new collection built from the result.

        Only keys that are already columns are used; a row lacking a column
        gets ``None`` for it.

        Args:
            entries: One dict per new row.

        Returns:
            DaftCollection: Result of ``from_data`` on the extended columns.
        """
        columns = self.to_dict()
        for name, values in columns.items():
            values.extend(entry.get(name, None) for entry in entries)
        return self.from_data(columns)

agg#

def agg(
    self,
    *args,
    **kwargs
) -> 'DaftCollection'
View Source
    @lazy(default=True)
    def agg(self, *args, **kwargs) -> DaftCollection:
        """Aggregate the underlying dataframe.

        Args:
            *args: Forwarded to the dataframe's ``agg``.
            **kwargs: Forwarded to the dataframe's ``agg``.

        Returns:
            DaftCollection: Collection wrapping the aggregated dataframe.
        """
        # The original called `from_df`, which this class does not define;
        # `from_daft_df` is the wrapper the sibling methods use.
        return self.from_daft_df(self.daft_df.agg(*args, **kwargs))

apply#

def apply(
    self,
    transform_fn: 'Transformation',
    column: 'DaftCollection',
    *args,
    to: 'Optional[str]' = None,
    resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
    datatype: 'DataType' = Python,
    init_kwargs: 'Dict[str, Any]' = {},
    function: 'str' = '__call__',
    **kwargs
) -> 'DaftCollection'

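Applies a transformation function to the given column(s). Positional and keyword arguments that are collections are passed to the function as columns. Unless `to` is given, the result is stored in a column named as follows (note the spelling used by the implementation):

tranformed_{column_name}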

View Source
    @lazy(default=True)
    def apply(
        self,
        transform_fn: Transformation,
        column: DaftCollection,
        *args,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        function: str = "__call__",
        **kwargs,
    ) -> DaftCollection:
        """Apply a transformation to one or more columns and store the result.

        Args:
            transform_fn: Transformation to run; wrapped with ``transformation``
                using ``datatype``, ``init_kwargs`` and ``function``.
            column: Single-column collection used as the first argument.
            *args: Further arguments. Single-column collections are passed as
                column expressions (and added to this collection first when
                their column is missing); anything else is passed through.
            to: Name of the output column. Defaults to
                ``tranformed_<first column name>`` (spelling as implemented).
            resource_request: Forwarded to ``with_column``.
            datatype: Forwarded to ``transformation``.
            init_kwargs: Forwarded to ``transformation``.
            function: Forwarded to ``transformation``.
            **kwargs: Keyword arguments for the transformation. Collections
                are replaced by the first column of their dataframe.

        Returns:
            DaftCollection: Result of ``on_df.with_column`` on the collection.

        Raises:
            TypeError: If ``column`` is not a ``DaftCollection``.
            ValueError: If a collection argument has more than one column.
        """
        wrapped_fn = transformation(
            transform_fn, datatype=datatype, init_kwargs=init_kwargs, function=function
        )
        if not isinstance(column, DaftCollection):
            raise TypeError(
                "first args in apply must be a DaftCollection! use `collection['column_name']`"
            )

        target = self
        positional = []
        for candidate in (column, *args):
            if not isinstance(candidate, DaftCollection):
                positional.append(candidate)
                continue
            if len(candidate.column_names) > 1:
                raise ValueError(
                    "When passing in a Daft collection into `embed`, they must only have 1 column!"
                )
            name = candidate.column_names[0]
            if name not in target.column_names:
                # Column lives in another collection: copy its values over.
                values = candidate.select(name).to_dict()[name]
                target = target.add_column(name, values)
            positional.append(col(name))

        # Only the first column of a collection passed by keyword is supported.
        keyword = {
            key: value.daft_df.columns[0] if isinstance(value, DaftCollection) else value
            for key, value in kwargs.items()
        }

        if to is None:
            to = f"tranformed_{positional[0].name()}"

        return target.on_df.with_column(
            to, wrapped_fn(*positional, **keyword), resource_request=resource_request
        )

batch_query#

def batch_query(
    self,
    column: 'str',
    queries: 'List[Any]' = None,
    query_embeddings: 'List[Any]' = None,
    filter_conditions: 'Optional[Dict[str, Dict[str, str]]]' = None,
    k: 'int' = None,
    sort: 'bool' = True,
    embedding_fn: 'Optional[Union[Transformation, str]]' = None,
    return_scores: 'bool' = False,
    score_column_name: 'Optional[str]' = None,
    resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
    retriever: 'Optional[BaseRetriever]' = None,
    *args,
    **kwargs
) -> 'List[Collection]'
View Source
    @lazy(default=True)
    def batch_query(
        self,
        column: str,
        queries: List[Any] = None,
        query_embeddings: List[Any] = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: int = None,
        sort: bool = True,
        embedding_fn: Optional[Union[Transformation, str]] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> List[Collection]:
        """Run several queries against an embeddings column.

        Args:
            column: Embeddings column to search.
            queries: Raw queries; embedded with ``embed_queries`` when
                ``query_embeddings`` is not given.
            query_embeddings: Precomputed query embeddings.
            filter_conditions: When given, applied to every retrieved
                dataframe with ``FilterHelper.filter``.
            k: Number of results per query; defaults to ``len(self)``.
            sort: Forwarded to ``retrieve``.
            embedding_fn: Embedding function, or the name of a registered
                one. A function is registered under ``column``.
            return_scores: Forwarded to ``retrieve``.
            score_column_name: Forwarded to ``retrieve``.
            resource_request: Forwarded to ``embed_queries`` and ``retrieve``.
            retriever: Retriever to use; defaults to ``self.retriever``.
            *args: Forwarded to ``embed_queries``.
            **kwargs: Forwarded to ``embed_queries``.

        Returns:
            List[Collection]: One collection per query.

        Raises:
            ValueError: If neither ``queries`` nor ``query_embeddings`` is
                given.
        """
        if queries is None and query_embeddings is None:
            # Fail early with a clear message instead of `len(None)`.
            raise ValueError(
                "Please provide either `queries` or `query_embeddings`"
            )
        batch_size = len(queries) if query_embeddings is None else len(query_embeddings)

        if embedding_fn is not None:
            if isinstance(embedding_fn, str):
                embedding_fn = self.embedding_functions[embedding_fn]
            else:
                if (
                    column in self.embedding_functions
                    and embedding_fn != self.embedding_functions[column]
                ):
                    print(
                        "embedding_fn may not be the same as whats in map! Updating what's in map..."
                    )
                self.embedding_functions[column] = get_embedding_fn(embedding_fn)
                embedding_fn = self.embedding_functions[column]

        if query_embeddings is None:
            query_embeddings = self.embed_queries(
                queries,
                column,
                embedding_fn,
                resource_request,
                *args,
                **kwargs,
            )

        if retriever is None:
            retriever = self.retriever
        if k is None:
            k = self.__len__()

        dfs = retrieve(
            batch_size,
            self.daft_df,
            column,
            query_embeddings,
            retriever,
            k,
            sort,
            return_scores,
            score_column_name,
            resource_request,
        )

        # The condition does not depend on the dataframe: test it once.
        if filter_conditions is not None:
            dfs = [FilterHelper.filter(df, filter_conditions) for df in dfs]

        return [self.from_daft_df(df) for df in dfs]

cast#

def cast(
    self,
    column: 'str' = None,
    datatype: 'DataType' = Python
) -> 'DaftCollection'
View Source
    def cast(
        self, column: str = None, datatype: DataType = DataType.python()
    ) -> DaftCollection:
        """Cast one column, or every column, to ``datatype``.

        Args:
            column: Column to cast; when ``None`` all columns are cast.
            datatype: Target datatype.

        Returns:
            DaftCollection: Collection wrapping the re-selected dataframe.
        """
        cast_all = column is None
        # Columns left untouched are selected by name, as in the original.
        selection = [
            col(name).cast(datatype) if cast_all or name == column else name
            for name in self.column_names
        ]
        return self.from_daft_df(self.daft_df.select(*selection))

collect#

def collect(
    self,
    in_place: 'bool' = False
)

Materializes the collection

Returns:

Type Description
Collection Materialized collection
View Source
    def collect(self, in_place: bool = False):
        """Materialize the collection.

        Args:
            in_place: When true, replace this collection's dataframe with the
                collected one and return ``self``.

        Returns:
            ``self`` when ``in_place`` is true, otherwise a new collection
            wrapping the collected dataframe.
        """
        collected_df = self.daft_df.collect(num_preview_rows=None)
        if not in_place:
            return self.from_daft_df(collected_df)
        self.daft_df = collected_df
        return self

embed#

def embed(
    self,
    column: 'Union[DaftCollection, List[Any], str]',
    *args,
    embedding_fn: 'Optional[Transformation]' = None,
    to: 'Optional[str]' = None,
    resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
    datatype: 'DataType' = Python,
    init_kwargs: 'Dict[str, Any]' = {},
    **kwargs
) -> 'DaftCollection'
View Source
    @lazy(default=True)
    def embed(
        self,
        column: Union[DaftCollection, List[Any], str],
        *args,
        embedding_fn: Optional[Transformation] = None,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        **kwargs,
    ) -> DaftCollection:
        """Embed a column and store the embeddings in a new column.

        Args:
            column: A column name, a collection (its first column is used), or
                raw content that is first added as ``content_<n>``.
            *args: Extra positional arguments forwarded to ``apply``.
            embedding_fn: Embedding function to register under ``to``. When
                omitted, the function already registered under ``to`` is used.
            to: Output column name; defaults to ``embeddings_<column name>``.
            resource_request: Forwarded to ``apply``.
            datatype: Forwarded to ``get_embedding_fn``.
            init_kwargs: Forwarded to ``get_embedding_fn``.
            **kwargs: Extra keyword arguments forwarded to ``apply``.

        Returns:
            DaftCollection: Result of ``apply`` with the embedding function.

        Raises:
            KeyError: If ``embedding_fn`` is omitted and nothing is registered
                under ``to``.
        """
        target = self
        if isinstance(column, str):
            source_name = column
        elif isinstance(column, DaftCollection):
            source_name = column.column_names[0]
        else:
            # Raw content: attach it as a fresh column before embedding.
            source_name = f"content_{len(target.column_names)}"
            target = target.add_column(source_name, column)

        if to is None:
            to = f"embeddings_{source_name}"

        if embedding_fn is not None:
            self.embedding_functions[to] = embedding_fn
        else:
            embedding_fn = self.embedding_functions[to]

        self.embedding_functions[to] = get_embedding_fn(
            self.embedding_functions[to], datatype=datatype, init_kwargs=init_kwargs
        )
        registered_fn = self.embedding_functions[to]

        return target.apply(
            registered_fn,
            target[source_name],
            *args,
            to=to,
            resource_request=resource_request,
            **kwargs,
        )

embed_queries#

def embed_queries(
    self,
    queries: 'List[Any]',
    embedding_column_name: 'Optional[str]' = None,
    embedding_fn: 'Optional[Transformation]' = None,
    resource_request=ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
    *args,
    **kwargs
) -> 'Any'
View Source
    def embed_queries(
        self,
        queries: List[Any],
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed a list of queries and return the embeddings.

        Args:
            queries: Queries to embed.
            embedding_column_name: Column whose registered embedding function
                is used when ``embedding_fn`` is ``None``.
            embedding_fn: Embedding function, or the name of a registered one.
            resource_request: Forwarded to ``with_column``.
            *args: Extra positional arguments for the embedding function.
            **kwargs: Extra keyword arguments for the embedding function.

        Returns:
            The ``query_embeddings`` column as returned by ``to_pydict``.

        Raises:
            ValueError: If both ``embedding_fn`` and
                ``embedding_column_name`` are ``None``.
        """
        if embedding_fn is None:
            if embedding_column_name is None:
                raise ValueError("Column name must be provided if embedding_fn is None")
            embedding_fn = self.embedding_functions[embedding_column_name]
        elif isinstance(embedding_fn, str):
            embedding_fn = self.embedding_functions[embedding_fn]

        queries_df = daft.from_pydict({"queries": queries})
        embedded_df = queries_df.with_column(
            "query_embeddings",
            embedding_fn(col("queries"), *args, **kwargs),
            resource_request=resource_request,
        )
        materialized = embedded_df.select("query_embeddings").collect()
        return materialized.to_pydict()["query_embeddings"]

embed_query#

def embed_query(
    self,
    query: 'Any',
    embedding_column_name: 'Optional[str]' = None,
    embedding_fn: 'Optional[Transformation]' = None,
    resource_request=ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
    *args,
    **kwargs
) -> 'Any'
View Source
    def embed_query(
        self,
        query: Any,
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed a single query.

        Args:
            query: Query to embed.
            embedding_column_name: Forwarded to ``embed_queries``.
            embedding_fn: Forwarded to ``embed_queries``.
            resource_request: Forwarded to ``embed_queries``.
            *args: Extra positional arguments forwarded to ``embed_queries``.
            **kwargs: Extra keyword arguments forwarded to ``embed_queries``.

        Returns:
            The first (only) embedding returned by ``embed_queries``.
        """
        # Forward positionally: with `queries=[query], ..., *args` the first
        # extra positional argument would also bind to `queries` and raise
        # TypeError instead of reaching the embedding function.
        embeddings = self.embed_queries(
            [query],
            embedding_column_name,
            embedding_fn,
            resource_request,
            *args,
            **kwargs,
        )
        return embeddings[0]

exclude#

def exclude(
    self,
    *args
) -> 'DaftCollection'
View Source
    @lazy(default=True)
    def exclude(self, *args) -> DaftCollection:
        """Return a new collection without the given columns.

        Args:
            *args: Columns to drop, forwarded unchanged to the underlying
                dataframe's ``exclude``.

        Returns:
            DaftCollection: Collection wrapping the reduced dataframe.
        """
        remaining_df = self.daft_df.exclude(*args)
        return self.from_daft_df(remaining_df)

execute#

def execute(
    self
) -> 'DaftCollection'
View Source
    def execute(self) -> DaftCollection:
        """Materialize the collection; equivalent to ``collect()``.

        Returns:
            DaftCollection: Whatever ``collect`` returns.
        """
        materialized = self.collect()
        return materialized

filter#

def filter(
    self,
    filter_conditions: 'Dict[str, Dict[str, str]]',
    *args,
    **kwargs
) -> 'DaftCollection'

Filter method, filters using conditions based on metadata

Parameters:

Name Type Description Default
filter_conditions Dict[str, Dict[str, str]] Conditions passed to FilterHelper.filter to select rows None

Returns:

Type Description
DaftCollection New collection wrapping the filtered dataframe
View Source
    @lazy(default=True)
    def filter(
        self, filter_conditions: Dict[str, Dict[str, str]], *args, **kwargs
    ) -> DaftCollection:
        """Return a new collection holding only the rows that match.

        Args:
            filter_conditions: Conditions handed to ``FilterHelper.filter``.
            *args: Extra positional arguments for ``FilterHelper.filter``.
            **kwargs: Extra keyword arguments for ``FilterHelper.filter``.

        Returns:
            DaftCollection: Collection wrapping the filtered dataframe.
        """
        filtered_df = FilterHelper.filter(
            self.daft_df, filter_conditions, *args, **kwargs
        )
        return self.from_daft_df(filtered_df)

from_daft_df#

def from_daft_df(
    self,
    df: 'daft.DataFrame'
) -> 'DaftCollection'
View Source
    def from_daft_df(self, df: daft.DataFrame) -> DaftCollection:
        """Wrap a dataframe in a new collection sharing this one's settings.

        Args:
            df: Dataframe to wrap.

        Returns:
            DaftCollection: New collection with this collection's retriever
            and embedding-function map (the same objects, not copies).
        """
        shared_settings = {
            "retriever": self.retriever,
            "embedding_functions": self.embedding_functions,
        }
        return DaftCollection(daft_df=df, **shared_settings)

from_data#

def from_data(
    self,
    data: 'Any'
) -> 'DaftCollection'
View Source
    def from_data(self, data: Any) -> DaftCollection:
        """Build a new collection from raw data, sharing this one's settings.

        Args:
            data: Anything the ``DaftCollection`` constructor accepts as
                ``data``.

        Returns:
            DaftCollection: New collection with this collection's retriever
            and embedding-function map (the same objects, not copies).
        """
        shared_settings = {
            "retriever": self.retriever,
            "embedding_functions": self.embedding_functions,
        }
        return DaftCollection(data=data, **shared_settings)

iloc#

def iloc(
    self,
    idx: 'Union[int, Iterable[int]]'
) -> 'DaftCollection'
View Source
    @lazy(default=True)
    def iloc(self, idx: Union[int, Iterable[int]]) -> DaftCollection:
        """Select rows by position.

        A temporary ``_vexpresso_index`` column is added, filtered with an
        ``isin`` condition, and removed again.

        Args:
            idx: A single position or an iterable of positions.

        Returns:
            DaftCollection: Collection holding the selected rows.
        """
        # for some reason this is super slow
        positions = [idx] if isinstance(idx, int) else idx
        indexed = self.on_df.with_column(
            "_vexpresso_index", indices(col(self.column_names[0]))
        )
        matching = indexed.filter({"_vexpresso_index": {"isin": positions}})
        return matching.exclude("_vexpresso_index")

query#

def query(
    self,
    column: 'str',
    query: 'List[Any]' = None,
    query_embedding: 'List[Any]' = None,
    filter_conditions: 'Optional[Dict[str, Dict[str, str]]]' = None,
    k: 'int' = None,
    sort: 'bool' = True,
    embedding_fn: 'Optional[Transformation]' = None,
    return_scores: 'bool' = False,
    score_column_name: 'Optional[str]' = None,
    resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
    retriever: 'Optional[BaseRetriever]' = None,
    *args,
    **kwargs
) -> 'Collection'

Query method, takes in queries or query embeddings and retrieves nearest content

Parameters:

Name Type Description Default
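query Any Single query; embedded before retrieval when query_embedding is not given None
query_embedding List[Any] Precomputed embedding of the query None
filter_conditions Dict[str, Dict[str, str]] Conditions applied to the retrieved results with FilterHelper.filter None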
View Source
    @lazy(default=True)
    def query(
        self,
        column: str,
        query: List[Any] = None,
        query_embedding: List[Any] = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: int = None,
        sort: bool = True,
        embedding_fn: Optional[Transformation] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> Collection:
        """Run a single query; thin wrapper around ``batch_query``.

        Args:
            column: Embeddings column to search.
            query: A single query (wrapped in a one-element batch).
            query_embedding: A single precomputed query embedding (wrapped in
                a one-element batch).
            filter_conditions: Forwarded to ``batch_query``.
            k: Forwarded to ``batch_query``.
            sort: Forwarded to ``batch_query``.
            embedding_fn: Forwarded to ``batch_query``.
            return_scores: Forwarded to ``batch_query``.
            score_column_name: Forwarded to ``batch_query``.
            resource_request: Forwarded to ``batch_query``.
            retriever: Forwarded to ``batch_query``.
            *args: Extra positional arguments forwarded to ``batch_query``.
            **kwargs: Extra keyword arguments forwarded to ``batch_query``.

        Returns:
            Collection: The first (only) result of ``batch_query``.
        """
        queries = None if query is None else [query]
        query_embeddings = None if query_embedding is None else [query_embedding]

        # Forward positionally, in batch_query's parameter order: with
        # `column=column, ..., *args` the first extra positional argument
        # would also bind to `column` and raise TypeError.
        results = self.batch_query(
            column,
            queries,
            query_embeddings,
            filter_conditions,
            k,
            sort,
            embedding_fn,
            return_scores,
            score_column_name,
            resource_request,
            retriever,
            *args,
            **kwargs,
        )
        return results[0]

rename#

def rename(
    self,
    columns: 'Dict[str, str]'
) -> 'DaftCollection'
View Source
    @lazy(default=True)
    def rename(self, columns: Dict[str, str]) -> DaftCollection:
        """Rename columns, keeping the others and the column order.

        Args:
            columns: Mapping from current column name to new name.

        Returns:
            DaftCollection: Result of ``on_df.select`` with the aliased
            columns.
        """
        expressions = [
            col(name).alias(columns[name]) if name in columns else col(name)
            for name in self.column_names
        ]
        return self.on_df.select(*expressions)

save#

def save(
    self,
    directory_or_repo_id: 'Optional[str]' = None,
    to_hub: 'bool' = False,
    token: 'Optional[str]' = None,
    private: 'bool' = True,
    hf_username: 'Optional[str]' = None,
    repo_name: 'Optional[str]' = None,
    hub_kwargs: 'Optional[Dict[str, Any]]' = {}
) -> 'str'
View Source
    def save(
        self,
        directory_or_repo_id: Optional[str] = None,
        to_hub: bool = False,
        token: Optional[str] = None,
        private: bool = True,
        hf_username: Optional[str] = None,
        repo_name: Optional[str] = None,
        hub_kwargs: Optional[Dict[str, Any]] = {},
    ) -> str:
        """Save the collection locally or upload it to the Hugging Face hub.

        Args:
            directory_or_repo_id: Local directory, or hub repo id when
                ``to_hub`` is true.
            to_hub: Upload to the hub instead of saving locally.
            token: Token forwarded to the upload.
            private: Forwarded to the upload.
            hf_username: Used with ``repo_name`` to build the repo id when
                ``directory_or_repo_id`` is omitted.
            repo_name: See ``hf_username``.
            hub_kwargs: Extra keyword arguments for the upload; ``None`` is
                treated as empty.

        Returns:
            str: The repo id after an upload; for a local save, the value
            returned by ``save_local``, or the directory when that is
            ``None``.

        Raises:
            ValueError: If uploading without a repo id and without both
                ``hf_username`` and ``repo_name``.
        """
        if not to_hub:
            print(f"saving to {directory_or_repo_id}")
            saved = self.save_local(directory_or_repo_id)
            # save_local may return nothing; still honour the `-> str`
            # contract by reporting where the data went.
            return directory_or_repo_id if saved is None else saved

        if directory_or_repo_id is None:
            if hf_username is None or repo_name is None:
                raise ValueError(
                    "Please provide either a directory / repo id or your huggingface username + repo name"
                )
            directory_or_repo_id = f"{hf_username}/{repo_name}"

        # Announce only after the repo id is resolved, so it never prints None.
        print(f"Uploading collection to {directory_or_repo_id}")
        with tempfile.TemporaryDirectory() as tmpdirname:
            self.save_local(tmpdirname)
            helper = HFHubHelper()
            helper.upload(
                repo_id=directory_or_repo_id,
                folder_path=tmpdirname,
                token=token,
                private=private,
                **(hub_kwargs or {}),
            )
        print(f"Upload to {directory_or_repo_id} complete!")
        return directory_or_repo_id

save_local#

def save_local(
    self,
    directory: 'str'
) -> 'str'
View Source
    def save_local(self, directory: str) -> str:
        """Write the collection to ``<directory>/content.parquet``.

        The directory is created when missing.

        Args:
            directory: Target directory.

        Returns:
            str: The directory that was written to (previously the method
            returned ``None`` despite its ``str`` annotation).
        """
        os.makedirs(directory, exist_ok=True)
        table = self.daft_df.to_arrow()
        pq.write_table(table, os.path.join(directory, "content.parquet"))
        return directory

select#

def select(
    self,
    *args
) -> 'DaftCollection'

Select method, selects columns

Returns:

Type Description
DaftCollection Collection containing only the selected columns
View Source
    @lazy(default=True)
    def select(self, *args) -> DaftCollection:
        """Return a new collection restricted to the requested columns.

        Args:
            *args: Column selectors, forwarded unchanged to
                ``FilterHelper.select``.

        Returns:
            DaftCollection: Collection wrapping the selected dataframe.
        """
        selected_df = FilterHelper.select(self.daft_df, *args)
        return self.from_daft_df(selected_df)

set_embedding_function#

def set_embedding_function(
    self,
    column: 'str',
    embedding_function: 'Transformation'
)
View Source
    def set_embedding_function(self, column: str, embedding_function: Transformation):
        """Register an embedding function for a column.

        The function is stored as given, replacing any previous entry.

        Args:
            column: Key under which the function is stored.
            embedding_function: Function to store.
        """
        registry = self.embedding_functions
        registry[column] = embedding_function

show#

def show(
    self,
    num_rows: 'Optional[int]' = None
)
View Source
    def show(self, num_rows: Optional[int] = None):
        """Show rows of the underlying dataframe.

        Args:
            num_rows: Number of rows to show; when ``None`` the collection's
                own length is used.

        Returns:
            Whatever the dataframe's ``show`` returns.
        """
        row_count = self.__len__() if num_rows is None else num_rows
        return self.daft_df.show(row_count)

sort#

def sort(
    self,
    column,
    desc=True
) -> 'DaftCollection'
View Source
    @lazy(default=True)
    def sort(self, column, desc=True) -> DaftCollection:
        """Return a new collection sorted by one column.

        Args:
            column: Name of the column to sort by.
            desc: Forwarded to the dataframe's ``sort`` (default ``True``).

        Returns:
            DaftCollection: Collection wrapping the sorted dataframe.
        """
        sort_key = col(column)
        sorted_df = self.daft_df.sort(sort_key, desc=desc)
        return self.from_daft_df(sorted_df)

to_dict#

def to_dict(
    self
) -> 'Dict[str, List[Any]]'

Converts collection to dict

Returns:

Type Description
Dict[str, List[Any]] collection as dict
View Source
    def to_dict(self) -> Dict[str, List[Any]]:
        """Convert the collection to a dict.

        The collection is executed first.

        Returns:
            Dict[str, List[Any]]: The dataframe's ``to_pydict`` result.
        """
        materialized_df = self.execute().daft_df
        return materialized_df.to_pydict()

to_langchain#

def to_langchain(
    self,
    document_column: 'str',
    embeddings_column: 'str'
)
View Source
    def to_langchain(self, document_column: str, embeddings_column: str):
        """Wrap this collection in a LangChain ``VectorStore`` adapter.

        Args:
            document_column: Column holding the document text.
            embeddings_column: Column queried by ``similarity_search``.

        Returns:
            A ``VexpressoVectorStore`` bound to this collection.
        """
        from langchain.docstore.document import Document
        from langchain.vectorstores import VectorStore

        class VexpressoVectorStore(VectorStore):
            def __init__(self, collection: DaftCollection):
                self.collection = collection
                self.document_column = document_column
                self.embeddings_column = embeddings_column

            def add_texts(
                self,
                texts: Iterable[str],
                metadatas: Optional[List[dict]] = None,
                **kwargs: Any,
            ) -> List[str]:
                """Append texts (and optional metadata) as new rows."""
                # Materialise once: `texts` may be a generator, which has no
                # len() and can only be consumed a single time.
                texts = list(texts)
                if metadatas is None:
                    metadatas = [{} for _ in texts]
                combined = [
                    {self.document_column: t, **m} for t, m in zip(texts, metadatas)
                ]
                self.collection = self.collection.add_rows(combined)
                # The collection assigns no row ids, so there is nothing to
                # report; return an empty list to honour the annotation.
                return []

            def similarity_search(
                self, query: str, k: int = 4, **kwargs: Any
            ) -> List[Document]:
                """Return the ``k`` nearest rows as LangChain documents."""
                dictionary = self.collection.query(
                    self.embeddings_column, query=query, k=k, lazy=False, **kwargs
                ).to_dict()
                documents = dictionary[self.document_column]
                # Every column other than the document text becomes metadata.
                metadata_columns = {
                    name: values
                    for name, values in dictionary.items()
                    if name != self.document_column
                }
                return [
                    Document(
                        page_content=doc,
                        metadata={
                            name: values[i]
                            for name, values in metadata_columns.items()
                        },
                    )
                    for i, doc in enumerate(documents)
                ]

            @classmethod
            def from_texts(
                cls,
                *args,
                **kwargs: Any,
            ):
                """Return VectorStore initialized from texts and embeddings."""
                # Not implemented: present only to satisfy the base class.
                return None

        return VexpressoVectorStore(self)

to_list#

def to_list(
    self
) -> 'List[Any]'

Converts collection to list

Returns:

Type Description
List[Any] returns list of columns
View Source
    def to_list(self) -> List[Any]:
        """Convert the collection to a list.

        The collection is executed first.

        Returns:
            List[Any]: The values of the single column when there is exactly
            one column, otherwise a list with one list of values per column.
        """
        column_values = [*self.execute().daft_df.to_pydict().values()]
        if len(column_values) != 1:
            return column_values
        return column_values[0]

to_pandas#

def to_pandas(
    self
) -> 'pd.DataFrame'

Converts collection to pandas dataframe

Returns:

Type Description
pd.DataFrame The executed collection as a pandas dataframe
View Source
    def to_pandas(self) -> pd.DataFrame:
        """Convert the collection to a pandas dataframe.

        The collection is executed first.

        Returns:
            pd.DataFrame: The dataframe's ``to_pandas`` result.
        """
        materialized_df = self.execute().daft_df
        return materialized_df.to_pandas()