Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- Fixed `fetchAutoCommitStateFromServer()` to accept both `"1"`/`"0"` and `"true"`/`"false"` responses from `SET AUTOCOMMIT` query, since different server implementations return different formats.
- Fixed socket leak in SDK HTTP client that prevented CRaC checkpointing. The SDK's connection pool was not shut down on `connection.close()`, leaving TCP sockets open.
- Fixed Date fields within complex types (ARRAY, STRUCT, MAP) being returned as epoch day integers instead of proper date values.
- Fixed primitive types within complex types (ARRAY, MAP, STRUCT) not being correctly parsed when Arrow serialization uses alternate formats: TIMESTAMP/TIMESTAMP_NTZ as epoch microseconds or component arrays, and BINARY as base64-encoded strings.

---
*Note: When making changes, please add your change under the appropriate section
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -144,6 +148,13 @@ private Object convertValueNode(JsonNode node, String expectedType)
return jsonText;
}
}
// Arrow serializes TIMESTAMP_NTZ inside nested types as a JSON array of components:
// [year, month, day, hour, minute, second] (and optionally nanoseconds as a 7th element).
// e.g., [{"event_ts_ntz":[2023,10,5,15,20,30]}]
// We must handle this before calling node.asText(), which returns "" for array nodes.
if (node.isArray() && expectedType.equalsIgnoreCase(DatabricksTypeUtil.TIMESTAMP_NTZ)) {
return convertTimestampNtzArray(node);
}
return convertPrimitive(node.asText(), expectedType);
}

Expand Down Expand Up @@ -219,17 +230,63 @@ private Object convertPrimitive(String text, String type) {
}
}
case DatabricksTypeUtil.TIMESTAMP:
return parseTimestamp(text);
case DatabricksTypeUtil.TIMESTAMP_NTZ:
try {
return parseTimestamp(text);
} catch (IllegalArgumentException e) {
// Arrow serializes TIMESTAMP/TIMESTAMP_NTZ inside nested types as epoch microseconds.
// e.g., {"ts":1696519230000000} for 2023-10-05 15:20:30 UTC
try {
long micros = Long.parseLong(text);
long seconds = Math.floorDiv(micros, 1_000_000L);
long microsRemainder = Math.floorMod(micros, 1_000_000L);
Instant instant = Instant.ofEpochSecond(seconds, microsRemainder * 1_000);
return Timestamp.from(instant);
} catch (NumberFormatException nfe) {
LOGGER.error(e, "Failed to parse TIMESTAMP value '{}' as epoch microseconds", text);
throw e;
}
}
case DatabricksTypeUtil.TIME:
return Time.valueOf(text);
case DatabricksTypeUtil.BINARY:
return text.getBytes();
// Arrow serializes BINARY inside nested types as base64-encoded strings.
// e.g., {"bin_data":"QUJD"} for CAST('ABC' AS BINARY)
try {
return Base64.getDecoder().decode(text);
} catch (IllegalArgumentException e) {
// Not base64 encoded, fall back to raw bytes
return text.getBytes(StandardCharsets.UTF_8);
}
case DatabricksTypeUtil.STRING:
default:
return text;
}
}

/**
 * Converts a TIMESTAMP_NTZ value serialized as a JSON array of components
 * [year,month,day,hour,minute,second] (with an optional 7th nanoseconds element) into a
 * {@link Timestamp}.
 *
 * @param arrayNode the JSON array node holding the date-time components
 * @return the timezone-independent {@link Timestamp} built from the components
 * @throws DatabricksParsingException if the node is not an array of at least 6 elements, or if
 *     the components do not form a valid date-time (e.g., month 13 or hour 25)
 */
