Skip to content

Commit 3f752b6

Browse files
authored
Merge pull request #30 from googleapis/batch_fetch
Add batch fetch API
2 parents 6d00a7c + 04d06aa commit 3f752b6

File tree

6 files changed

+354
-21
lines changed

6 files changed

+354
-21
lines changed

src/main/java/com/google/cloud/aiplatform/fs/BigtableClient.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,21 @@
1818
import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS;
1919

2020
import com.google.api.gax.core.FixedCredentialsProvider;
21+
import com.google.api.gax.rpc.ServerStream;
2122
import com.google.auth.oauth2.AccessToken;
2223
import com.google.auth.oauth2.GoogleCredentials;
2324
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
2425
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
2526
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
27+
import com.google.cloud.bigtable.data.v2.models.Query;
2628
import com.google.cloud.bigtable.data.v2.models.Row;
2729
import com.google.cloud.bigtable.data.v2.models.TableId;
2830
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
2931
import com.google.common.annotations.VisibleForTesting;
3032
import com.google.common.base.VerifyException;
3133
import java.io.IOException;
34+
import java.util.ArrayList;
35+
import java.util.List;
3236
import java.util.Optional;
3337

3438
/**
@@ -96,27 +100,61 @@ public void close() {
96100
this.bigtableDataClient.close();
97101
}
98102

99-
public Row fetchData(InternalFetchRequest request) throws Exception {
100-
Filter filter = null;
/**
 * Builds the Bigtable read filter for a fetch request.
 *
 * <p>Both branches chain two filters: one that selects columns belonging to
 * {@code request.featureViewId}, and one that limits the result to one cell per column.
 * NOTE(review): presumably the retained cell is the most recent version - confirm against the
 * Bigtable filter documentation.
 *
 * @param request the request whose {@code featureViewSpec.continuousSyncEnabled} flag and
 *     {@code featureViewId} drive the filter.
 * @return the chained filter to apply to the read.
 */
103+
private Filter buildFilter(InternalFetchRequest request) {
101104
if (request.featureViewSpec.continuousSyncEnabled) {
102-
filter =
103-
FILTERS
104-
.chain()
105-
.filter(FILTERS.family().regex(request.featureViewId))
106-
.filter(FILTERS.limit().cellsPerColumn(1));
// Continuous sync: select column families whose name matches the feature view id (used as a
// regex), without restricting the column qualifier.
105+
return FILTERS
106+
.chain()
107+
.filter(FILTERS.family().regex(request.featureViewId))
108+
.filter(FILTERS.limit().cellsPerColumn(1));
107109
} else {
108-
filter =
109-
FILTERS
110-
.chain()
111-
.filter(
112-
FILTERS
113-
.qualifier()
114-
.rangeWithinFamily(request.featureViewId)
115-
.startClosed(defaultColumn)
116-
.endClosed(directWriteColumn))
117-
.filter(FILTERS.limit().cellsPerColumn(1));
// Otherwise: within the family named by the feature view id, keep only qualifiers in the
// inclusive range [defaultColumn, directWriteColumn].
110+
return FILTERS
111+
.chain()
112+
.filter(
113+
FILTERS
114+
.qualifier()
115+
.rangeWithinFamily(request.featureViewId)
116+
.startClosed(defaultColumn)
117+
.endClosed(directWriteColumn))
118+
.filter(FILTERS.limit().cellsPerColumn(1));
118119
}
120+
}
121+
/**
 * Reads the single row keyed by {@code request.dataKey} from the table named by
 * {@code request.cloudBigtableSpec.tableId}, applying the filter from {@code buildFilter}.
 *
 * @param request the request carrying the table id, the row key and the filter inputs.
 * @return the row returned by {@code bigtableDataClient.readRow}. NOTE(review): presumably
 *     {@code null} when the key does not exist - confirm against the Bigtable client docs.
 * @throws Exception if the underlying Bigtable read fails.
 */
