Skip to content

Commit 33aa31a

Browse files
matar993Marco Scalzotmnd1991agilelab-tmnd1991
authored
Feature/182 implement querytable api on icebergsharedtable (#214)
Co-authored-by: Marco Scalzo <af39763@enel.com> Co-authored-by: Antonio Murgia <ing.murgia@icloud.com> Co-authored-by: Antonio Murgia <antonio.murgia@agilelab.it>
1 parent 9c0da63 commit 33aa31a

19 files changed

+815
-44
lines changed

server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package io.whitefox.api.deltasharing;
22

33
import static io.whitefox.DeltaTestUtils.*;
4-
import static io.whitefox.IcebergTestUtils.icebergTableWithHadoopCatalog;
5-
import static io.whitefox.IcebergTestUtils.s3IcebergTableWithAwsGlueCatalog;
4+
import static io.whitefox.IcebergTestUtils.*;
65

76
import io.whitefox.AwsGlueTestConfig;
87
import io.whitefox.S3TestConfig;
@@ -73,6 +72,78 @@ public static StorageManager createStorageManager() {
7372
0L)));
7473
}
7574

75+
public static final ParquetProtocol localIcebergTable1Protocol =
76+
ParquetProtocol.ofMinReaderVersion(1);
77+
public static final ParquetProtocol s3IcebergTable1Protocol =
78+
ParquetProtocol.ofMinReaderVersion(1);
79+
public static final ParquetMetadata localIcebergTable1Metadata = ParquetMetadata.builder()
80+
.metadata(ParquetMetadata.Metadata.builder()
81+
.id("3369848726892806393")
82+
.name(Optional.of("metastore.test_db.icebergtable1"))
83+
.format(new Format())
84+
.schemaString(
85+
"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}")
86+
.partitionColumns(List.of())
87+
.version(Optional.of(1L))
88+
.configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd")))
89+
.build())
90+
.build();
91+
92+
public static final Set<FileObjectWithoutPresignedUrl> localIcebergTableFilesToBeSigned = Set.of(
93+
new FileObjectWithoutPresignedUrl()
94+
._file(new FileObjectFileWithoutPresignedUrl()
95+
.partitionValues(Map.of())
96+
.size(419L)
97+
.stats(
98+
"{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}")
99+
.version(1L)
100+
.timestamp(1705667209813L)),
101+
new FileObjectWithoutPresignedUrl()
102+
._file(new FileObjectFileWithoutPresignedUrl()
103+
.partitionValues(Map.of())
104+
.size(419L)
105+
.stats(
106+
"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}")
107+
.version(1L)
108+
.timestamp(1705667209813L)),
109+
new FileObjectWithoutPresignedUrl()
110+
._file(new FileObjectFileWithoutPresignedUrl()
111+
.partitionValues(Map.of())
112+
.size(419L)
113+
.stats(
114+
"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}")
115+
.version(1L)
116+
.timestamp(1705667209813L)),
117+
new FileObjectWithoutPresignedUrl()
118+
._file(new FileObjectFileWithoutPresignedUrl()
119+
.partitionValues(Map.of())
120+
.size(418L)
121+
.stats(
122+
"{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}")
123+
.version(1L)
124+
.timestamp(1705667209813L)),
125+
new FileObjectWithoutPresignedUrl()
126+
._file(new FileObjectFileWithoutPresignedUrl()
127+
.partitionValues(Map.of())
128+
.size(419L)
129+
.stats(
130+
"{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}")
131+
.version(1L)
132+
.timestamp(1705667209813L)));
133+
134+
public static final ParquetMetadata s3IcebergTable1Metadata = ParquetMetadata.builder()
135+
.metadata(ParquetMetadata.Metadata.builder()
136+
.id("7819530050735196523")
137+
.name(Optional.of("metastore.test_glue_db.icebergtable1"))
138+
.format(new Format())
139+
.schemaString(
140+
"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}")
141+
.partitionColumns(List.of())
142+
.version(Optional.of(1L))
143+
.configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd")))
144+
.build())
145+
.build();
146+
76147
public static final ParquetMetadata deltaTable1Metadata = ParquetMetadata.builder()
77148
.metadata(ParquetMetadata.Metadata.builder()
78149
.id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7")
@@ -98,6 +169,7 @@ public static StorageManager createStorageManager() {
98169
.configuration(Optional.of(Map.of()))
99170
.build())
100171
.build();
172+
101173
public static final ParquetMetadata deltaTableWithHistory1Metadata = ParquetMetadata.builder()
102174
.metadata(ParquetMetadata.Metadata.builder()
103175
.id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7")
@@ -110,6 +182,7 @@ public static StorageManager createStorageManager() {
110182
.configuration(Optional.of(Map.of()))
111183
.build())
112184
.build();
185+
113186
public static final ParquetProtocol deltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1);
114187

115188
public static final ParquetProtocol s3DeltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1);
@@ -196,6 +269,48 @@ public static StorageManager createStorageManager() {
196269
.build())
197270
.build());
198271