private Timestamp convertTimestampNtzArray(JsonNode arrayNode) throws DatabricksParsingException {
  if (arrayNode == null || !arrayNode.isArray() || arrayNode.size() < 6) {
    throw new DatabricksParsingException(
        "Invalid TIMESTAMP_NTZ array representation: expected at least 6 elements "
            + "[year,month,day,hour,minute,second], but got: "
            + arrayNode,
        DatabricksDriverErrorCode.JSON_PARSING_ERROR);
  }
  int year = arrayNode.get(0).asInt();
  int month = arrayNode.get(1).asInt();
  int day = arrayNode.get(2).asInt();
  int hour = arrayNode.get(3).asInt();
  int minute = arrayNode.get(4).asInt();
  int second = arrayNode.get(5).asInt();
  // Optional 7th element carries nanoseconds; the size() > 6 guard guarantees get(6) is present,
  // and asInt(0) falls back to 0 for non-numeric nodes.
  int nano = arrayNode.size() > 6 ? arrayNode.get(6).asInt(0) : 0;
  try {
    LocalDateTime ldt = LocalDateTime.of(year, month, day, hour, minute, second, nano);
    return Timestamp.valueOf(ldt);
  } catch (DateTimeException e) {
    // Out-of-range component values should surface as a parsing error, consistent with the
    // malformed-array case above, instead of leaking an unchecked DateTimeException to callers.
    throw new DatabricksParsingException(
        "Invalid TIMESTAMP_NTZ component values in array: " + arrayNode + " - " + e.getMessage(),
        DatabricksDriverErrorCode.JSON_PARSING_ERROR);
  }
}

