Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .env → .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ POSTGRES_PORT_NONE=5433
POSTGRES_PORT_NEON=5434
POSTGRES_PORT_PGVECTOR=5435
POSTGRES_PORT_LANTERN=5436
SSH_PORT_LANTERN=2222
SSH_PORT_LANTERN=2222
PINECONE_API_KEY='YOUR_PINECONE_API_KEY_HERE'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming from JS background so following some conventions from there. What do you think about

  1. putting this in .env.local
  2. putting .env.local in .gitignore
  3. adding a check that these two variables are defined when testing Pinecone
  I'm worried about accidentally committing these variables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think moving .env to gitignore and adding .env.example may be an option. Also having checks before using these variables will help to provide better user facing error message

PINECONE_ENV='us-west1-gcp'
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ db/
*.fvecs
*.bvecs
*.csv
.env

# Python
*.pyc
Expand All @@ -21,4 +22,4 @@ external_indexes/

# Lantern Extras
lantern_extras/
.cargo
.cargo
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
# Setup

This setup assumes that you are using VSCode and Docker.

1. Use the `Dev Containers` extension to open the workspace in a container

2. Setup tables
2. Create `.env` file from `.env.example`

3. Setup tables

```
python3 -m core.setup
```

3. Now you can use notebooks or the CLI to run experiments
4. Now you can use notebooks or the CLI to run experiments
58 changes: 58 additions & 0 deletions core/benchmark_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import subprocess
import statistics
import json
from .utils.database import DatabaseConnection, get_database_url
from .utils.delete_index import delete_index
from .utils.create_index import get_create_index_query, get_index_name
Expand All @@ -12,6 +13,8 @@
from .utils import cli
from .utils.process import save_result, get_experiment_results
from .utils.print import print_labels, print_row, get_title
from .utils.cloud_provider import get_cloud_provider
from .utils.names import get_cloud_index_name, get_table_name

SUPPRESS_COMMAND = "SET client_min_messages TO WARNING"

Expand All @@ -36,6 +39,11 @@ def generate_external_performance_result(extension, dataset, N, index_params):
t2 = time.time()
return (t2 - t1) * 1000

def generate_cloud_performance_result(provider_instance, vector_data, index_params):
    """Time a single index build on a cloud provider.

    Calls ``provider_instance.create_index(index_params, vector_data)`` and
    returns the wall-clock latency in milliseconds.
    """
    started = time.time()
    provider_instance.create_index(index_params, vector_data)
    finished = time.time()
    return (finished - started) * 1000

def generate_performance_result(extension, dataset, N, index_params):
if 'external' in index_params and index_params['external']:
Expand Down Expand Up @@ -149,3 +157,53 @@ def print_results(dataset):

# Generate result
generate_result(extension, dataset, N, index_params, count=count)


