Module vexpresso.daft#
View Source
from vexpresso.daft.collection import DaftCollection
# Public API of the vexpresso.daft package: only DaftCollection is re-exported.
__all__ = ["DaftCollection"]
Sub-modules#
Classes#
DaftCollection#
class DaftCollection(
data: 'Optional[Union[str, pd.DataFrame, Dict[str, Any]]]' = None,
retriever: 'BaseRetriever' = Retriever(),
embeddings: 'Optional[List[Any]]' = None,
embedding_functions: 'Dict[str, Any]' = {},
daft_df: 'Optional[daft.DataFrame]' = None,
lazy: 'bool' = False
)
View Source
class DaftCollection(Collection):
    """``Collection`` implementation backed by a ``daft.DataFrame``.

    Derived collections are created through ``from_daft_df`` / ``from_data``.
    They receive this collection's ``retriever`` and the *same*
    ``embedding_functions`` dict object. Registering an embedding function on
    one of them is therefore visible to the others.
    """

    def __init__(
        self,
        data: Optional[Union[str, pd.DataFrame, Dict[str, Any]]] = None,
        retriever: BaseRetriever = Retriever(),
        embeddings: Optional[List[Any]] = None,
        embedding_functions: Optional[Dict[str, Any]] = None,
        daft_df: Optional[daft.DataFrame] = None,
        lazy: bool = False,
    ):
        """Create a collection from raw data or an existing daft DataFrame.

        Args:
            data: path to a ``.json`` file, a pandas DataFrame, a dict mapping
                column name to values, or a list of row dicts.
            retriever: retriever used by ``query``/``batch_query`` when none is
                passed explicitly. NOTE(review): the default instance is
                created once and shared by every collection relying on it.
            embeddings: optional values stored in an ``embeddings`` column.
            embedding_functions: registry of embedding functions keyed by
                column name. A fresh empty dict is used when omitted.
            daft_df: existing DataFrame. When given, ``data`` is not converted.
            lazy: when False (default) the DataFrame is collected immediately.

        Raises:
            ValueError: if ``data`` is a string that does not end in ``.json``.
        """
        # A `{}` default would be a single dict shared by every collection
        # created without an explicit registry, and `embed` /
        # `set_embedding_function` mutate the registry in place.
        if embedding_functions is None:
            embedding_functions = {}
        self.daft_df = daft_df
        self.retriever = retriever
        self.embedding_functions = embedding_functions

        _metadata = {}
        if data is not None:
            if isinstance(data, str):
                if not data.endswith(".json"):
                    raise ValueError(
                        f"Unsupported data path {data!r}: only .json files can be loaded"
                    )
                with open(data, "r") as f:
                    # Normalize to dict-of-lists, exactly like the DataFrame branch.
                    _metadata = pd.DataFrame(json.load(f)).to_dict("list")
            elif isinstance(data, pd.DataFrame):
                _metadata = data.to_dict("list")
            else:
                _metadata = data

        if daft_df is None and len(_metadata) > 0:
            if isinstance(_metadata, list):
                self.daft_df = daft.from_pylist(_metadata)
            else:
                self.daft_df = daft.from_pydict({**_metadata})

        if embeddings is not None:
            self.daft_df = self.add_column("embeddings", embeddings).daft_df

        # An empty collection has no DataFrame to collect.
        if not lazy and self.daft_df is not None:
            self.daft_df = self.daft_df.collect()

    def __repr__(self) -> str:
        return self.daft_df.__repr__()

    @property
    def on_df(self) -> Wrapper:
        """Return ``Wrapper(self)``, used below to chain DataFrame-style calls.

        NOTE(review): judging from ``iloc``/``rename``/``apply``, calls made on
        the wrapper yield collections. Confirm this against ``Wrapper``.
        """
        return Wrapper(self)

    @property
    def df(self) -> daft.DataFrame:
        """The underlying daft DataFrame."""
        return self.daft_df

    def __len__(self) -> int:
        """Number of rows, via ``daft.DataFrame.count_rows``."""
        return self.daft_df.count_rows()

    def __getitem__(self, column: str) -> DaftCollection:
        """``collection["name"]`` is shorthand for ``collection.select("name")``."""
        return self.select(column)

    def cast(
        self, column: str = None, datatype: DataType = DataType.python()
    ) -> DaftCollection:
        """Cast ``column`` (or every column when None) to ``datatype``."""
        if column is None:
            columns = [col(c).cast(datatype) for c in self.column_names]
        else:
            columns = [
                col(c).cast(datatype) if c == column else c for c in self.column_names
            ]
        return self.from_daft_df(self.daft_df.select(*columns))

    def add_rows(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Append row dicts and return a rebuilt collection.

        Keys that are not existing columns are ignored. Columns missing from
        an entry are filled with None.
        """
        dic = self.to_dict()
        for k in dic:
            for d in entries:
                dic[k].append(d.get(k, None))
        return self.from_data(dic)

    def add(self, entries: List[Dict[str, Any]]) -> DaftCollection:
        """Alias for ``add_rows``."""
        return self.add_rows(entries)

    def set_embedding_function(self, column: str, embedding_function: Transformation):
        """Register ``embedding_function`` for ``column`` in the shared registry."""
        self.embedding_functions[column] = embedding_function

    @property
    def column_names(self) -> List[str]:
        """Column names of the underlying DataFrame."""
        return self.daft_df.column_names

    def from_daft_df(self, df: daft.DataFrame) -> DaftCollection:
        """Wrap ``df`` in a new collection sharing retriever and embedding registry."""
        return DaftCollection(
            retriever=self.retriever,
            embedding_functions=self.embedding_functions,
            daft_df=df,
        )

    def from_data(self, data: Any) -> DaftCollection:
        """Build a new collection from raw ``data``, sharing retriever and registry."""
        return DaftCollection(
            data=data,
            retriever=self.retriever,
            embedding_functions=self.embedding_functions,
        )

    def collect(self, in_place: bool = False):
        """Materialize the DataFrame.

        Returns ``self`` when ``in_place`` is true, otherwise a new collection.
        """
        if in_place:
            self.daft_df = self.daft_df.collect(num_preview_rows=None)
            return self
        return self.from_daft_df(self.daft_df.collect(num_preview_rows=None))

    def execute(self) -> DaftCollection:
        """Alias for ``collect()`` (returns a new, materialized collection)."""
        return self.collect()

    def to_pandas(self) -> pd.DataFrame:
        """Materialize and convert to a pandas DataFrame."""
        collection = self.execute()
        return collection.daft_df.to_pandas()

    def to_dict(self) -> Dict[str, List[Any]]:
        """Materialize and convert to a ``{column: values}`` dict."""
        collection = self.execute()
        return collection.daft_df.to_pydict()

    def to_list(self) -> List[Any]:
        """Return column values; a single column is returned unwrapped."""
        collection = self.execute()
        values = list(collection.daft_df.to_pydict().values())
        if len(values) == 1:
            return values[0]
        return values

    def show(self, num_rows: Optional[int] = None):
        """Show ``num_rows`` rows, or all rows when None."""
        if num_rows is None:
            return self.daft_df.show(len(self))
        return self.daft_df.show(num_rows)

    @lazy(default=True)
    def iloc(self, idx: Union[int, Iterable[int]]) -> DaftCollection:
        """Select rows by integer position (a single int or an iterable of ints)."""
        # NOTE(review): reported by the author as very slow. A positional index
        # column is added to every row and then filtered.
        if isinstance(idx, int):
            idx = [idx]
        collection = (
            self.on_df.with_column(
                "_vexpresso_index", indices(col(self.column_names[0]))
            )
            .filter({"_vexpresso_index": {"isin": idx}})
            .exclude("_vexpresso_index")
        )
        return collection

    @lazy(default=True)
    def rename(self, columns: Dict[str, str]) -> DaftCollection:
        """Rename columns according to an ``{old: new}`` mapping."""
        expressions = []
        for column in self.column_names:
            if column in columns:
                expressions.append(col(column).alias(columns[column]))
            else:
                expressions.append(col(column))
        return self.on_df.select(*expressions)

    @lazy(default=True)
    def agg(self, *args, **kwargs) -> DaftCollection:
        """Run ``daft.DataFrame.agg`` and wrap the result in a new collection."""
        return self.from_daft_df(self.daft_df.agg(*args, **kwargs))

    @lazy(default=True)
    def add_column(self, name: str, column: List[Any]) -> DaftCollection:
        """Attach ``column`` values under ``name``, replacing any existing column.

        Rows are paired positionally through a temporary ``_vexpresso_index``
        join. NOTE(review): confirm that daft's join preserves row order here.
        """
        df = self.df
        if name in self.column_names:
            df = df.exclude(name)
        df = df.with_column("_vexpresso_index", indices(col(self.column_names[0])))
        second_df = daft.from_pydict(
            {name: column, "_vexpresso_index": list(range(len(self)))}
        )
        df = df.join(second_df, on="_vexpresso_index").exclude("_vexpresso_index")
        return self.from_daft_df(df)

    @lazy(default=True)
    def sort(self, column, desc=True) -> DaftCollection:
        """Sort by ``column``; descending by default."""
        return self.from_daft_df(self.daft_df.sort(col(column), desc=desc))

    def embed_query(
        self,
        query: Any,
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed a single query; see ``embed_queries``."""
        # Forward positionally so extra *args land after resource_request
        # instead of colliding with `queries`.
        return self.embed_queries(
            [query],
            embedding_column_name,
            embedding_fn,
            resource_request,
            *args,
            **kwargs,
        )[0]

    def embed_queries(
        self,
        queries: List[Any],
        embedding_column_name: Optional[str] = None,
        embedding_fn: Optional[Transformation] = None,
        resource_request=ResourceRequest(),
        *args,
        **kwargs,
    ) -> Any:
        """Embed ``queries`` and return the list of embeddings.

        ``embedding_fn`` may be a transformation, a registry key, or None, in
        which case the function registered for ``embedding_column_name`` is used.

        Raises:
            ValueError: if both ``embedding_fn`` and ``embedding_column_name`` are None.
        """
        if embedding_fn is None:
            if embedding_column_name is None:
                raise ValueError("Column name must be provided if embedding_fn is None")
            embedding_fn = self.embedding_functions[embedding_column_name]
        elif isinstance(embedding_fn, str):
            embedding_fn = self.embedding_functions[embedding_fn]
        query_embeddings = (
            daft.from_pydict({"queries": queries})
            .with_column(
                "query_embeddings",
                embedding_fn(col("queries"), *args, **kwargs),
                resource_request=resource_request,
            )
            .select("query_embeddings")
            .collect()
            .to_pydict()["query_embeddings"]
        )
        return query_embeddings

    @lazy(default=True)
    def query(
        self,
        column: str,
        query: List[Any] = None,
        query_embedding: List[Any] = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: int = None,
        sort: bool = True,
        embedding_fn: Optional[Transformation] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> Collection:
        """Single-query form of ``batch_query``; returns one collection."""
        if query is not None:
            query = [query]
        if query_embedding is not None:
            query_embedding = [query_embedding]
        # Positional forwarding keeps extra *args from colliding with `column`.
        return self.batch_query(
            column,
            query,
            query_embedding,
            filter_conditions,
            k,
            sort,
            embedding_fn,
            return_scores,
            score_column_name,
            resource_request,
            retriever,
            *args,
            **kwargs,
        )[0]

    @lazy(default=True)
    def batch_query(
        self,
        column: str,
        queries: List[Any] = None,
        query_embeddings: List[Any] = None,
        filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
        k: int = None,
        sort: bool = True,
        embedding_fn: Optional[Union[Transformation, str]] = None,
        return_scores: bool = False,
        score_column_name: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        retriever: Optional[BaseRetriever] = None,
        *args,
        **kwargs,
    ) -> List[Collection]:
        """Retrieve the ``k`` nearest rows in ``column`` for each query.

        Returns:
            One collection per query, in query order.

        Raises:
            ValueError: if neither ``queries`` nor ``query_embeddings`` is given.
        """
        if queries is None and query_embeddings is None:
            raise ValueError("Either queries or query_embeddings must be provided")
        batch_size = len(queries) if query_embeddings is None else len(query_embeddings)
        if embedding_fn is not None:
            if isinstance(embedding_fn, str):
                embedding_fn = self.embedding_functions[embedding_fn]
            else:
                if column in self.embedding_functions:
                    if embedding_fn != self.embedding_functions[column]:
                        print(
                            "embedding_fn may not be the same as whats in map! Updating what's in map..."
                        )
                self.embedding_functions[column] = get_embedding_fn(embedding_fn)
                embedding_fn = self.embedding_functions[column]
        if query_embeddings is None:
            query_embeddings = self.embed_queries(
                queries,
                column,
                embedding_fn,
                resource_request,
                *args,
                **kwargs,
            )
        if retriever is None:
            retriever = self.retriever
        if k is None:
            k = len(self)
        dfs = retrieve(
            batch_size,
            self.daft_df,
            column,
            query_embeddings,
            retriever,
            k,
            sort,
            return_scores,
            score_column_name,
            resource_request,
        )
        if filter_conditions is not None:
            dfs = [FilterHelper.filter(df, filter_conditions) for df in dfs]
        return [self.from_daft_df(df) for df in dfs]

    @lazy(default=True)
    def select(
        self,
        *args,
    ) -> DaftCollection:
        """Select columns (delegates to ``FilterHelper.select``)."""
        return self.from_daft_df(FilterHelper.select(self.daft_df, *args))

    @lazy(default=True)
    def exclude(
        self,
        *args,
    ) -> DaftCollection:
        """Drop the named columns."""
        return self.from_daft_df(self.daft_df.exclude(*args))

    @lazy(default=True)
    def filter(
        self, filter_conditions: Dict[str, Dict[str, str]], *args, **kwargs
    ) -> DaftCollection:
        """Apply ``filter_conditions`` via ``FilterHelper.filter``."""
        return self.from_daft_df(
            FilterHelper.filter(self.daft_df, filter_conditions, *args, **kwargs)
        )

    @lazy(default=True)
    def apply(
        self,
        transform_fn: Transformation,
        column: DaftCollection,
        *args,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        function: str = "__call__",
        **kwargs,
    ) -> DaftCollection:
        """Apply ``transform_fn`` to column arguments and store the result in ``to``.

        Single-column collections among the positional args are added to this
        collection if missing and then passed as column expressions. The
        output column defaults to ``tranformed_{first column name}`` (sic).

        Raises:
            TypeError: if ``column`` is not a DaftCollection.
            ValueError: if a collection argument has more than one column.
        """
        transform_fn = transformation(
            transform_fn, datatype=datatype, init_kwargs=init_kwargs, function=function
        )
        if not isinstance(column, DaftCollection):
            raise TypeError(
                "first args in apply must be a DaftCollection! use `collection['column_name']`"
            )
        collection = self
        args = [column, *args]
        _args = []
        for _arg in args:
            if isinstance(_arg, DaftCollection):
                if len(_arg.column_names) > 1:
                    raise ValueError(
                        "When passing in a Daft collection into `embed`, they must only have 1 column!"
                    )
                column_name = _arg.column_names[0]
                if column_name not in collection.column_names:
                    content = _arg.select(column_name).to_dict()[column_name]
                    collection = collection.add_column(column_name, content)
                _args.append(col(column_name))
            else:
                _args.append(_arg)
        _kwargs = {}
        for k in kwargs:
            _kwargs[k] = kwargs[k]
            if isinstance(_kwargs[k], DaftCollection):
                # only support first column
                column = _kwargs[k].daft_df.columns[0]
                _kwargs[k] = column
        if to is None:
            to = f"tranformed_{_args[0].name()}"
        return collection.on_df.with_column(
            to, transform_fn(*_args, **_kwargs), resource_request=resource_request
        )

    @lazy(default=True)
    def embed(
        self,
        column: Union[DaftCollection, List[Any], str],
        *args,
        embedding_fn: Optional[Transformation] = None,
        to: Optional[str] = None,
        resource_request: ResourceRequest = ResourceRequest(),
        datatype: DataType = DataType.python(),
        init_kwargs: Dict[str, Any] = {},
        **kwargs,
    ) -> DaftCollection:
        """Embed ``column`` into a new column ``to`` (default ``embeddings_{name}``).

        The embedding function is registered in ``self.embedding_functions``
        under ``to``. When ``embedding_fn`` is None, a function already
        registered under ``to`` is required.
        """
        collection = self
        column_name = None
        if isinstance(column, str):
            column_name = column
        elif not isinstance(column, DaftCollection):
            # raw content
            column_name = f"content_{len(collection.column_names)}"
            collection = collection.add_column(column_name, column)
        else:
            column_name = column.column_names[0]
        if to is None:
            to = f"embeddings_{column_name}"
        if embedding_fn is None:
            embedding_fn = self.embedding_functions[to]
        else:
            self.embedding_functions[to] = embedding_fn
        self.embedding_functions[to] = get_embedding_fn(
            self.embedding_functions[to], datatype=datatype, init_kwargs=init_kwargs
        )
        return collection.apply(
            self.embedding_functions[to],
            collection[column_name],
            *args,
            to=to,
            resource_request=resource_request,
            **kwargs,
        )

    def save_local(self, directory: str) -> str:
        """Write the collection to ``<directory>/content.parquet``; return ``directory``."""
        os.makedirs(directory, exist_ok=True)
        table = self.daft_df.to_arrow()
        pq.write_table(table, os.path.join(directory, "content.parquet"))
        return directory

    @classmethod
    def from_local_dir(cls, local_dir: str, *args, **kwargs) -> DaftCollection:
        """Load ``<local_dir>/content.parquet`` written by ``save_local``."""
        df = daft.read_parquet(os.path.join(local_dir, "content.parquet"))
        return cls(daft_df=df, *args, **kwargs)

    @classmethod
    def connect(
        cls, address: str = None, cluster_kwargs: Dict[str, Any] = {}, *args, **kwargs
    ) -> DaftCollection:
        """Initialise Ray (locally or at ``address``), use it as daft's runner, and build a collection."""
        import ray

        if address is None:
            addy = ray.init(**cluster_kwargs)
        else:
            addy = ray.init(address=address, **cluster_kwargs)
        daft.context.set_runner_ray(address=addy.address_info["address"])
        return cls(*args, **kwargs)

    def to_langchain(self, document_column: str, embeddings_column: str):
        """Wrap this collection in a LangChain ``VectorStore``.

        Args:
            document_column: column holding document text.
            embeddings_column: embedding column used by ``similarity_search``.
        """
        from langchain.docstore.document import Document
        from langchain.vectorstores import VectorStore

        class VexpressoVectorStore(VectorStore):
            def __init__(self, collection: DaftCollection):
                self.collection = collection
                self.document_column = document_column
                self.embeddings_column = embeddings_column

            def add_texts(
                self,
                texts: Iterable[str],
                metadatas: Optional[List[dict]] = None,
                **kwargs: Any,
            ) -> List[str]:
                # `texts` may be a one-shot iterator: it is measured and zipped below.
                texts = list(texts)
                if metadatas is None:
                    metadatas = [{} for _ in range(len(texts))]
                combined = [
                    {self.document_column: t, **m} for t, m in zip(texts, metadatas)
                ]
                self.collection = self.collection.add_rows(combined)
                # NOTE(review): returns None although annotated List[str];
                # no row ids are tracked to return.

            def similarity_search(
                self, query: str, k: int = 4, **kwargs: Any
            ) -> List[Document]:
                dictionary = self.collection.query(
                    self.embeddings_column, query=query, k=k, lazy=False, **kwargs
                ).to_dict()
                documents = dictionary[self.document_column]
                metadatas = {
                    k: dictionary[k] for k in dictionary if k != self.document_column
                }
                out = []
                for i in range(len(documents)):
                    doc = documents[i]
                    d = {k: metadatas[k][i] for k in metadatas}
                    out.append(Document(page_content=doc, metadata=d))
                return out

            @classmethod
            def from_texts(
                cls,
                *args,
                **kwargs: Any,
            ):
                """Return VectorStore initialized from texts and embeddings."""
                # Stub: not implemented, always returns None.
                return None

        return VexpressoVectorStore(self)

    @classmethod
    def from_documents(
        cls, documents: List[Document], *args, **kwargs
    ) -> DaftCollection:
        """Build a collection from LangChain documents (``text`` column + metadata columns)."""
        raw = [{"text": d.page_content, **d.metadata} for d in documents]
        return cls(data=raw, *args, **kwargs)
Ancestors (in MRO)#
- vexpresso.collection.Collection
Static methods#
connect#
def connect(
address: 'str' = None,
cluster_kwargs: 'Dict[str, Any]' = {},
*args,
**kwargs
) -> 'DaftCollection'
View Source
@classmethod
def connect(
    cls, address: str = None, cluster_kwargs: Dict[str, Any] = {}, *args, **kwargs
) -> DaftCollection:
    """Initialise Ray, make it daft's runner, and build a collection.

    Args:
        address: address of an existing Ray cluster. When None, ``ray.init``
            is called without an address.
        cluster_kwargs: extra keyword arguments for ``ray.init`` (not mutated).
        *args, **kwargs: forwarded to the collection constructor.
    """
    import ray

    if address is None:
        addy = ray.init(**cluster_kwargs)
    else:
        addy = ray.init(address=address, **cluster_kwargs)
    daft.context.set_runner_ray(address=addy.address_info["address"])
    # `cls` rather than a hard-coded class so subclasses construct themselves.
    return cls(*args, **kwargs)
from_documents#
def from_documents(
documents: 'List[Document]',
*args,
**kwargs
) -> 'DaftCollection'
View Source
@classmethod
def from_documents(
    cls, documents: List[Document], *args, **kwargs
) -> DaftCollection:
    """Build a collection from LangChain documents.

    Each document becomes one row: its ``page_content`` under ``text`` plus
    its metadata keys as extra columns.
    """
    raw = [{"text": d.page_content, **d.metadata} for d in documents]
    # `cls` rather than a hard-coded class so subclasses construct themselves.
    return cls(data=raw, *args, **kwargs)
from_local_dir#
def from_local_dir(
local_dir: 'str',
*args,
**kwargs
) -> 'DaftCollection'
View Source
@classmethod
def from_local_dir(cls, local_dir: str, *args, **kwargs) -> DaftCollection:
    """Load ``<local_dir>/content.parquet`` (the file ``save_local`` writes).

    Extra ``*args``/``**kwargs`` are forwarded to the collection constructor.
    """
    df = daft.read_parquet(os.path.join(local_dir, "content.parquet"))
    # `cls` rather than a hard-coded class so subclasses construct themselves.
    return cls(daft_df=df, *args, **kwargs)
from_saved#
def from_saved(
directory_or_repo_id: 'Optional[str]' = None,
token: 'Optional[str]' = None,
local_dir: 'Optional[str]' = None,
to_tmpdir: 'bool' = False,
hf_username: 'Optional[str]' = None,
repo_name: 'Optional[str]' = None,
hub_download_kwargs: 'Optional[Dict[str, Any]]' = {},
*args,
**kwargs
) -> 'Collection'
View Source
@classmethod
def from_saved(
    cls,
    directory_or_repo_id: Optional[str] = None,
    token: Optional[str] = None,
    local_dir: Optional[str] = None,
    to_tmpdir: bool = False,
    hf_username: Optional[str] = None,
    repo_name: Optional[str] = None,
    hub_download_kwargs: Optional[Dict[str, Any]] = {},
    *args,
    **kwargs,
) -> Collection:
    """Load a collection from a local directory or a Hugging Face Hub repo.

    Args:
        directory_or_repo_id: local directory, or ``user/repo`` id on the Hub.
        token: Hub token passed to the download helper.
        local_dir: download target when not using a temporary directory.
        to_tmpdir: download into a temporary directory that is removed after
            loading. The data is materialized before the directory goes away.
        hf_username, repo_name: used to build the repo id when
            ``directory_or_repo_id`` is None.
        hub_download_kwargs: extra keyword arguments for the download helper.
        *args, **kwargs: forwarded to ``from_local_dir``.

    Raises:
        ValueError: if no directory / repo id and no username + repo name are given.
    """
    if directory_or_repo_id is None:
        if hf_username is None or repo_name is None:
            raise ValueError(
                "Please provide either a directory / repo id or your huggingface username + repo name"
            )
        directory_or_repo_id = f"{hf_username}/{repo_name}"

    if os.path.isdir(directory_or_repo_id):
        return cls.from_local_dir(directory_or_repo_id, *args, **kwargs)

    # from huggingface
    print(f"Retrieving from hf repo: {directory_or_repo_id}")
    download_kwargs = hub_download_kwargs or {}
    helper = HFHubHelper()
    if not to_tmpdir:
        saved_dir = helper.download(
            directory_or_repo_id,
            token=token,
            local_dir=local_dir,
            **download_kwargs,
        )
        return cls.from_local_dir(saved_dir, *args, **kwargs)

    with tempfile.TemporaryDirectory() as tmpdirname:
        saved_dir = helper.download(
            directory_or_repo_id,
            token=token,
            local_dir=tmpdirname,
            **download_kwargs,
        )
        # The temporary directory is deleted when this block exits and the
        # parquet read may be deferred, so materialize while files still exist.
        return cls.from_local_dir(saved_dir, *args, **kwargs).collect()
load#
def load(
*args,
**kwargs
) -> 'Collection'
View Source
@classmethod
def load(
    cls,
    *args,
    **kwargs,
) -> Collection:
    """Shorthand for ``from_saved``; every argument is passed through untouched."""
    loader = cls.from_saved
    return loader(*args, **kwargs)
Instance variables#
column_names
df
on_df
Methods#
add#
def add(
self,
entries: 'List[Dict[str, Any]]'
) -> 'DaftCollection'
View Source
def add(self, entries: List[Dict[str, Any]]) -> DaftCollection:
    """Append row dicts; alias for ``add_rows``.

    Args:
        entries: rows to append, each a ``{column: value}`` dict.

    Returns:
        The collection returned by ``add_rows``.
    """
    # `add_rows` is the existing method; `add_row` does not exist.
    return self.add_rows(entries)
add_column#
def add_column(
self,
name: 'str',
column: 'List[Any]'
) -> 'DaftCollection'
View Source
@lazy(default=True)
def add_column(self, name: str, column: List[Any]) -> DaftCollection:
    """Return a new collection with ``column`` stored under ``name``.

    Any existing column called ``name`` is dropped first. The new values are
    paired with existing rows through a temporary ``_vexpresso_index`` join.
    NOTE(review): confirm that daft's join preserves row order here.
    """
    index_name = "_vexpresso_index"
    base = self.df.exclude(name) if name in self.column_names else self.df
    base = base.with_column(index_name, indices(col(self.column_names[0])))
    incoming = daft.from_pydict(
        {name: column, index_name: list(range(len(self)))}
    )
    joined = base.join(incoming, on=index_name)
    return self.from_daft_df(joined.exclude(index_name))
add_rows#
def add_rows(
self,
entries: 'List[Dict[str, Any]]'
) -> 'DaftCollection'
View Source
def add_rows(self, entries: List[Dict[str, Any]]) -> DaftCollection:
    """Append ``entries`` (row dicts) and rebuild the collection from the result.

    Only existing columns are extended. An entry lacking a column contributes
    None, and keys that are not existing columns are dropped.
    """
    columns = self.to_dict()
    for name, values in columns.items():
        values.extend(entry.get(name, None) for entry in entries)
    return self.from_data(columns)
agg#
def agg(
self,
*args,
**kwargs
) -> 'DaftCollection'
View Source
@lazy(default=True)
def agg(self, *args, **kwargs) -> DaftCollection:
    """Aggregate with ``daft.DataFrame.agg`` and wrap the result in a new collection.

    All arguments are forwarded to ``agg`` unchanged.
    """
    # `from_daft_df` is the existing wrapper; `from_df` does not exist.
    return self.from_daft_df(self.daft_df.agg(*args, **kwargs))
apply#
def apply(
self,
transform_fn: 'Transformation',
column: 'DaftCollection',
*args,
to: 'Optional[str]' = None,
resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
datatype: 'DataType' = Python,
init_kwargs: 'Dict[str, Any]' = {},
function: 'str' = '__call__',
**kwargs
) -> 'DaftCollection'
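Apply method: takes column arguments (positional and keyword) and applies a transformation function to them. Unless `to` is given, the output column is named (note the spelling used by the code):
tranformed_{column_name}, where column_name is the name of the first column argument.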
View Source
@lazy(default=True)
def apply(
    self,
    transform_fn: Transformation,
    column: DaftCollection,
    *args,
    to: Optional[str] = None,
    resource_request: ResourceRequest = ResourceRequest(),
    datatype: DataType = DataType.python(),
    init_kwargs: Dict[str, Any] = {},
    function: str = "__call__",
    **kwargs,
) -> DaftCollection:
    """Apply ``transform_fn`` to column arguments and store the result in column ``to``.

    Args:
        transform_fn: callable/class wrapped by ``transformation`` with
            ``datatype``, ``init_kwargs`` and ``function``.
        column: single-column collection used as the first argument.
        *args: further arguments. Single-column collections become column
            expressions (and are added to this collection if missing);
            anything else is passed through as-is.
        to: output column name; defaults to ``tranformed_{first column name}``.
        resource_request: forwarded to ``with_column``.
        **kwargs: keyword arguments; collection values are replaced by their
            first column from ``daft_df.columns``.

    Raises:
        TypeError: if ``column`` is not a DaftCollection.
        ValueError: if a collection argument has more than one column.
    """
    transform_fn = transformation(
        transform_fn, datatype=datatype, init_kwargs=init_kwargs, function=function
    )
    if not isinstance(column, DaftCollection):
        raise TypeError(
            "first args in apply must be a DaftCollection! use `collection['column_name']`"
        )
    collection = self
    args = [column, *args]
    _args = []
    for _arg in args:
        if isinstance(_arg, DaftCollection):
            if len(_arg.column_names) > 1:
                raise ValueError(
                    "When passing in a Daft collection into `embed`, they must only have 1 column!"
                )
            column_name = _arg.column_names[0]
            # Columns from another collection are copied in so the expression resolves.
            if column_name not in collection.column_names:
                content = _arg.select(column_name).to_dict()[column_name]
                collection = collection.add_column(column_name, content)
            _args.append(col(column_name))
        else:
            _args.append(_arg)
    _kwargs = {}
    for k in kwargs:
        _kwargs[k] = kwargs[k]
        if isinstance(_kwargs[k], DaftCollection):
            # only support first column
            # NOTE(review): unlike positional args, this column is not added to
            # `collection` when missing — confirm that is intended.
            column = _kwargs[k].daft_df.columns[0]
            _kwargs[k] = column
    if to is None:
        # NOTE(review): "tranformed_" is misspelled, but it is a runtime column
        # name that callers may depend on.
        to = f"tranformed_{_args[0].name()}"
    return collection.on_df.with_column(
        to, transform_fn(*_args, **_kwargs), resource_request=resource_request
    )
batch_query#
def batch_query(
self,
column: 'str',
queries: 'List[Any]' = None,
query_embeddings: 'List[Any]' = None,
filter_conditions: 'Optional[Dict[str, Dict[str, str]]]' = None,
k: 'int' = None,
sort: 'bool' = True,
embedding_fn: 'Optional[Union[Transformation, str]]' = None,
return_scores: 'bool' = False,
score_column_name: 'Optional[str]' = None,
resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
retriever: 'Optional[BaseRetriever]' = None,
*args,
**kwargs
) -> 'List[Collection]'
View Source
@lazy(default=True)
def batch_query(
    self,
    column: str,
    queries: List[Any] = None,
    query_embeddings: List[Any] = None,
    filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
    k: int = None,
    sort: bool = True,
    embedding_fn: Optional[Union[Transformation, str]] = None,
    return_scores: bool = False,
    score_column_name: Optional[str] = None,
    resource_request: ResourceRequest = ResourceRequest(),
    retriever: Optional[BaseRetriever] = None,
    *args,
    **kwargs,
) -> List[Collection]:
    """Retrieve the ``k`` nearest rows in ``column`` for every query.

    Args:
        column: embedding column to search.
        queries: raw queries, embedded unless ``query_embeddings`` is given.
        query_embeddings: precomputed query embeddings.
        filter_conditions: optional conditions applied to each result.
        k: number of results per query; defaults to all rows.
        embedding_fn: transformation or registry key. A transformation is
            (re)registered for ``column``.
        retriever: overrides ``self.retriever``.
        *args, **kwargs: forwarded to the embedding function.

    Returns:
        One collection per query, in query order.

    Raises:
        ValueError: if neither ``queries`` nor ``query_embeddings`` is given.
    """
    if queries is None and query_embeddings is None:
        raise ValueError("Either queries or query_embeddings must be provided")
    batch_size = len(queries) if query_embeddings is None else len(query_embeddings)
    if embedding_fn is not None:
        if isinstance(embedding_fn, str):
            embedding_fn = self.embedding_functions[embedding_fn]
        else:
            if column in self.embedding_functions:
                if embedding_fn != self.embedding_functions[column]:
                    print(
                        "embedding_fn may not be the same as whats in map! Updating what's in map..."
                    )
            self.embedding_functions[column] = get_embedding_fn(embedding_fn)
            embedding_fn = self.embedding_functions[column]
    if query_embeddings is None:
        query_embeddings = self.embed_queries(
            queries,
            column,
            embedding_fn,
            resource_request,
            *args,
            **kwargs,
        )
    if retriever is None:
        retriever = self.retriever
    if k is None:
        k = len(self)
    dfs = retrieve(
        batch_size,
        self.daft_df,
        column,
        query_embeddings,
        retriever,
        k,
        sort,
        return_scores,
        score_column_name,
        resource_request,
    )
    if filter_conditions is not None:
        dfs = [FilterHelper.filter(df, filter_conditions) for df in dfs]
    return [self.from_daft_df(df) for df in dfs]
cast#
def cast(
self,
column: 'str' = None,
datatype: 'DataType' = Python
) -> 'DaftCollection'
View Source
def cast(
    self, column: str = None, datatype: DataType = DataType.python()
) -> DaftCollection:
    """Cast one column (or all columns when ``column`` is None) to ``datatype``.

    Non-target columns are selected unchanged, by name.
    """
    selection = [
        col(name).cast(datatype) if column is None or name == column else name
        for name in self.column_names
    ]
    return self.from_daft_df(self.daft_df.select(*selection))
collect#
def collect(
self,
in_place: 'bool' = False
)
Materializes the collection
Returns:
| Type | Description |
|---|---|
| Collection | Materialized collection |
View Source
def collect(self, in_place: bool = False):
    """Materialize the underlying DataFrame.

    Args:
        in_place: when true, replace this collection's DataFrame and return
            ``self``; otherwise return a new collection via ``from_daft_df``.
    """
    materialized = self.daft_df.collect(num_preview_rows=None)
    if not in_place:
        return self.from_daft_df(materialized)
    self.daft_df = materialized
    return self
embed#
def embed(
self,
column: 'Union[DaftCollection, List[Any], str]',
*args,
embedding_fn: 'Optional[Transformation]' = None,
to: 'Optional[str]' = None,
resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
datatype: 'DataType' = Python,
init_kwargs: 'Dict[str, Any]' = {},
**kwargs
) -> 'DaftCollection'
View Source
@lazy(default=True)
def embed(
    self,
    column: Union[DaftCollection, List[Any], str],
    *args,
    embedding_fn: Optional[Transformation] = None,
    to: Optional[str] = None,
    resource_request: ResourceRequest = ResourceRequest(),
    datatype: DataType = DataType.python(),
    init_kwargs: Dict[str, Any] = {},
    **kwargs,
) -> DaftCollection:
    """Embed a column and store the result in column ``to``.

    Args:
        column: a column name, a single-column collection, or raw values
            (stored first as ``content_{n}``, n = current column count).
        embedding_fn: embedding function. When None, the function already
            registered under ``to`` is used (KeyError if none).
        to: output column, default ``embeddings_{column_name}``.
        datatype, init_kwargs: forwarded to ``get_embedding_fn``.
        resource_request, *args, **kwargs: forwarded to ``apply``.

    Side effect: ``self.embedding_functions[to]`` is overwritten with the
    ``get_embedding_fn``-wrapped function.
    """
    collection = self
    column_name = None
    if isinstance(column, str):
        column_name = column
    elif not isinstance(column, DaftCollection):
        # raw content
        column_name = f"content_{len(collection.column_names)}"
        collection = collection.add_column(column_name, column)
    else:
        # NOTE(review): if this collection does not already contain the
        # column, `collection[column_name]` below will not find it.
        column_name = column.column_names[0]
    if to is None:
        to = f"embeddings_{column_name}"
    if embedding_fn is None:
        embedding_fn = self.embedding_functions[to]
    else:
        self.embedding_functions[to] = embedding_fn
    # Re-wrap whatever is registered (new or existing) with datatype/init_kwargs.
    self.embedding_functions[to] = get_embedding_fn(
        self.embedding_functions[to], datatype=datatype, init_kwargs=init_kwargs
    )
    return collection.apply(
        self.embedding_functions[to],
        collection[column_name],
        *args,
        to=to,
        resource_request=resource_request,
        **kwargs,
    )
embed_queries#
def embed_queries(
self,
queries: 'List[Any]',
embedding_column_name: 'Optional[str]' = None,
embedding_fn: 'Optional[Transformation]' = None,
resource_request=ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
*args,
**kwargs
) -> 'Any'
View Source
def embed_queries(
    self,
    queries: List[Any],
    embedding_column_name: Optional[str] = None,
    embedding_fn: Optional[Transformation] = None,
    resource_request=ResourceRequest(),
    *args,
    **kwargs,
) -> Any:
    """Embed each query and return the embeddings as a list.

    The function is ``embedding_fn`` itself, the registry entry named by a
    string ``embedding_fn``, or, when ``embedding_fn`` is None, the entry
    registered for ``embedding_column_name``.

    Raises:
        ValueError: if both ``embedding_fn`` and ``embedding_column_name`` are None.
    """
    if embedding_fn is None and embedding_column_name is None:
        raise ValueError("Column name must be provided if embedding_fn is None")
    if embedding_fn is None:
        registry_key = embedding_column_name
    elif isinstance(embedding_fn, str):
        registry_key = embedding_fn
    else:
        registry_key = None
    if registry_key is not None:
        embedding_fn = self.embedding_functions[registry_key]

    queries_df = daft.from_pydict({"queries": queries})
    embedded = queries_df.with_column(
        "query_embeddings",
        embedding_fn(col("queries"), *args, **kwargs),
        resource_request=resource_request,
    )
    as_dict = embedded.select("query_embeddings").collect().to_pydict()
    return as_dict["query_embeddings"]
embed_query#
def embed_query(
self,
query: 'Any',
embedding_column_name: 'Optional[str]' = None,
embedding_fn: 'Optional[Transformation]' = None,
resource_request=ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
*args,
**kwargs
) -> 'Any'
View Source
def embed_query(
    self,
    query: Any,
    embedding_column_name: Optional[str] = None,
    embedding_fn: Optional[Transformation] = None,
    resource_request=ResourceRequest(),
    *args,
    **kwargs,
) -> Any:
    """Embed a single query and return its embedding.

    Delegates to ``embed_queries`` with a one-element batch. Extra
    ``*args``/``**kwargs`` reach the embedding function.
    """
    # Forward positionally: passing *args after keyword arguments would bind
    # them to `queries`, `embedding_column_name`, ... and raise TypeError.
    return self.embed_queries(
        [query],
        embedding_column_name,
        embedding_fn,
        resource_request,
        *args,
        **kwargs,
    )[0]
exclude#
def exclude(
self,
*args
) -> 'DaftCollection'
View Source
@lazy(default=True)
def exclude(
    self,
    *args,
) -> DaftCollection:
    """Return a new collection without the given columns."""
    remaining = self.daft_df.exclude(*args)
    return self.from_daft_df(remaining)
execute#
def execute(
self
) -> 'DaftCollection'
View Source
def execute(self) -> DaftCollection:
    """Materialize the collection; thin alias for ``collect()``."""
    executed = self.collect()
    return executed
filter#
def filter(
self,
filter_conditions: 'Dict[str, Dict[str, str]]',
*args,
**kwargs
) -> 'DaftCollection'
Filter method, filters using conditions based on metadata
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filter_conditions | Dict[str, Dict[str, str]] | Maps a column name to an {operator: value} condition, e.g. {"_vexpresso_index": {"isin": [0, 2]}} | required |
Returns:
| Type | Description |
|---|---|
| Collection | A new collection with the filter conditions applied |
View Source
@lazy(default=True)
def filter(
    self, filter_conditions: Dict[str, Dict[str, str]], *args, **kwargs
) -> DaftCollection:
    """Filter rows with ``FilterHelper.filter`` and return a new collection.

    ``filter_conditions`` maps a column name to an ``{operator: value}`` dict.
    Extra arguments are forwarded to ``FilterHelper.filter``.
    """
    filtered = FilterHelper.filter(self.daft_df, filter_conditions, *args, **kwargs)
    return self.from_daft_df(filtered)
from_daft_df#
def from_daft_df(
self,
df: 'daft.DataFrame'
) -> 'DaftCollection'
View Source
def from_daft_df(self, df: daft.DataFrame) -> DaftCollection:
    """Wrap ``df`` in a new collection.

    The new collection shares this one's retriever and the same
    ``embedding_functions`` dict object.
    """
    shared = {
        "retriever": self.retriever,
        "embedding_functions": self.embedding_functions,
    }
    return DaftCollection(daft_df=df, **shared)
from_data#
def from_data(
self,
data: 'Any'
) -> 'DaftCollection'
View Source
def from_data(self, data: Any) -> DaftCollection:
    """Build a new collection from raw ``data``.

    The new collection shares this one's retriever and the same
    ``embedding_functions`` dict object.
    """
    shared = {
        "retriever": self.retriever,
        "embedding_functions": self.embedding_functions,
    }
    return DaftCollection(data=data, **shared)
iloc#
def iloc(
self,
idx: 'Union[int, Iterable[int]]'
) -> 'DaftCollection'
View Source
@lazy(default=True)
def iloc(self, idx: Union[int, Iterable[int]]) -> DaftCollection:
    """Select rows by integer position (one int or an iterable of ints).

    A temporary ``_vexpresso_index`` column numbers the rows, is filtered
    with ``isin``, and is then dropped.
    """
    # NOTE(review): the original author observed this to be very slow.
    positions = [idx] if isinstance(idx, int) else idx
    index_column = "_vexpresso_index"
    indexed = self.on_df.with_column(
        index_column, indices(col(self.column_names[0]))
    )
    picked = indexed.filter({index_column: {"isin": positions}})
    return picked.exclude(index_column)
query#
def query(
self,
column: 'str',
query: 'List[Any]' = None,
query_embedding: 'List[Any]' = None,
filter_conditions: 'Optional[Dict[str, Dict[str, str]]]' = None,
k: 'int' = None,
sort: 'bool' = True,
embedding_fn: 'Optional[Transformation]' = None,
return_scores: 'bool' = False,
score_column_name: 'Optional[str]' = None,
resource_request: 'ResourceRequest' = ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None),
retriever: 'Optional[BaseRetriever]' = None,
*args,
**kwargs
) -> 'Collection'
Query method, takes in queries or query embeddings and retrieves nearest content
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
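| query | Any | A single raw query; it is embedded with the column's embedding function before retrieval | None |
| query_embedding | List[Any] | A precomputed embedding for the query; when given, `query` is not embedded | None |
| filter_conditions | Dict[str, Dict[str, str]] | Optional conditions applied to the retrieved rows | None |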
View Source
@lazy(default=True)
def query(
    self,
    column: str,
    query: List[Any] = None,
    query_embedding: List[Any] = None,
    filter_conditions: Optional[Dict[str, Dict[str, str]]] = None,
    k: int = None,
    sort: bool = True,
    embedding_fn: Optional[Transformation] = None,
    return_scores: bool = False,
    score_column_name: Optional[str] = None,
    resource_request: ResourceRequest = ResourceRequest(),
    retriever: Optional[BaseRetriever] = None,
    *args,
    **kwargs,
) -> Collection:
    """Retrieve the nearest rows in ``column`` for one query.

    ``query`` (raw) or ``query_embedding`` (precomputed) is wrapped in a
    one-element batch for ``batch_query``, and the single result is returned.
    """
    if query is not None:
        query = [query]
    if query_embedding is not None:
        query_embedding = [query_embedding]
    # Forward positionally: *args after keyword arguments would bind to
    # `column`, `queries`, ... and raise TypeError.
    return self.batch_query(
        column,
        query,
        query_embedding,
        filter_conditions,
        k,
        sort,
        embedding_fn,
        return_scores,
        score_column_name,
        resource_request,
        retriever,
        *args,
        **kwargs,
    )[0]
rename#
def rename(
self,
columns: 'Dict[str, str]'
) -> 'DaftCollection'
View Source
@lazy(default=True)
def rename(self, columns: Dict[str, str]) -> DaftCollection:
    """Rename columns using an ``{old_name: new_name}`` mapping.

    Columns not in the mapping keep their names. Column order is preserved.
    """
    expressions = [
        col(name).alias(columns[name]) if name in columns else col(name)
        for name in self.column_names
    ]
    return self.on_df.select(*expressions)
save#
def save(
self,
directory_or_repo_id: 'Optional[str]' = None,
to_hub: 'bool' = False,
token: 'Optional[str]' = None,
private: 'bool' = True,
hf_username: 'Optional[str]' = None,
repo_name: 'Optional[str]' = None,
hub_kwargs: 'Optional[Dict[str, Any]]' = {}
) -> 'str'
View Source
def save(
    self,
    directory_or_repo_id: Optional[str] = None,
    to_hub: bool = False,
    token: Optional[str] = None,
    private: bool = True,
    hf_username: Optional[str] = None,
    repo_name: Optional[str] = None,
    hub_kwargs: Optional[Dict[str, Any]] = {},
) -> str:
    """Save the collection locally or upload it to the Hugging Face Hub.

    Args:
        directory_or_repo_id: local directory, or ``user/repo`` when ``to_hub``.
        to_hub: upload via ``HFHubHelper`` instead of writing locally.
        token, private: forwarded to the upload helper.
        hf_username, repo_name: used to build the repo id when
            ``directory_or_repo_id`` is None (hub uploads only).
        hub_kwargs: extra keyword arguments for the upload helper.

    Returns:
        The directory or repo id the collection was saved to.

    Raises:
        ValueError: on a hub upload with no repo id and no username + repo name.
    """
    if not to_hub:
        print(f"saving to {directory_or_repo_id}")
        self.save_local(directory_or_repo_id)
        # Return the target explicitly rather than relying on save_local's return.
        return directory_or_repo_id

    if directory_or_repo_id is None:
        if hf_username is None or repo_name is None:
            raise ValueError(
                "Please provide either a directory / repo id or your huggingface username + repo name"
            )
        directory_or_repo_id = f"{hf_username}/{repo_name}"
    # Report the resolved repo id, not a possibly-None argument.
    print(f"Uploading collection to {directory_or_repo_id}")
    with tempfile.TemporaryDirectory() as tmpdirname:
        self.save_local(tmpdirname)
        helper = HFHubHelper()
        helper.upload(
            repo_id=directory_or_repo_id,
            folder_path=tmpdirname,
            token=token,
            private=private,
            **(hub_kwargs or {}),
        )
    print(f"Upload to {directory_or_repo_id} complete!")
    return directory_or_repo_id
save_local#
def save_local(
self,
directory: 'str'
) -> 'str'
View Source
def save_local(self, directory: str) -> str:
    """Write the collection to ``<directory>/content.parquet``.

    The directory is created if needed. The DataFrame is converted with
    ``to_arrow`` and written with ``pyarrow.parquet.write_table``.

    Returns:
        ``directory``, matching the ``-> str`` annotation.
    """
    os.makedirs(directory, exist_ok=True)
    table = self.daft_df.to_arrow()
    pq.write_table(table, os.path.join(directory, "content.parquet"))
    return directory
select#
def select(
self,
*args
) -> 'DaftCollection'
Select method, selects columns
Returns:
| Type | Description |
|---|---|
| DaftCollection | A new collection containing only the selected columns |
View Source
@lazy(default=True)
def select(
    self,
    *args,
) -> DaftCollection:
    """Select columns via ``FilterHelper.select`` and return a new collection."""
    chosen = FilterHelper.select(self.daft_df, *args)
    return self.from_daft_df(chosen)
set_embedding_function#
def set_embedding_function(
self,
column: 'str',
embedding_function: 'Transformation'
)
View Source
def set_embedding_function(self, column: str, embedding_function: Transformation):
    """Register ``embedding_function`` for ``column``, mutating the registry in place."""
    registry = self.embedding_functions
    registry[column] = embedding_function
show#
def show(
self,
num_rows: 'Optional[int]' = None
)
View Source
def show(self, num_rows: Optional[int] = None):
    """Display ``num_rows`` rows, or every row when ``num_rows`` is None."""
    rows_to_show = len(self) if num_rows is None else num_rows
    return self.daft_df.show(rows_to_show)
sort#
def sort(
self,
column,
desc=True
) -> 'DaftCollection'
View Source
@lazy(default=True)
def sort(self, column, desc=True) -> DaftCollection:
    """Return a new collection ordered by ``column``; descending unless ``desc`` is False."""
    ordered = self.daft_df.sort(col(column), desc=desc)
    return self.from_daft_df(ordered)
to_dict#
def to_dict(
self
) -> 'Dict[str, List[Any]]'
Converts collection to dict
Returns:
| Type | Description |
|---|---|
| Dict[str, List[Any]] | collection as dict |
View Source
def to_dict(self) -> Dict[str, List[Any]]:
    """Materialize the collection and return it as ``{column: values}``."""
    materialized_df = self.execute().daft_df
    as_dict = materialized_df.to_pydict()
    return as_dict
to_langchain#
def to_langchain(
self,
document_column: 'str',
embeddings_column: 'str'
)
View Source
def to_langchain(self, document_column: str, embeddings_column: str):
    """Wrap this collection in a LangChain ``VectorStore``.

    Args:
        document_column: column holding document text.
        embeddings_column: embedding column searched by ``similarity_search``.

    Returns:
        A ``VectorStore`` subclass instance bound to this collection.
    """
    from langchain.docstore.document import Document
    from langchain.vectorstores import VectorStore

    class VexpressoVectorStore(VectorStore):
        def __init__(self, collection: DaftCollection):
            self.collection = collection
            self.document_column = document_column
            self.embeddings_column = embeddings_column

        def add_texts(
            self,
            texts: Iterable[str],
            metadatas: Optional[List[dict]] = None,
            **kwargs: Any,
        ) -> List[str]:
            # `texts` may be a one-shot iterator, but it is both measured (len)
            # and iterated (zip) below, so materialize it once.
            texts = list(texts)
            if metadatas is None:
                metadatas = [{} for _ in range(len(texts))]
            combined = [
                {self.document_column: t, **m} for t, m in zip(texts, metadatas)
            ]
            self.collection = self.collection.add_rows(combined)
            # NOTE(review): returns None although annotated List[str]; no row
            # ids are tracked to return.

        def similarity_search(
            self, query: str, k: int = 4, **kwargs: Any
        ) -> List[Document]:
            dictionary = self.collection.query(
                self.embeddings_column, query=query, k=k, lazy=False, **kwargs
            ).to_dict()
            documents = dictionary[self.document_column]
            # Every column other than the document column becomes metadata.
            metadatas = {
                k: dictionary[k] for k in dictionary if k != self.document_column
            }
            out = []
            for i in range(len(documents)):
                doc = documents[i]
                d = {k: metadatas[k][i] for k in metadatas}
                out.append(Document(page_content=doc, metadata=d))
            return out

        @classmethod
        def from_texts(
            cls,
            *args,
            **kwargs: Any,
        ):
            """Return VectorStore initialized from texts and embeddings."""
            # Stub: not implemented, always returns None.
            return None

    return VexpressoVectorStore(self)
to_list#
def to_list(
self
) -> 'List[Any]'
Converts collection to list
Returns:
| Type | Description |
|---|---|
| List[Any] | returns list of columns |
View Source
def to_list(self) -> List[Any]:
    """Materialize the collection and return its column values.

    A single-column collection yields that column's list directly; otherwise
    a list of per-column lists is returned.
    """
    per_column = list(self.execute().daft_df.to_pydict().values())
    return per_column[0] if len(per_column) == 1 else per_column
to_pandas#
def to_pandas(
self
) -> 'pd.DataFrame'
Converts collection to pandas dataframe
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | The materialized collection as a pandas DataFrame |
View Source
def to_pandas(self) -> pd.DataFrame:
    """Materialize the collection and convert it to a pandas DataFrame."""
    materialized_df = self.execute().daft_df
    return materialized_df.to_pandas()