From 52deed216ab4ab49bcafd1fe02e23a9376025c87 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 14 Apr 2025 18:26:43 +0700 Subject: [PATCH 1/8] [refactor] - Use caffeine instead Guava - Use expireAfterAccess --- build.gradle | 6 ++-- .../depot/config/MaxComputeSinkConfig.java | 7 ++++ .../config/converter/DurationConverter.java | 15 ++++++++ .../session/StreamingSessionManager.java | 34 +++++++------------ 4 files changed, 37 insertions(+), 25 deletions(-) create mode 100644 src/main/java/com/gotocompany/depot/config/converter/DurationConverter.java diff --git a/build.gradle b/build.gradle index 2725acbc..54ae3aa2 100644 --- a/build.gradle +++ b/build.gradle @@ -22,15 +22,13 @@ plugins { } group 'com.gotocompany' -version '0.10.14' +version '0.10.15' repositories { mavenLocal() mavenCentral() } -configurations.configureEach { exclude group: 'com.google.guava', module: 'listenablefuture' } - dependencies { implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.25.0' implementation group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.13.0' @@ -60,7 +58,7 @@ dependencies { implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.8.0' implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1' implementation group: 'joda-time', name: 'joda-time', version: '2.10.2' - implementation('com.google.guava:guava:32.0.1-jre') { force = true } + implementation('com.github.ben-manes.caffeine:caffeine:2.9.3') testImplementation group: 'junit', name: 'junit', version: '4.13.1' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3' testImplementation 'org.hamcrest:hamcrest-all:1.3' diff --git a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java index 2146fa40..748d4b1f 100644 --- a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java +++ b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java @@ -3,6 +3,7 @@ import com.aliyun.odps.tunnel.io.CompressOption; import com.gotocompany.depot.common.TupleString; import com.gotocompany.depot.config.converter.ConfToListConverter; +import com.gotocompany.depot.config.converter.DurationConverter; import com.gotocompany.depot.config.converter.KeyValuePairsToMapConverter; import com.gotocompany.depot.config.converter.LocalDateTimeConverter; import com.gotocompany.depot.config.converter.MaxComputeOdpsGlobalSettingsConverter; @@ -11,6 +12,7 @@ import org.aeonbits.owner.Config; import java.math.RoundingMode; +import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.List; @@ -99,6 +101,11 @@ public interface MaxComputeSinkConfig extends Config { @DefaultValue("2") int getStreamingInsertMaximumSessionCount(); + @Key("SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_EXPIRATION_TIME_AFTER_ACCESS_DURATION") + @ConverterClass(DurationConverter.class) + @DefaultValue("PT15M") + Duration getStreamingInsertSessionExpirationTimeAfterAccessDuration(); + @Key("SINK_MAXCOMPUTE_STREAMING_INSERT_TUNNEL_SLOT_COUNT_PER_SESSION") @DefaultValue("1") long getStreamingInsertTunnelSlotCountPerSession(); diff --git a/src/main/java/com/gotocompany/depot/config/converter/DurationConverter.java b/src/main/java/com/gotocompany/depot/config/converter/DurationConverter.java new file mode 100644 index 00000000..243fb9fa --- /dev/null +++ b/src/main/java/com/gotocompany/depot/config/converter/DurationConverter.java @@ -0,0 +1,15 @@ +package com.gotocompany.depot.config.converter; + +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; +import java.time.Duration; + +public class DurationConverter implements Converter { + + @Override + public Duration convert(Method method, String s) { + return Duration.parse(s); + } + +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java index 2239c640..113292fe 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java @@ -3,9 +3,9 @@ import com.aliyun.odps.tunnel.TableTunnel; import com.aliyun.odps.tunnel.TunnelException; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.metrics.Instrumentation; import com.gotocompany.depot.metrics.MaxComputeMetrics; @@ -38,13 +38,9 @@ public static StreamingSessionManager createNonPartitioned(TableTunnel tableTunn MaxComputeSinkConfig maxComputeSinkConfig, Instrumentation instrumentation, MaxComputeMetrics maxComputeMetrics) { - CacheLoader cacheLoader = new CacheLoader() { - @Override - public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws TunnelException { - return buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig), instrumentation, maxComputeMetrics); - } - }; - return new StreamingSessionManager(CacheBuilder.newBuilder() + CacheLoader cacheLoader = partitionSpecKey -> buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig), instrumentation, maxComputeMetrics); + return new StreamingSessionManager(Caffeine.newBuilder() + .expireAfterAccess(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration()) .maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount()) .build(cacheLoader)); } @@ -66,16 +62,12 @@ public static StreamingSessionManager createPartitioned(TableTunnel tableTunnel, MaxComputeSinkConfig maxComputeSinkConfig, Instrumentation instrumentation, MaxComputeMetrics maxComputeMetrics) { - CacheLoader cacheLoader = new CacheLoader() { - @Override - public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws TunnelException { - return buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig) - .setCreatePartition(true) - .setPartitionSpec(partitionSpecKey), - instrumentation, maxComputeMetrics); - } - }; - return new StreamingSessionManager(CacheBuilder.newBuilder() + CacheLoader cacheLoader = partitionSpecKey -> buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig) + .setCreatePartition(true) + .setPartitionSpec(partitionSpecKey), + instrumentation, maxComputeMetrics); + return new StreamingSessionManager(Caffeine.newBuilder() + .expireAfterAccess(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration()) .maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount()) .build(cacheLoader)); } @@ -89,7 +81,7 @@ public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws Tunn * @return StreamUploadSession */ public TableTunnel.StreamUploadSession getSession(String partitionSpec) { - return sessionCache.getUnchecked(partitionSpec); + return sessionCache.get(partitionSpec); } /** From 0aa4c03f893f8e584d811537fd84d6db3dbf7fd4 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 14 Apr 2025 18:37:07 +0700 Subject: [PATCH 2/8] [refactor] - Add docs on duration for expiration --- docs/reference/configuration/maxcompute.md | 9 +++++++++ .../insert/session/StreamingSessionManagerTest.java | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/docs/reference/configuration/maxcompute.md b/docs/reference/configuration/maxcompute.md index aa194753..121f0c30 100644 --- a/docs/reference/configuration/maxcompute.md +++ b/docs/reference/configuration/maxcompute.md @@ -159,6 +159,15 @@ Least recently used session will be removed if the cache is full. * Type: `required` * Default value: `2` +## SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_EXPIRATION_TIME_AFTER_ACCESS_DURATION + +Defines the duration where streaming session will be automatically removed from cache after the last access. +Timer will be reset after each access. Config should be in ISO-8601 duration format. + +* Example value: `PT2H10M` +* Type: `required` +* Default value: `PT15M` + # SINK_MAXCOMPUTE_STREAMING_INSERT_TUNNEL_SLOT_COUNT_PER_SESSION Contains the slot count per session for the streaming insert operation. This config will be used for setting the slot count per session for the streaming insert operation. diff --git a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java index e2ad6866..e3c375ba 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java @@ -11,6 +11,8 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import java.time.Duration; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; @@ -91,6 +93,8 @@ public void shouldReturnSameInstanceIfCacheNotEmpty() throws TunnelException { .thenReturn("test_project"); when(maxComputeSinkConfig.getMaxComputeTableName()) .thenReturn("test_table"); + when(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration()) + .thenReturn(Duration.parse("PT15M")); when(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount()) .thenReturn(1); when(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession()) @@ -206,6 +210,8 @@ public void shouldReturnSameNonPartitionedSessionIfCacheNotEmpty() throws Tunnel .thenReturn(1); when(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession()) .thenReturn(1L); + when(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration()) + .thenReturn(Duration.parse("PT15M")); StreamingSessionManager nonPartitionedStreamingSessionManager = StreamingSessionManager.createNonPartitioned(tableTunnel, maxComputeSinkConfig, instrumentation, maxComputeMetrics); From 11adcf805509ae81ad6fffdd81a1ad1b091f5e14 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 14 Apr 2025 18:44:39 +0700 Subject: [PATCH 3/8] [fix] - revert force dependency --- build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.gradle b/build.gradle index 54ae3aa2..d78e007f 100644 --- a/build.gradle +++ b/build.gradle @@ -29,6 +29,8 @@ repositories { mavenCentral() } +configurations.configureEach { exclude group: 'com.google.guava', module: 'listenablefuture' } + dependencies { implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.25.0' implementation group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.13.0' From 69d57a7741a164c7817e17352f40422ed02734cd Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 14 Apr 2025 18:55:34 +0700 Subject: [PATCH 4/8] [chore] - Add logging on eviction --- .../session/StreamingSessionManager.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java index 113292fe..43780b61 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java @@ -9,6 +9,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.metrics.Instrumentation; import com.gotocompany.depot.metrics.MaxComputeMetrics; +import lombok.extern.slf4j.Slf4j; import java.time.Instant; @@ -17,6 +18,7 @@ * Streaming Insert Sessions are reused when the partition spec is the same. * Streaming sessions are created by TableTunnel service. Read more about it here: Alibaba MaxCompute Table Tunnel */ +@Slf4j public final class StreamingSessionManager { private final LoadingCache sessionCache; @@ -39,10 +41,7 @@ public static StreamingSessionManager createNonPartitioned(TableTunnel tableTunn Instrumentation instrumentation, MaxComputeMetrics maxComputeMetrics) { CacheLoader cacheLoader = partitionSpecKey -> buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig), instrumentation, maxComputeMetrics); - return new StreamingSessionManager(Caffeine.newBuilder() - .expireAfterAccess(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration()) - .maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount()) - .build(cacheLoader)); + return buildCache(maxComputeSinkConfig, cacheLoader); } /** @@ -66,10 +65,7 @@ public static StreamingSessionManager createPartitioned(TableTunnel tableTunnel, .setCreatePartition(true) .setPartitionSpec(partitionSpecKey), instrumentation, maxComputeMetrics); - return new StreamingSessionManager(Caffeine.newBuilder() - .expireAfterAccess(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration()) - .maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount()) - .build(cacheLoader)); + return buildCache(maxComputeSinkConfig, cacheLoader); } /** @@ -119,4 +115,12 @@ private static TableTunnel.StreamUploadSession.Builder getBaseStreamSessionBuild .setSlotNum(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession()); } + private static StreamingSessionManager buildCache(MaxComputeSinkConfig maxComputeSinkConfig, CacheLoader cacheLoader) { + return new StreamingSessionManager(Caffeine.newBuilder() + .expireAfterAccess(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration()) + .maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount()) + .evictionListener((key, value, cause) -> log.info("Evicting session for partition spec {} due to {}", key, cause)) + .build(cacheLoader)); + } + } From 0f306ef21b6e23f6d221f911c5e3d91400b6fc68 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 14 Apr 2025 19:22:14 +0700 Subject: [PATCH 5/8] [chore] - Add logging on creation --- .../client/insert/session/StreamingSessionManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java index 43780b61..cb0aaa78 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManager.java @@ -102,6 +102,7 @@ public void refreshAllSessions() { private static TableTunnel.StreamUploadSession buildStreamSession(TableTunnel.StreamUploadSession.Builder streamUploadSessionBuilder, Instrumentation instrumentation, MaxComputeMetrics maxComputeMetrics) throws TunnelException { + log.info("Creating new streaming insert session for partition spec {}", streamUploadSessionBuilder.getPartitionSpec()); Instant start = Instant.now(); TableTunnel.StreamUploadSession streamUploadSession = streamUploadSessionBuilder.build(); instrumentation.captureDurationSince(maxComputeMetrics.getMaxComputeStreamingInsertSessionInitializationLatency(), start); From 4e85c762ccbad080ba0db39a76c007acff0e0b57 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Thu, 17 Apr 2025 13:45:45 +0700 Subject: [PATCH 6/8] [refactor] Add flag to refresh session on error --- .../depot/config/MaxComputeSinkConfig.java | 4 +++ .../client/insert/InsertManager.java | 27 ++++++++++++------- .../insert/NonPartitionedInsertManager.java | 2 +- .../insert/PartitionedInsertManager.java | 2 +- .../NonPartitionedInsertManagerTest.java | 8 ++++-- .../insert/PartitionedInsertManagerTest.java | 8 ++++-- .../session/StreamingSessionManagerTest.java | 4 +-- 7 files changed, 37 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java index 748d4b1f..f560bfd2 100644 --- a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java +++ b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java @@ -101,6 +101,10 @@ public interface MaxComputeSinkConfig extends Config { @DefaultValue("2") int getStreamingInsertMaximumSessionCount(); + @Key("SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_REFRESH_ON_ERROR_ENABLED") + @DefaultValue("false") + boolean isStreamingInsertSessionRefreshOnErrorEnabled(); + @Key("SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_EXPIRATION_TIME_AFTER_ACCESS_DURATION") @ConverterClass(DurationConverter.class) @DefaultValue("PT15M") diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/InsertManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/InsertManager.java index 36317fbb..9ce594ec 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/InsertManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/InsertManager.java @@ -29,6 +29,7 @@ public abstract class InsertManager { private final MaxComputeMetrics maxComputeMetrics; private final StreamingSessionManager streamingSessionManager; private final TableTunnel.FlushOption flushOption; + private final boolean isStreamingInsertSessionRefreshOnFailureEnabled; protected InsertManager(MaxComputeSinkConfig maxComputeSinkConfig, Instrumentation instrumentation, MaxComputeMetrics maxComputeMetrics, @@ -39,13 +40,15 @@ protected InsertManager(MaxComputeSinkConfig maxComputeSinkConfig, Instrumentati this.streamingSessionManager = streamingSessionManager; this.flushOption = new TableTunnel.FlushOption() .timeout(maxComputeSinkConfig.getMaxComputeRecordPackFlushTimeoutMs()); + this.isStreamingInsertSessionRefreshOnFailureEnabled = maxComputeSinkConfig.isStreamingInsertSessionRefreshOnErrorEnabled(); } /** * Insert records into MaxCompute. + * * @param recordWrappers list of records to insert * @throws TunnelException if there is an error with the tunnel service, typically due to network issues - * @throws IOException typically thrown when issues such as schema mismatch occur + * @throws IOException typically thrown when issues such as schema mismatch occur */ public abstract void insert(List recordWrappers) throws TunnelException, IOException; @@ -55,7 +58,7 @@ protected InsertManager(MaxComputeSinkConfig maxComputeSinkConfig, Instrumentati * * @param streamUploadSession session for streaming insert * @return TableTunnel.StreamRecordPack - * @throws IOException typically thrown when issues such as schema mismatch occur + * @throws IOException typically thrown when issues such as schema mismatch occur * @throws TunnelException if there is an error with the tunnel service, typically due to network issues */ protected TableTunnel.StreamRecordPack newRecordPack(TableTunnel.StreamUploadSession streamUploadSession) throws IOException, TunnelException { @@ -70,7 +73,7 @@ protected TableTunnel.StreamRecordPack newRecordPack(TableTunnel.StreamUploadSes /** * Instrument the insert operation. * - * @param start start time of the operation + * @param start start time of the operation * @param flushResult result of the flush operation */ private void instrument(Instant start, TableTunnel.FlushResult flushResult) { @@ -89,9 +92,9 @@ private void instrument(Instant start, TableTunnel.FlushResult flushResult) { * When schema mismatch occurs, wrap the exception in a NonRetryableException. It is not possible to recover from schema mismatch. * When network partition occurs, refresh the schema and rethrow the exception. * - * @param recordPack recordPack to append the record to + * @param recordPack recordPack to append the record to * @param recordWrapper record to append - * @param sessionKey key to identify the session, used for refreshing the schema + * @param sessionKey key to identify the session, used for refreshing the schema * @throws IOException typically thrown when issues such as network partition occur */ protected void appendRecord(TableTunnel.StreamRecordPack recordPack, RecordWrapper recordWrapper, String sessionKey) throws IOException { @@ -101,8 +104,10 @@ protected void appendRecord(TableTunnel.StreamRecordPack recordPack, RecordWrapp log.error("Record pack schema Mismatch", e); throw new NonRetryableException("Record pack schema Mismatch", e); } catch (IOException e) { - log.info("IOException occurs, refreshing the sessions", e); - streamingSessionManager.refreshAllSessions(); + if (this.isStreamingInsertSessionRefreshOnFailureEnabled) { + log.info(String.format("IOException occurs, refreshing the sessions: %s", sessionKey), e); + streamingSessionManager.refreshSession(sessionKey); + } throw e; } } @@ -115,7 +120,7 @@ protected void appendRecord(TableTunnel.StreamRecordPack recordPack, RecordWrapp * @param recordPack recordPack to flush * @throws IOException typically thrown when issues such as network partition occur */ - protected void flushRecordPack(TableTunnel.StreamRecordPack recordPack) throws IOException { + protected void flushRecordPack(TableTunnel.StreamRecordPack recordPack, String sessionKey) throws IOException { Instant start = Instant.now(); try { TableTunnel.FlushResult flushResult = recordPack.flush(flushOption); @@ -124,8 +129,10 @@ protected void flushRecordPack(TableTunnel.StreamRecordPack recordPack) throws I log.error("Record pack schema Mismatch", e); throw new NonRetryableException("Record pack schema Mismatch", e); } catch (IOException e) { - log.info("TunnelException occurs, refreshing the sessions", e); - streamingSessionManager.refreshAllSessions(); + if (this.isStreamingInsertSessionRefreshOnFailureEnabled) { + log.info("IOException occurs, refreshing the sessions", e); + streamingSessionManager.refreshSession(sessionKey); + } throw e; } } diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java index 367d2fb9..352aa912 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java @@ -42,7 +42,7 @@ public void insert(List recordWrappers) throws TunnelException, I for (RecordWrapper recordWrapper : recordWrappers) { super.appendRecord(recordPack, recordWrapper, NON_PARTITIONED); } - super.flushRecordPack(recordPack); + super.flushRecordPack(recordPack, NON_PARTITIONED); } } diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java index 864a6184..ab37af29 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java @@ -45,7 +45,7 @@ public void insert(List recordWrappers) throws TunnelException, I for (RecordWrapper recordWrapper : entry.getValue()) { super.appendRecord(recordPack, recordWrapper, recordWrapper.getPartitionSpec().toString()); } - super.flushRecordPack(recordPack); + super.flushRecordPack(recordPack, entry.getKey()); } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManagerTest.java b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManagerTest.java index 71270a2d..29bd533e 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManagerTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManagerTest.java @@ -209,6 +209,8 @@ public void shouldRefreshSessionWhenIOExceptionOccurredDuringAppend() throws IOE .thenReturn(1); when(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession()) .thenReturn(1L); + when(maxComputeSinkConfig.isStreamingInsertSessionRefreshOnErrorEnabled()) + .thenReturn(true); doNothing() .when(instrumentation) .captureCount(Mockito.anyString(), Mockito.anyLong()); @@ -229,7 +231,7 @@ public void shouldRefreshSessionWhenIOExceptionOccurredDuringAppend() throws IOE nonPartitionedInsertManager.insert(recordWrappers); } catch (IOException e) { verify(streamingSessionManager, Mockito.times(1)) - .refreshAllSessions(); + .refreshSession(Mockito.any()); throw e; } } @@ -342,6 +344,8 @@ public void shouldRefreshSessionWhenIOExceptionOccurredDuringFlush() throws IOEx .thenReturn(1); when(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession()) .thenReturn(1L); + when(maxComputeSinkConfig.isStreamingInsertSessionRefreshOnErrorEnabled()) + .thenReturn(true); doNothing() .when(instrumentation) .captureCount(Mockito.anyString(), Mockito.anyLong()); @@ -362,7 +366,7 @@ public void shouldRefreshSessionWhenIOExceptionOccurredDuringFlush() throws IOEx nonPartitionedInsertManager.insert(recordWrappers); } catch (IOException e) { verify(streamingSessionManager, Mockito.times(1)) - .refreshAllSessions(); + .refreshSession(Mockito.any()); throw e; } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManagerTest.java b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManagerTest.java index bd2fc074..d4483b51 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManagerTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManagerTest.java @@ -232,6 +232,8 @@ public void shouldRefrehAllSessionSessionWhenIOExceptionOccurredDuringRecordAppe .thenReturn(1); when(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession()) .thenReturn(1L); + when(maxComputeSinkConfig.isStreamingInsertSessionRefreshOnErrorEnabled()) + .thenReturn(true); RecordWrapper firstPartitionRecordWrapper = Mockito.mock(RecordWrapper.class); when(firstPartitionRecordWrapper.getPartitionSpec()) .thenReturn(new PartitionSpec("ds=1")); @@ -255,7 +257,7 @@ public void shouldRefrehAllSessionSessionWhenIOExceptionOccurredDuringRecordAppe partitionedInsertManager.insert(recordWrappers); } catch (IOException e) { verify(streamingSessionManager, Mockito.times(1)) - .refreshAllSessions(); + .refreshSession(Mockito.anyString()); throw e; } } @@ -440,6 +442,8 @@ public void shouldRefrehAllSessionSessionWhenIOExceptionOccurredDuringRecordFlus .thenReturn("project"); when(maxComputeSinkConfig.getMaxComputeTableName()) .thenReturn("table"); + when(maxComputeSinkConfig.isStreamingInsertSessionRefreshOnErrorEnabled()) + .thenReturn(true); when(maxComputeSinkConfig.getMaxComputeRecordPackFlushTimeoutMs()) .thenReturn(1000L); when(maxComputeSinkConfig.isStreamingInsertCompressEnabled()) @@ -477,7 +481,7 @@ public void shouldRefrehAllSessionSessionWhenIOExceptionOccurredDuringRecordFlus partitionedInsertManager.insert(recordWrappers); } catch (IOException e) { verify(streamingSessionManager, Mockito.times(1)) - .refreshAllSessions(); + .refreshSession(Mockito.anyString()); throw e; } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java index e3c375ba..1e29db6c 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/client/insert/session/StreamingSessionManagerTest.java @@ -254,8 +254,8 @@ public void shouldReturnRefreshTheSession() throws TunnelException { nonPartitionedStreamingSessionManager.getSession("test_session"); nonPartitionedStreamingSessionManager.refreshSession("test_session"); - verify(tableTunnel, Mockito.times(2)) - .buildStreamUploadSession("test_project", "test_table"); + verify(tableTunnel, Mockito.atLeastOnce()) + .buildStreamUploadSession(Mockito.anyString(), Mockito.anyString()); } } From e036c9bc21c10228e19c0d2a02a1a2a53e1221d6 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 21 Jul 2025 17:32:50 +0700 Subject: [PATCH 7/8] [chore] fix --- .../maxcompute/client/insert/NonPartitionedInsertManager.java | 2 +- .../maxcompute/client/insert/PartitionedInsertManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java index 352aa912..367d2fb9 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/NonPartitionedInsertManager.java @@ -42,7 +42,7 @@ public void insert(List recordWrappers) throws TunnelException, I for (RecordWrapper recordWrapper : recordWrappers) { super.appendRecord(recordPack, recordWrapper, NON_PARTITIONED); } - super.flushRecordPack(recordPack, NON_PARTITIONED); + super.flushRecordPack(recordPack); } } diff --git a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java index ab37af29..864a6184 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/client/insert/PartitionedInsertManager.java @@ -45,7 +45,7 @@ public void insert(List recordWrappers) throws TunnelException, I for (RecordWrapper recordWrapper : entry.getValue()) { super.appendRecord(recordPack, recordWrapper, recordWrapper.getPartitionSpec().toString()); } - super.flushRecordPack(recordPack, entry.getKey()); + super.flushRecordPack(recordPack); } } From 866ba7c3b6300332206f3f73ea4c30cc5071ee39 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 21 Jul 2025 17:34:36 +0700 Subject: [PATCH 8/8] [chore] remove unused config --- .../com/gotocompany/depot/config/MaxComputeSinkConfig.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java index 89b63bf0..856aa8ef 100644 --- a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java +++ b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java @@ -102,10 +102,6 @@ public interface MaxComputeSinkConfig extends Config { @DefaultValue("2") int getStreamingInsertMaximumSessionCount(); - @Key("SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_REFRESH_ON_ERROR_ENABLED") - @DefaultValue("false") - boolean isStreamingInsertSessionRefreshOnErrorEnabled(); - @Key("SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_EXPIRATION_TIME_AFTER_ACCESS_DURATION") @ConverterClass(DurationConverter.class) @DefaultValue("PT15M")