def generate_cloud_result(provider, dataset, N, index_params=None, count=10):
    """Benchmark index-creation latency on a managed cloud provider.

    Loads the base vectors for (dataset, N) from the local Postgres instance,
    then builds the provider index `count` times (timing each run), and saves
    the average — and, when count > 1, the stddev — create latency.

    Parameters:
        provider: Cloud enum member identifying the provider.
        dataset / N: dataset identifier and size label used to locate the
            source table and to name the cloud index.
        index_params: provider-specific index options; this dict is mutated
            (its 'name' key is set), so it must not be a shared default.
        count: number of create/delete iterations to run.
    """
    # Fix: use None instead of a mutable default — this function mutates
    # index_params, so a shared `{}` default would leak state across calls.
    index_params = {} if index_params is None else index_params

    cloud_provider = get_cloud_provider(provider)
    index_name = get_cloud_index_name(dataset, N)
    # Make sure a stale index from an aborted earlier run doesn't interfere.
    cloud_provider.delete_index(index_name)
    index_params['name'] = index_name

    print(get_title(provider, index_params, dataset, N))

    print_labels(f"Iteration /{count}", 'Latency (ms)')

    times = []

    with DatabaseConnection(Extension.NONE) as conn:
        base_table_name = get_table_name(dataset=dataset, N=N)
        vector_data = conn.select(f'SELECT v FROM {base_table_name}')
        # The provider expects (string_id, vector) pairs; column 0 of each row
        # holds the vector as a JSON-parsable array string.
        vector_data = [
            (str(row_idx), json.loads(row[0]))
            for row_idx, row in enumerate(vector_data)
        ]

    for iteration in range(count):
        # Fix: renamed the loop variable from `time` so it no longer shadows
        # the imported `time` module.
        elapsed = generate_cloud_performance_result(cloud_provider, vector_data, index_params)

        times.append(elapsed)

        print_row(str(iteration), "{:.2f}".format(elapsed))

    cloud_provider.delete_index(index_name)

    latency_average = statistics.mean(times)

    def save_create_result(metric_type, metric_value):
        # Helper so both metrics are saved with identical experiment keys.
        save_result(
            metric_type=metric_type,
            metric_value=metric_value,
            extension=provider,
            index_params=index_params,
            dataset=dataset,
            n=convert_string_to_number(N),
        )

    save_create_result(Metric.CREATE_LATENCY, latency_average)
    if count > 1:
        # stddev needs at least two samples.
        latency_stddev = statistics.stdev(times)
        save_create_result(Metric.CREATE_LATENCY_STDDEV, latency_stddev)

    print('average latency:', f"{latency_average:.2f} ms")
    if count > 1:
        print('stddev latency', f"{latency_stddev:.2f} ms")
    print()
3 changes: 2 additions & 1 deletion core/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
psycopg2
numpy
pgvector
pgvector
pinecone-client
24 changes: 24 additions & 0 deletions core/utils/base_cloud_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import itertools

def chunks(iterable, batch_size=100):
    """Yield consecutive tuples of at most *batch_size* items from *iterable*.

    The final chunk may be shorter; an empty iterable yields nothing.
    """
    source = iter(iterable)
    while True:
        batch = tuple(itertools.islice(source, batch_size))
        if not batch:
            return
        yield batch


class BaseCloudProvider:
    """Abstract interface for managed cloud vector-database providers.

    Concrete providers (e.g. the Pinecone subclass) must override every
    method; the base implementations only raise.
    """

    def create_index(self, *args, **kwargs):
        # Create the index (and load it with vectors) on the provider.
        # Fix: raise the idiomatic NotImplementedError (a subclass of
        # Exception, so existing broad handlers still work) and accept any
        # arguments so a call through the subclass signature fails with
        # "not implemented" rather than a misleading TypeError.
        raise NotImplementedError("Not implemented")

    def delete_index(self, *args, **kwargs):
        # Delete the named index; should tolerate a missing index.
        raise NotImplementedError("Not implemented")

    def insert_data(self, *args, **kwargs):
        # Upsert (id, vector) pairs into an existing index.
        raise NotImplementedError("Not implemented")

    def search(self, *args, **kwargs):
        # Run a similarity search against the index.
        raise NotImplementedError("Not implemented")

12 changes: 12 additions & 0 deletions core/utils/cloud_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import os
from .pinecone_provider import Pinecone
from .constants import Cloud
from .validate_env_vars import validate_env_vars


def get_cloud_provider(provider_name):
    """Return an initialized client for the requested cloud provider.

    Parameters:
        provider_name: a Cloud enum member.

    Raises:
        Exception: if the provider is unknown, or (via validate_env_vars)
            if a required environment variable is missing.
    """
    if provider_name == Cloud.PINECONE:
        # Fix: validate only the variables this provider actually needs, and
        # only once it has been selected — previously Pinecone credentials
        # were required even for other/invalid providers.
        validate_env_vars(['PINECONE_API_KEY', 'PINECONE_ENV'])
        return Pinecone(os.environ['PINECONE_API_KEY'], os.environ['PINECONE_ENV'])
    raise Exception(f'Invalid cloud provider {provider_name}')