122+
public Row fetchData(InternalFetchRequest request) throws Exception {
123+
Filter filter = buildFilter(request);
119124
return bigtableDataClient.readRow(
120125
TableId.of(request.cloudBigtableSpec.tableId), request.dataKey, filter);
121126
}
127+
128+
/**
129+
* Fetches multiple rows from Bigtable based on the provided InternalFetchRequest.
130+
* This method assumes that {@code request.dataKeys} is populated.
131+
*
132+
* @param request An {@link InternalFetchRequest} with {@code dataKeys} set.
133+
* @return A {@link List} of {@link Row} objects for the keys found in Bigtable. Keys not found
134+
* in Bigtable will not have corresponding entries in the returned list.
135+
* @throws IllegalArgumentException if {@code request.dataKeys} is null or empty.
136+
* @throws Exception for other underlying issues during the Bigtable read.
137+
*/
138+
public List<Row> batchFetchData(InternalFetchRequest request) throws Exception {
139+
if (request.dataKeys == null || request.dataKeys.isEmpty()) {
140+
throw new IllegalArgumentException("batchFetchData requires InternalFetchRequest with populated dataKeys.");
141+
}
142+
143+
Filter filter = buildFilter(request);
144+
TableId tableId = TableId.of(request.cloudBigtableSpec.tableId);
145+
146+
Query query = Query.create(tableId);
147+
for (String dataKey : request.dataKeys) {
148+
query = query.rowKey(dataKey);
149+
}
150+
query = query.filter(filter);
151+
152+
ServerStream<Row> rowStream = bigtableDataClient.readRows(query);
153+
154+
List<Row> rows = new ArrayList<>();
155+
for (Row row : rowStream) {
156+
rows.add(row);
157+
}
158+
return rows;
159+
}
122160
}

src/main/java/com/google/cloud/aiplatform/fs/CloudBigtableCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,4 @@ class CloudBigtableSpec {
119119
this.instanceId = fos.getBigtable().getBigtableMetadata().getInstanceId();
120120
this.tableId = fos.getBigtable().getBigtableMetadata().getTableId();
121121
}
122-
}
122+
}

src/main/java/com/google/cloud/aiplatform/fs/Converter.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
import com.google.protobuf.InvalidProtocolBufferException;
3030
import io.grpc.Status;
3131
import io.grpc.Status.Code;
32+
import java.util.ArrayList;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Map;
3236
import java.util.logging.Level;
3337
import java.util.logging.Logger;
3438

