Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/Weaviate.Client.Tests/Integration/TestIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,39 @@ var obj in collection.Iterator(cancellationToken: TestContext.Current.Cancellati
Assert.Contains("Name 2", names);
}

/// <summary>
/// Tests that test iterator
/// </summary>
[Fact]
public async Task Test_Iterator_With_Filter()
{
var collection = await CollectionFactory(
properties: [Property.Text("name"), Property.Bool("isActive")],
vectorConfig: Configure.Vector("custom", v => v.SelfProvided())
);

await collection.Data.InsertMany(
BatchInsertRequest.Create(
Enumerable.Range(1, 200).Select(i => new { Name = $"Name {i}", IsActive = i == 2 })
),
TestContext.Current.CancellationToken
);

var names = new List<string>();
await foreach (
var obj in collection.Iterator(
filter: Filter.Property("isActive").IsEqual(true),
cacheSize: 10,
cancellationToken: TestContext.Current.CancellationToken
)
)
{
obj.Do(o => names.Add(o.Name));
}

Assert.Single(names);
}

/// <summary>
/// Tests that test iterator arguments
/// </summary>
Expand Down
38 changes: 31 additions & 7 deletions src/Weaviate.Client/CollectionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@
/// <param name="returnMetadata">Metadata to include in the response.</param>
/// <param name="includeVectors">Vector configuration for returned objects.</param>
/// <param name="returnProperties">Properties to return in the response.</param>
/// <param name="filter">Filter to apply to the objects.</param>
/// <param name="returnReferences">Cross-references to return.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An async enumerable of WeaviateObject instances.</returns>
public async IAsyncEnumerable<WeaviateObject> Iterator(

Check warning on line 137 in src/Weaviate.Client/CollectionClient.cs

View workflow job for this annotation

GitHub Actions / Test on Weaviate v1.34.0 / All Integration Tests - v1.34.0

Symbol 'Weaviate.Client.CollectionClient.Iterator(System.Guid? after = null, Weaviate.Client.Models.Filter? filter = null, uint cacheSize = 100, Weaviate.Client.Models.MetadataQuery? returnMetadata = null, Weaviate.Client.Models.VectorQuery? includeVectors = null, Weaviate.Client.Internal.AutoArray<string!>? returnProperties = null, System.Collections.Generic.IList<Weaviate.Client.Models.QueryReference!>? returnReferences = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerable<Weaviate.Client.Models.WeaviateObject!>!' is not part of the declared public API (https://github.com/dotnet/roslyn/blob/main/src/RoslynAnalyzers/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 137 in src/Weaviate.Client/CollectionClient.cs

View workflow job for this annotation

GitHub Actions / Test on Weaviate v1.35.2 / All Integration Tests - v1.35.2

Symbol 'Weaviate.Client.CollectionClient.Iterator(System.Guid? after = null, Weaviate.Client.Models.Filter? filter = null, uint cacheSize = 100, Weaviate.Client.Models.MetadataQuery? returnMetadata = null, Weaviate.Client.Models.VectorQuery? includeVectors = null, Weaviate.Client.Internal.AutoArray<string!>? returnProperties = null, System.Collections.Generic.IList<Weaviate.Client.Models.QueryReference!>? returnReferences = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerable<Weaviate.Client.Models.WeaviateObject!>!' is not part of the declared public API (https://github.com/dotnet/roslyn/blob/main/src/RoslynAnalyzers/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 137 in src/Weaviate.Client/CollectionClient.cs

View workflow job for this annotation

GitHub Actions / Test on Weaviate v1.36.0 / All Integration Tests - v1.36.0

Symbol 'Weaviate.Client.CollectionClient.Iterator(System.Guid? after = null, Weaviate.Client.Models.Filter? filter = null, uint cacheSize = 100, Weaviate.Client.Models.MetadataQuery? returnMetadata = null, Weaviate.Client.Models.VectorQuery? includeVectors = null, Weaviate.Client.Internal.AutoArray<string!>? returnProperties = null, System.Collections.Generic.IList<Weaviate.Client.Models.QueryReference!>? returnReferences = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerable<Weaviate.Client.Models.WeaviateObject!>!' is not part of the declared public API (https://github.com/dotnet/roslyn/blob/main/src/RoslynAnalyzers/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)

Check warning on line 137 in src/Weaviate.Client/CollectionClient.cs

View workflow job for this annotation

GitHub Actions / Test on Weaviate v1.33.5 / All Integration Tests - v1.33.5

Symbol 'Weaviate.Client.CollectionClient.Iterator(System.Guid? after = null, Weaviate.Client.Models.Filter? filter = null, uint cacheSize = 100, Weaviate.Client.Models.MetadataQuery? returnMetadata = null, Weaviate.Client.Models.VectorQuery? includeVectors = null, Weaviate.Client.Internal.AutoArray<string!>? returnProperties = null, System.Collections.Generic.IList<Weaviate.Client.Models.QueryReference!>? returnReferences = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerable<Weaviate.Client.Models.WeaviateObject!>!' is not part of the declared public API (https://github.com/dotnet/roslyn/blob/main/src/RoslynAnalyzers/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
Guid? after = null,
Filter? filter = null,
uint cacheSize = ITERATOR_CACHE_SIZE,
MetadataQuery? returnMetadata = null,
VectorQuery? includeVectors = null,
Expand All @@ -145,15 +147,18 @@
{
await _client.EnsureInitializedAsync();
Guid? cursor = after;
IDictionary<string, string>? shardCursors = null;

while (true)
{
cancellationToken.ThrowIfCancellationRequested();

WeaviateResult page = await _client.GrpcClient.FetchObjects(
var reply = await _client.GrpcClient.FetchObjects(
Name,
limit: cacheSize,
after: cursor,
filters: filter,
shardCursors: shardCursors,
returnMetadata: returnMetadata,
includeVectors: includeVectors,
returnProperties: returnProperties,
Expand All @@ -162,15 +167,34 @@
tenant: Tenant
);

if (!page.Objects.Any())
WeaviateResult page = reply;

if (filter is null)
{
yield break;
if (!page.Objects.Any())
{
yield break;
}

foreach (var c in page.Objects)
{
cursor = c.UUID;
yield return c;
}
}

foreach (var c in page.Objects)
else
{
cursor = c.UUID;
yield return c;
foreach (var c in page.Objects)
{
yield return c;
}

if (reply.ShardCursors.Count == 0)
{
yield break;
}

shardCursors = reply.ShardCursors;
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/Weaviate.Client/Typed/TypedCollectionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
/// Uses cursor-based pagination for efficient iteration over large collections.
/// </summary>
/// <param name="after">Start iteration after this object ID.</param>
/// <param name="filter">Filter to apply to the objects.</param>
/// <param name="cacheSize">Number of objects to fetch per page.</param>
/// <param name="returnMetadata">Metadata to include in results.</param>
/// <param name="includeVectors">Whether to include vectors.</param>
Expand All @@ -149,8 +150,9 @@
/// <param name="returnReferences">References to return.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An async enumerable of strongly-typed objects.</returns>
public async IAsyncEnumerable<WeaviateObject<T>> Iterator(

Check warning on line 153 in src/Weaviate.Client/Typed/TypedCollectionClient.cs

View workflow job for this annotation

GitHub Actions / Test on Weaviate v1.32.17 / All Integration Tests - v1.32.17

Symbol 'Weaviate.Client.Typed.TypedCollectionClient<T>.Iterator(System.Guid? after = null, Weaviate.Client.Models.Filter? filter = null, uint cacheSize = 100, Weaviate.Client.Models.MetadataQuery? returnMetadata = null, Weaviate.Client.Models.VectorQuery? includeVectors = null, Weaviate.Client.Internal.AutoArray<string!>? returnProperties = null, System.Collections.Generic.IList<Weaviate.Client.Models.QueryReference!>? returnReferences = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerable<Weaviate.Client.Models.Typed.WeaviateObject<T!>!>!' is not part of the declared public API (https://github.com/dotnet/roslyn/blob/main/src/RoslynAnalyzers/PublicApiAnalyzers/PublicApiAnalyzers.Help.md)
Guid? after = null,
Filter? filter = null,
uint cacheSize = CollectionClient.ITERATOR_CACHE_SIZE,
MetadataQuery? returnMetadata = null,
VectorQuery? includeVectors = null,
Expand All @@ -162,6 +164,7 @@
await foreach (
var obj in _collectionClient.Iterator(
after,
filter,
cacheSize,
returnMetadata,
includeVectors,
Expand Down
12 changes: 12 additions & 0 deletions src/Weaviate.Client/gRPC/Search.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ internal partial class WeaviateGrpcClient
/// <param name="returnReferences">The return references</param>
/// <param name="returnMetadata">The return metadata</param>
/// <param name="includeVectors">The include vectors</param>
/// <param name="shardCursors">Per-shard cursors for filtered iterator continuation</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns>A task containing the search reply</returns>
internal async Task<V1.SearchReply> FetchObjects(
Expand All @@ -75,6 +76,7 @@ internal partial class WeaviateGrpcClient
IList<QueryReference>? returnReferences = null,
MetadataQuery? returnMetadata = null,
VectorQuery? includeVectors = null,
IDictionary<string, string>? shardCursors = null,
CancellationToken cancellationToken = default
)
{
Expand All @@ -97,6 +99,16 @@ internal partial class WeaviateGrpcClient
includeVectors: includeVectors
);

if (filters is not null && !req.HasAfter)
{
req.After = "";
}

if (shardCursors is { Count: > 0 })
{
req.ShardCursors.Add(shardCursors);
}

return await Search(req, cancellationToken);
}

Expand Down
13 changes: 11 additions & 2 deletions src/Weaviate.Client/gRPC/proto/v1/search_get.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ message SearchRequest {
uint32 limit = 30;
uint32 offset = 31;
uint32 autocut = 32;
string after = 33;
optional string after = 33;
// protolint:disable:next REPEATED_FIELD_NAMES_PLURALIZED
repeated SortBy sort_by = 34;
// Per-shard cursor continuation state for filtered iterator mode
// Key: shard name, Value: UUID to start after for that shard
// ONLY used when BOTH 'after' AND 'filters' are set
// Value of uuid.Nil ("00000000-0000-0000-0000-000000000000") indicates shard is exhausted
map<string, string> shard_cursors = 35;

// matches/searches for objects
optional Filters filters = 40;
Expand All @@ -52,7 +57,7 @@ message SearchRequest {

bool uses_123_api = 100 [deprecated = true];
bool uses_125_api = 101 [deprecated = true];
bool uses_127_api = 102;
bool uses_127_api = 102;
}

message GroupBy {
Expand Down Expand Up @@ -117,6 +122,10 @@ message SearchReply {
optional string generative_grouped_result = 3 [deprecated = true];
repeated GroupByResult group_by_results = 4;
optional GenerativeResult generative_grouped_results = 5;
// Per-shard cursor state for pagination continuation (filtered iterator mode)
// Key: shard name, Value: UUID to start after for that shard on next request
// Value of uuid.Nil indicates shard is exhausted (no more results to scan)
map<string, string> shard_cursors = 6;
}

message RerankReply {
Expand Down
Loading