4 changes: 4 additions & 0 deletions core/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class Extension(Enum):
NEON = 'neon'
NONE = 'none'

class Cloud(Enum):
    """Managed cloud vector-database providers (counterpart to the
    self-hosted Extension values; shares the DEFAULT index-params table)."""
    PINECONE = 'pinecone'


EXTENSION_VALUES = [extension.value for extension in Extension]

Expand All @@ -33,6 +36,7 @@ class Extension(Enum):
Extension.PGVECTOR_HNSW: {'m': 32, 'ef_construction': 128, 'ef': 10},
Extension.LANTERN: {'m': 32, 'ef_construction': 128, 'ef': 10},
Extension.NEON: {'m': 32, 'ef_construction': 128, 'ef': 10},
Cloud.PINECONE: { 'name': '', 'metric': 'euclidean', 'pods': 1, 'replicas': 1, 'pod_type': 'p2' },
Extension.NONE: {},
}

Expand Down
3 changes: 3 additions & 0 deletions core/utils/names.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def get_table_name(dataset, N, type='base'):
table_name = f"{dataset.value}_{type}{N}"
return table_name

def get_cloud_index_name(dataset, N, type='base'):
    """Return the cloud index name for (dataset, N, type).

    Reuses the local table name with underscores replaced by dashes
    (NOTE(review): presumably because the cloud provider restricts index
    names — confirm against provider naming rules).
    """
    return get_table_name(dataset, N, type).replace('_', '-')

def get_index_name(dataset, N):
return get_table_name(dataset, N) + "_index"
130 changes: 130 additions & 0 deletions core/utils/pinecone_async_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from typing import Optional, List, Union, Dict, Tuple, Generic, Callable, TypeVar
from pinecone.index import Index, parse_query_response, fix_tuple_length, Iterable, _OPENAPI_ENDPOINT_PARAMS # type: ignore
from pinecone.core.client.models import QueryVector, QueryResponse, SparseValues, QueryRequest # type: ignore
from pinecone.core.utils.error_handling import validate_and_convert_errors # type: ignore


T = TypeVar('T')


# Async wrapper for libraries (like Pinecone) whose "async" API hands back
# multiprocessing.AsyncResult-style objects with a blocking .get() rather
# than supporting real async/await.
class AsyncHandle(Generic[T]):
    """Memoizing handle: runs *retrieval* on the first get() and caches it.

    Fix: the done flag is now set only AFTER retrieval succeeds. Previously
    it was set before calling retrieval, so if retrieval raised, every later
    get() hit AttributeError on the missing cached result instead of
    retrying (or re-raising) the underlying error.
    """
    _retrieval: Callable[..., T]  # deferred computation, run at most once successfully
    _is_done: bool                # True once _result holds a valid value
    _result: T                    # cached result; only set when _is_done is True

    def __init__(self, retrieval: Callable[..., T]):
        self._retrieval = retrieval
        self._is_done = False

    def get(self) -> T:
        """Return the retrieved value, computing it on first call."""
        if not self._is_done:
            self._result = self._retrieval()
            self._is_done = True
        return self._result

###
## This works around bugs in Pinecone which doesn't support async properly. The underlying API does support async,
## so we need to modify the Pinecone client to support it by deriving from their existing index class.
## Note: This will be fragile if Pinecone updates their code, so we need to stay pinned to their client library version.