272+
public static final Set<FileObjectWithoutPresignedUrl> s3IcebergTable1FilesWithoutPresignedUrl =
273+
Set.of(
274+
new FileObjectWithoutPresignedUrl()
275+
._file(new FileObjectFileWithoutPresignedUrl()
276+
.partitionValues(Map.of())
277+
.size(419L)
278+
.stats(
279+
"{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}")
280+
.version(1L)
281+
.timestamp(1705948389052L)),
282+
new FileObjectWithoutPresignedUrl()
283+
._file(new FileObjectFileWithoutPresignedUrl()
284+
.partitionValues(Map.of())
285+
.size(419L)
286+
.stats(
287+
"{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}")
288+
.version(1L)
289+
.timestamp(1705948389052L)),
290+
new FileObjectWithoutPresignedUrl()
291+
._file(new FileObjectFileWithoutPresignedUrl()
292+
.partitionValues(Map.of())
293+
.size(419L)
294+
.stats(
295+
"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}")
296+
.version(1L)
297+
.timestamp(1705948389052L)),
298+
new FileObjectWithoutPresignedUrl()
299+
._file(new FileObjectFileWithoutPresignedUrl()
300+
.partitionValues(Map.of())
301+
.size(418L)
302+
.stats(
303+
"{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}")
304+
.version(1L)
305+
.timestamp(1705948389052L)),
306+
new FileObjectWithoutPresignedUrl()
307+
._file(new FileObjectFileWithoutPresignedUrl()
308+
.partitionValues(Map.of())
309+
.size(419L)
310+
.stats(
311+
"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}")
312+
.version(1L)
313+
.timestamp(1705948389052L)));
199314
public static final Set<FileObjectWithoutPresignedUrl> s3DeltaTable1FilesWithoutPresignedUrl =
200315
Set.of(
201316
new FileObjectWithoutPresignedUrl()

server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
import java.util.Map;
77
import java.util.Objects;
88

9+
/**
10+
* This class is test-only. Pre-signed URLs cannot be known in advance, so this type mirrors the file
11+
* object without the URL, letting tests assert on FileObjects that contain pre-signed URLs while ignoring the URL.
12+
*/
913
public class FileObjectFileWithoutPresignedUrl {
1014

1115
private Map<String, String> partitionValues = new HashMap<>();

server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
import com.fasterxml.jackson.annotation.JsonProperty;
44
import java.util.Objects;
55

6+
/**
7+
* This class is test-only. Pre-signed URLs cannot be known in advance, so this type mirrors the file
8+
* object without the URL, letting tests assert on FileObjects that contain pre-signed URLs while ignoring the URL.
9+
*/
610
public class FileObjectWithoutPresignedUrl {
711

812
private FileObjectFileWithoutPresignedUrl _file;

server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void icebergTableMetadata() throws IOException {
149149

150150
@DisabledOnOs(OS.WINDOWS)
151151
@Test
152-
public void queryTableCurrentVersion() throws IOException {
152+
public void queryDeltaTableCurrentVersion() throws IOException {
153153
var responseBodyLines = given()
154154
.when()
155155
.filter(deltaFilter)
@@ -192,7 +192,7 @@ public void queryTableCurrentVersion() throws IOException {
192192

193193
@DisabledOnOs(OS.WINDOWS)
194194
@Test
195-
public void queryTableByVersion() throws IOException {
195+
public void queryDeltaTableByVersion() throws IOException {
196196
var responseBodyLines = given()
197197
.when()
198198
.filter(deltaFilter)
@@ -232,4 +232,133 @@ public void queryTableByVersion() throws IOException {
232232
assertEquals(7, responseBodyLines.length);
233233
assertEquals(s3DeltaTable1FilesWithoutPresignedUrl, files);
234234
}
235+
236+
@DisabledOnOs(OS.WINDOWS)
237+
@Test
238+
public void queryIcebergTableCurrentVersion() throws IOException {
239+
var responseBodyLines = given()
240+
.when()
241+
.filter(deltaFilter)
242+
.body("{}")
243+
.header(new Header("Content-Type", "application/json"))
244+
.post(
245+
"delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query",
246+
"s3share",
247+
"s3schema",
248+
"s3IcebergTable1")
249+
.then()
250+
.statusCode(200)
251+
.extract()
252+
.body()
253+
.asString()
254+
.split("\n");
255+
256+
assertEquals(
257+
s3IcebergTable1Protocol,
258+
objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class));
259+
assertEquals(
260+
s3IcebergTable1Metadata,
261+
objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class));
262+
var files = Arrays.stream(responseBodyLines)
263+
.skip(2)
264+
.map(line -> {
265+
try {
266+
return objectMapper
267+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
268+
.reader()
269+
.readValue(line, FileObjectWithoutPresignedUrl.class);
270+
} catch (IOException e) {
271+
throw new RuntimeException(e);
272+
}
273+
})
274+
.collect(Collectors.toSet());
275+
assertEquals(7, responseBodyLines.length);
276+
assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files);
277+
}
278+
279+
@DisabledOnOs(OS.WINDOWS)
280+
@Test
281+
public void queryIcebergTableByVersion() throws IOException {
282+
var responseBodyLines = given()
283+
.when()
284+
.filter(deltaFilter)
285+
.body("{\"version\": 7819530050735196523}")
286+
.header(new Header("Content-Type", "application/json"))
287+
.post(
288+
"delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query",
289+
"s3share",
290+
"s3schema",
291+
"s3IcebergTable1")
292+
.then()
293+
.statusCode(200)
294+
.extract()
295+
.body()
296+
.asString()
297+
.split("\n");
298+
299+
assertEquals(
300+
s3IcebergTable1Protocol,
301+
objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class));
302+
assertEquals(
303+
s3IcebergTable1Metadata,
304+
objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class));
305+
var files = Arrays.stream(responseBodyLines)
306+
.skip(2)
307+
.map(line -> {
308+
try {
309+
return objectMapper
310+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
311+
.reader()
312+
.readValue(line, FileObjectWithoutPresignedUrl.class);
313+
} catch (IOException e) {
314+
throw new RuntimeException(e);
315+
}
316+
})
317+
.collect(Collectors.toSet());
318+
assertEquals(7, responseBodyLines.length);
319+
assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files);
320+
}
321+
322+
@DisabledOnOs(OS.WINDOWS)
323+
@Test
324+
public void queryIcebergTableByTs() throws IOException {
325+
var responseBodyLines = given()
326+
.when()
327+
.filter(deltaFilter)
328+
.body("{\"timestamp\": \"2024-02-02T12:00:00Z\"}")
329+
.header(new Header("Content-Type", "application/json"))
330+
.post(
331+
"delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query",
332+
"s3share",
333+
"s3schema",
334+
"s3IcebergTable1")
335+
.then()
336+
.statusCode(200)
337+
.extract()
338+
.body()
339+
.asString()
340+
.split("\n");
341+
342+
assertEquals(
343+
s3IcebergTable1Protocol,
344+
objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class));
345+
assertEquals(
346+
s3IcebergTable1Metadata,
347+
objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class));
348+
var files = Arrays.stream(responseBodyLines)
349+
.skip(2)
350+
.map(line -> {
351+
try {
352+
return objectMapper
353+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
354+
.reader()
355+
.readValue(line, FileObjectWithoutPresignedUrl.class);
356+
} catch (IOException e) {
357+
throw new RuntimeException(e);
358+
}
359+
})
360+
.collect(Collectors.toSet());
361+
assertEquals(7, responseBodyLines.length);
362+
assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files);
363+
}
235364
}

0 commit comments

Comments
 (0)