@@ -163,4 +167,73 @@ public static FetchFeatureValuesResponse rowToResponse(Row row, InternalFetchReq
163167
responseBuilder.setKeyValues(internalStorageToKeyValuesList(cell, request));
164168
return responseBuilder.build();
165169
}
170+
171+
/**
172+
* Converts a list of Bigtable Rows to a list of FetchFeatureValuesResponse.
173+
* The output list is ordered to match the keys in {@code request.dataKeys}.
174+
*
175+
* @param rows The list of Rows returned from Bigtable. These are only the rows that were found.
176+
* @param request An {@link InternalFetchRequest} containing the ordered list of keys in {@code request.dataKeys}.
177+
* @return A {@link List<FetchFeatureValuesResponse>} where each element corresponds to a key in {@code request.dataKeys}.
178+
* If a key was not found in Bigtable, the response for that key will contain an empty {@link FeatureNameValuePairList}.
179+
* @throws UnimplementedException if {@code request.format} is PROTO_STRUCT.
180+
* @throws InternalException if conversion of any *found* row fails due to unexpected data format,
181+
* propagating the underlying issue.
182+
*/
183+
public static List<FetchFeatureValuesResponse> rowsToResponses(List<Row> rows, InternalFetchRequest request) throws Exception {
184+
if (request.format == FeatureViewDataFormat.PROTO_STRUCT) {
185+
throw new UnimplementedException(
186+
new Throwable("PROTO_STRUCT is not supported for batch fetch"),
187+
/* statusCode= */ GrpcStatusCode.of(Code.UNIMPLEMENTED),
188+
/* retryable= */ false);
189+
}
190+
191+
if (request.dataKeys == null || request.dataKeys.isEmpty()) {
192+
return new ArrayList<>(); // Return empty list if no keys were requested.
193+
}
194+
195+
// 1. Create a map for quick lookup of Rows by their key.
196+
Map<String, Row> keyToRowMap = new HashMap<>();
197+
for (Row row : rows) {
198+
// Row.getKey() returns a ByteString, convert to String.
199+
keyToRowMap.put(row.getKey().toStringUtf8(), row);
200+
}
201+
202+
// 2. Build the ordered list of responses.
203+
List<FetchFeatureValuesResponse> responses = new ArrayList<>(request.dataKeys.size());
204+
205+
// Iterate through the *ordered* requested keys from InternalFetchRequest.dataKeys.
206+
for (String dataKey : request.dataKeys) {
207+
Row foundRow = keyToRowMap.get(dataKey);
208+
FetchFeatureValuesResponse response;
209+
210+
if (foundRow != null) {
211+
// Key was found in Bigtable. Convert the Row to a FeatureViewCell and then to a Response.
212+
try {
213+
FeatureViewCell cell = rowToFeatureViewCell(foundRow, request);
214+
response = FetchFeatureValuesResponse.newBuilder()
215+
.setKeyValues(internalStorageToKeyValuesList(cell, request))
216+
.build();
217+
} catch (Exception e) {
218+
// If conversion of a *found* row fails, it indicates an internal data issue.
219+
// Following the pattern of rowToResponse, we throw an InternalException.
220+
throw new InternalException(
221+
new Throwable(String.format("Failed to convert Bigtable Row for key '%s': %s", dataKey, e.getMessage()), e),
222+
/* statusCode= */ GrpcStatusCode.of(Status.Code.INTERNAL),
223+
/* retryable= */ false);
224+
}
225+
} else {
226+
// Key was not found in Bigtable. Create a response indicating this.
227+
// For KEY_VALUE format, an empty FeatureNameValuePairList signifies that no features
228+
// were found for the requested key.
229+
response = FetchFeatureValuesResponse.newBuilder()
230+
.setKeyValues(FeatureNameValuePairList.getDefaultInstance())
231+
.build();
232+
logger.log(Level.FINE, String.format("Entity id '%s' not found in Bigtable during batch fetch.", dataKey));
233+
}
234+
responses.add(response);
235+
}
236+
237+
return responses;
238+
}
166239
}

src/main/java/com/google/cloud/aiplatform/fs/FeatureOnlineStoreDirectClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.aiplatform.v1.FetchFeatureValuesResponse;
2626
import com.google.cloud.bigtable.data.v2.models.Row;
2727
import io.grpc.Status.Code;
28+
import java.util.List;
2829
import java.util.Optional;
2930
import java.util.logging.Level;
3031
import java.util.logging.Logger;
@@ -99,6 +100,21 @@ public FetchFeatureValuesResponse fetchFeatureValues(FetchFeatureValuesRequest r
99100
return Converter.rowToResponse(row, internalRequest);
100101
}
101102

103+
public List<FetchFeatureValuesResponse> batchFetchFeatureValues(List<FetchFeatureValuesRequest> requests) throws Exception {
104+
for (FetchFeatureValuesRequest request : requests) {
105+
if (request.getDataFormat().equals(FeatureViewDataFormat.PROTO_STRUCT)) {
106+
throw new UnimplementedException(
107+
new Throwable("PROTO_STRUCT is not supported yet for batch fetch"),
108+
/* statusCode= */ GrpcStatusCode.of(Code.UNIMPLEMENTED),
109+
/* retryable= */ false);
110+
}
111+
}
112+
113+
InternalFetchRequest internalRequest = new InternalFetchRequest(requests);
114+
List<Row> rows = this.bigtableClientManager.getClient().batchFetchData(internalRequest);
115+
return Converter.rowsToResponses(rows, internalRequest);
116+
}
117+
102118
public void close() {
103119
this.bigtableClientManager.shutdown();
104120
}

0 commit comments

Comments
 (0)