class AsyncIndex(Index):
    """pinecone Index subclass whose query() always issues the HTTP request
    asynchronously and returns an AsyncHandle wrapping the eventual parsed
    QueryResponse, instead of blocking and returning the response directly.

    NOTE: this reimplements the body of Index.query against pinecone-client
    internals (parse_query_response, fix_tuple_length, _OPENAPI_ENDPOINT_PARAMS),
    so it is fragile across client versions — keep the client pinned.
    """

    @validate_and_convert_errors
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def query(self,
              vector: Optional[List[float]] = None,
              id: Optional[str] = None,
              queries: Optional[Union[List[QueryVector], List[Tuple]]] = None,
              top_k: Optional[int] = None,
              namespace: Optional[str] = None,
              filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
              include_values: Optional[bool] = None,
              include_metadata: Optional[bool] = None,
              sparse_vector: Optional[Union[SparseValues, Dict[str, Union[List[float], List[int]]]]] = None,
              **kwargs) -> AsyncHandle[QueryResponse]:
        """
        The Query operation searches a namespace, using a query vector.
        It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

        API reference: https://docs.pinecone.io/reference/query

        Examples:
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
            >>> index.query(id='id1', top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
            >>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
            >>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
            >>>             top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], sparse_vector=SparseValues([1, 2], [0.2, 0.4]),
            >>>             top_k=10, namespace='my_namespace')

        Args:
            vector (List[float]): The query vector. This should be the same length as the dimension of the index
                                  being queried. Each `query()` request can contain only one of the parameters
                                  `queries`, `id` or `vector`.. [optional]
            id (str): The unique ID of the vector to be used as a query vector.
                      Each `query()` request can contain only one of the parameters
                      `queries`, `vector`, or `id`.. [optional]
            queries ([QueryVector]): DEPRECATED. The query vectors.
                                     Each `query()` request can contain only one of the parameters
                                     `queries`, `vector`, or `id`.. [optional]
            top_k (int): The number of results to return for each query. Must be an integer greater than 1.
            namespace (str): The namespace to fetch vectors from.
                             If not specified, the default namespace is used. [optional]
            filter (Dict[str, Union[str, float, int, bool, List, dict]):
                    The filter to apply. You can use vector metadata to limit your search.
                    See https://www.pinecone.io/docs/metadata-filtering/.. [optional]
            include_values (bool): Indicates whether vector values are included in the response.
                                   If omitted the server will use the default value of False [optional]
            include_metadata (bool): Indicates whether metadata is included in the response as well as the ids.
                                     If omitted the server will use the default value of False [optional]
            sparse_vector: (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): sparse values of the query vector.
                            Expected to be either a SparseValues object or a dict of the form:
                            {'indices': List[int], 'values': List[float]}, where the lists each have the same length.
        Keyword Args:
            Supports OpenAPI client keyword arguments. See pinecone.core.client.models.QueryRequest for more details.

        Returns: AsyncHandle[QueryResponse] — call .get() on it to block for
            the QueryResponse containing the list of the closest vectors as
            ScoredVector objects, and namespace name. (This differs from the
            base Index.query, which returns the QueryResponse directly.)
        """
        # Normalize each entry of the deprecated `queries` argument into a
        # QueryVector, mirroring the upstream client's behavior.
        def _query_transform(item):
            if isinstance(item, QueryVector):
                return item
            if isinstance(item, tuple):
                # A tuple is (values,) or (values, filter); pad to length 2.
                values, filter = fix_tuple_length(item, 2)
                if filter is None:
                    return QueryVector(values=values, _check_type=_check_type)
                else:
                    return QueryVector(values=values, filter=filter, _check_type=_check_type)
            if isinstance(item, Iterable):
                return QueryVector(values=item, _check_type=_check_type)
            raise ValueError(f"Invalid query vector value passed: cannot interpret type {type(item)}")

        # force async — this is the whole point of the subclass: the OpenAPI
        # layer then returns an AsyncResult-style object instead of blocking.
        kwargs['async_req'] = True

        _check_type = kwargs.pop('_check_type', False)
        queries = list(map(_query_transform, queries)) if queries is not None else None

        sparse_vector = self._parse_sparse_values_arg(sparse_vector)
        args_dict = self._parse_non_empty_args([('vector', vector),
                                                ('id', id),
                                                ('queries', queries),
                                                ('top_k', top_k),
                                                ('namespace', namespace),
                                                ('filter', filter),
                                                ('include_values', include_values),
                                                ('include_metadata', include_metadata),
                                                ('sparse_vector', sparse_vector)])
        # Split kwargs: OpenAPI endpoint parameters go to the API call itself,
        # everything else into the request model — same split as upstream.
        response = self._vector_api.query(
            QueryRequest(
                **args_dict,
                _check_type=_check_type,
                **{k: v for k, v in kwargs.items() if k not in _OPENAPI_ENDPOINT_PARAMS}
            ),
            **{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS}
        )

        # Defer blocking (.get()) and response parsing until the caller asks.
        def handler():
            return parse_query_response(response.get(), vector is not None or id)

        return AsyncHandle(handler)