private Timestamp parseTimestamp(String text) {
if (WildcardUtil.isNullOrEmpty(text)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ private Object convertValue(Object value, String type) {
case DatabricksTypeUtil.DATE:
return Date.valueOf(value.toString());
case DatabricksTypeUtil.TIMESTAMP:
return Timestamp.valueOf(value.toString());
case DatabricksTypeUtil.TIMESTAMP_NTZ:
return value instanceof Timestamp ? value : Timestamp.valueOf(value.toString());
case DatabricksTypeUtil.TIME:
return Time.valueOf(value.toString());
case DatabricksTypeUtil.BINARY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ private Object convertSimpleValue(Object value, String type) {
case DatabricksTypeUtil.DATE:
return Date.valueOf(value.toString());
case DatabricksTypeUtil.TIMESTAMP:
return Timestamp.valueOf(value.toString());
case DatabricksTypeUtil.TIMESTAMP_NTZ:
return value instanceof Timestamp ? value : Timestamp.valueOf(value.toString());
case DatabricksTypeUtil.TIME:
return Time.valueOf(value.toString());
case DatabricksTypeUtil.BINARY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,140 @@ void testDateAsStringInStruct() throws DatabricksParsingException {
}
}

@Test
void testTimestampAsEpochMicrosInStruct() throws DatabricksParsingException {
  // Arrow encodes a TIMESTAMP field nested in a STRUCT as epoch microseconds:
  // 1696519230000000 us == 1696519230000 ms == 2023-10-05 15:20:30 UTC.
  String structJson = "{\"ts\":1696519230000000}";

  DatabricksStruct parsed = parser.parseJsonStringToDbStruct(structJson, "STRUCT<ts:TIMESTAMP>");
  assertNotNull(parsed);

  try {
    Object[] attributes = parsed.getAttributes();
    assertEquals(1, attributes.length);
    assertInstanceOf(Timestamp.class, attributes[0]);
    Timestamp timestamp = (Timestamp) attributes[0];
    assertEquals(1696519230000L, timestamp.getTime());
    // Input has whole-millisecond precision, so no sub-millisecond nanos are expected.
    assertEquals(0, timestamp.getNanos() % 1_000_000);
  } catch (Exception e) {
    fail("Should not throw: " + e.getMessage());
  }
}

@Test
void testTimestampAsEpochMicrosInArray() throws DatabricksParsingException {
  // Elements of a plain ARRAY<TIMESTAMP> arrive from Arrow as epoch microseconds.
  String arrayJson = "[1696519230000000]";

  DatabricksArray parsed = parser.parseJsonStringToDbArray(arrayJson, "ARRAY<TIMESTAMP>");
  assertNotNull(parsed);

  try {
    Object[] items = (Object[]) parsed.getArray();
    assertEquals(1, items.length);
    assertInstanceOf(Timestamp.class, items[0]);
    assertEquals(1696519230000L, ((Timestamp) items[0]).getTime());
  } catch (Exception e) {
    fail("Should not throw: " + e.getMessage());
  }
}

@Test
void testTimestampAsEpochMicrosInMap() throws DatabricksParsingException {
  // A TIMESTAMP used as a MAP value is likewise serialized by Arrow as epoch microseconds.
  String mapJson = "{\"key1\":1696519230000000}";

  DatabricksMap<String, Object> parsed =
      parser.parseJsonStringToDbMap(mapJson, "MAP<STRING,TIMESTAMP>");
  assertNotNull(parsed);

  Object mapValue = parsed.get("key1");
  assertInstanceOf(Timestamp.class, mapValue);
  assertEquals(1696519230000L, ((Timestamp) mapValue).getTime());
}

@Test
void testTimestampNtzAsStringInStruct() throws DatabricksParsingException {
  // A TIMESTAMP_NTZ given as a plain string must be parsed into a Timestamp rather than
  // falling through to the STRING branch.
  String structJson = "{\"ts\":\"2023-10-05 15:20:30\"}";

  DatabricksStruct parsed =
      parser.parseJsonStringToDbStruct(structJson, "STRUCT<ts:TIMESTAMP_NTZ>");
  assertNotNull(parsed);

  try {
    Object[] attributes = parsed.getAttributes();
    assertEquals(1, attributes.length);
    assertInstanceOf(Timestamp.class, attributes[0]);
  } catch (Exception e) {
    fail("Should not throw: " + e.getMessage());
  }
}

@Test
void testTimestampNtzAsArrayComponentsInStruct() throws DatabricksParsingException {
  // The server emits TIMESTAMP_NTZ as a component array [year,month,day,hour,min,sec];
  // observed end-to-end as [{"event_ts_ntz":[2023,10,5,15,20,30]}].
  String structJson = "{\"ts_ntz\":[2023,10,5,15,20,30]}";

  DatabricksStruct parsed =
      parser.parseJsonStringToDbStruct(structJson, "STRUCT<ts_ntz:TIMESTAMP_NTZ>");
  assertNotNull(parsed);

  try {
    Object[] attributes = parsed.getAttributes();
    assertEquals(1, attributes.length);
    assertInstanceOf(Timestamp.class, attributes[0]);
    // The value is built via Timestamp.valueOf(LocalDateTime), so the components round-trip
    // exactly through toLocalDateTime() regardless of the JVM's default timezone.
    Timestamp timestamp = (Timestamp) attributes[0];
    assertEquals(
        java.time.LocalDateTime.of(2023, 10, 5, 15, 20, 30), timestamp.toLocalDateTime());
  } catch (Exception e) {
    fail("Should not throw: " + e.getMessage());
  }
}

@Test
void testBinaryAsBase64InStruct() throws DatabricksParsingException {
  // BINARY nested in a STRUCT arrives base64-encoded; "QUJD" decodes to "ABC".
  // Observed end-to-end as [{"bin_data":"QUJD"}] for CAST('ABC' AS BINARY).
  String structJson = "{\"bin_data\":\"QUJD\"}";

  DatabricksStruct parsed =
      parser.parseJsonStringToDbStruct(structJson, "STRUCT<bin_data:BINARY>");
  assertNotNull(parsed);

  try {
    Object[] attributes = parsed.getAttributes();
    assertEquals(1, attributes.length);
    assertInstanceOf(byte[].class, attributes[0]);
    assertArrayEquals("ABC".getBytes(), (byte[]) attributes[0]);
  } catch (Exception e) {
    fail("Should not throw: " + e.getMessage());
  }
}

@Test
void testBinaryAsBase64InArray() throws DatabricksParsingException {
  // Each BINARY element of an ARRAY arrives base64-encoded.
  // Observed end-to-end as ["QUJD","WFla"] for ARRAY(CAST('ABC' AS BINARY), CAST('XYZ' AS BINARY)).
  String arrayJson = "[\"QUJD\",\"WFla\"]";

  DatabricksArray parsed = parser.parseJsonStringToDbArray(arrayJson, "ARRAY<BINARY>");
  assertNotNull(parsed);

  try {
    Object[] items = (Object[]) parsed.getArray();
    assertEquals(2, items.length);
    assertInstanceOf(byte[].class, items[0]);
    assertArrayEquals("ABC".getBytes(), (byte[]) items[0]);
    assertArrayEquals("XYZ".getBytes(), (byte[]) items[1]);
  } catch (Exception e) {
    fail("Should not throw: " + e.getMessage());
  }
}

@Test
void testFormatComplexTypeString_withMapType() {
String jsonString = "[{\"key\":1,\"value\":2},{\"key\":3,\"value\":4}]";
Expand Down
Loading