48 changes: 48 additions & 0 deletions core/utils/pinecone_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os
import pinecone
from .pinecone_async_index import AsyncIndex
from .base_cloud_provider import BaseCloudProvider, chunks

class Pinecone(BaseCloudProvider):
    """Cloud provider implementation backed by the Pinecone managed service."""

    def __init__(self, api_key, environment):
        # Initializes the module-level pinecone client; subsequent
        # pinecone.* calls use this session.
        pinecone.init(api_key=api_key, environment=environment)

    def create_index(self, index_params, vectors):
        """Create the index described by index_params and load *vectors* into it.

        The index dimension is taken from the first vector; vectors are
        (id, values) pairs.
        """
        dimension = len(vectors[0][1])
        pinecone.create_index(
            name=index_params['name'],
            dimension=dimension,
            metric=index_params['metric'],
            pods=index_params['pods'],
            replicas=index_params['replicas'],
            pod_type=index_params['pod_type'],
        )
        return self.insert_data(index_params['name'], vectors)

    def delete_index(self, name):
        """Delete the named index; a missing index is a no-op."""
        try:
            pinecone.delete_index(name)
        except pinecone.NotFoundException:
            pass

    def insert_data(self, index_name, vectors):
        """Upsert (id, values) pairs into *index_name* in parallel batches."""
        with pinecone.Index(index_name, pool_threads=os.cpu_count()) as index:
            # Fire off all batch upserts concurrently, then block on each
            # handle; .get() re-raises any request error.
            pending = []
            for batch in chunks(vectors, batch_size=100):
                pending.append(index.upsert(vectors=batch, async_req=True))
            for handle in pending:
                handle.get()

    def calculate_recall(self, index_name, vectors_to_query, expected_results, k):
        """Return average recall@k of the index over *vectors_to_query*.

        expected_results[i][:k] holds the ground-truth integer ids for the
        i-th query vector.
        """
        matched = 0
        batch_size = 100

        with AsyncIndex(index_name, pool_threads=os.cpu_count()) as index:
            for batch_number, batch in enumerate(chunks(vectors_to_query, batch_size)):
                # Map positions within this batch back to global query indices.
                offset = batch_size * batch_number
                handles = [
                    (offset + i, index.query(vector=query_vector, top_k=k, include_values=False, async_req=True))
                    for (i, query_vector) in enumerate(batch)
                ]
                for (query_idx, handle) in handles:
                    response = handle.get()
                    truth = set(expected_results[query_idx][:k])
                    matched += len(truth.intersection(int(m['id']) for m in response['matches']))

        return matched / len(vectors_to_query)
7 changes: 7 additions & 0 deletions core/utils/validate_env_vars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import os

def validate_env_vars(keys):
    """Raise if any of *keys* is absent from the process environment.

    The error names the first missing variable, in the order given.
    """
    missing = [key for key in keys if key not in os.environ]
    if missing:
        raise Exception(f'Missing "{missing[0]}" environment variable')

Loading