Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.10.20'
version '0.10.21'

repositories {
mavenLocal()
Expand Down Expand Up @@ -60,7 +60,7 @@ dependencies {
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.8.0'
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1'
implementation group: 'joda-time', name: 'joda-time', version: '2.10.2'
implementation('com.google.guava:guava:32.0.1-jre') { force = true }
implementation('com.github.ben-manes.caffeine:caffeine:2.9.3')
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand Down
9 changes: 9 additions & 0 deletions docs/reference/configuration/maxcompute.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ Least recently used session will be removed if the cache is full.
* Type: `required`
* Default value: `2`

## SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_EXPIRATION_TIME_AFTER_ACCESS_DURATION

Defines the duration where streaming session will be automatically removed from cache after the last access.
Timer will be reset after each access. Config should be in ISO-8601 duration format.

* Example value: `PT2H10M`
* Type: `required`
* Default value: `PT15M`

# SINK_MAXCOMPUTE_STREAMING_INSERT_TUNNEL_SLOT_COUNT_PER_SESSION

Contains the slot count per session for the streaming insert operation. This config will be used for setting the slot count per session for the streaming insert operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.aliyun.odps.tunnel.io.CompressOption;
import com.gotocompany.depot.common.TupleString;
import com.gotocompany.depot.config.converter.ConfToListConverter;
import com.gotocompany.depot.config.converter.DurationConverter;
import com.gotocompany.depot.config.converter.KeyValuePairsToMapConverter;
import com.gotocompany.depot.config.converter.LocalDateTimeConverter;
import com.gotocompany.depot.config.converter.MaxComputeOdpsGlobalSettingsConverter;
Expand All @@ -11,6 +12,7 @@
import org.aeonbits.owner.Config;

import java.math.RoundingMode;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -100,6 +102,11 @@ public interface MaxComputeSinkConfig extends Config {
@DefaultValue("2")
int getStreamingInsertMaximumSessionCount();

@Key("SINK_MAXCOMPUTE_STREAMING_INSERT_SESSION_EXPIRATION_TIME_AFTER_ACCESS_DURATION")
@ConverterClass(DurationConverter.class)
@DefaultValue("PT15M")
Duration getStreamingInsertSessionExpirationTimeAfterAccessDuration();

@Key("SINK_MAXCOMPUTE_STREAMING_INSERT_TUNNEL_SLOT_COUNT_PER_SESSION")
@DefaultValue("1")
long getStreamingInsertTunnelSlotCountPerSession();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.gotocompany.depot.config.converter;

import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.time.Duration;

public class DurationConverter implements Converter<Duration> {

@Override
public Duration convert(Method method, String s) {
return Duration.parse(s);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import com.aliyun.odps.tunnel.TableTunnel;

import com.aliyun.odps.tunnel.TunnelException;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.metrics.Instrumentation;
import com.gotocompany.depot.metrics.MaxComputeMetrics;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;

Expand All @@ -17,6 +18,7 @@
* Streaming Insert Sessions are reused when the partition spec is the same.
* Streaming sessions are created by TableTunnel service. Read more about it here: <a href="https://www.alibabacloud.com/help/en/maxcompute/user-guide/tabletunnel">Alibaba MaxCompute Table Tunnel</a>
*/
@Slf4j
public final class StreamingSessionManager {

private final LoadingCache<String, TableTunnel.StreamUploadSession> sessionCache;
Expand All @@ -38,15 +40,8 @@ public static StreamingSessionManager createNonPartitioned(TableTunnel tableTunn
MaxComputeSinkConfig maxComputeSinkConfig,
Instrumentation instrumentation,
MaxComputeMetrics maxComputeMetrics) {
CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader = new CacheLoader<String, TableTunnel.StreamUploadSession>() {
@Override
public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws TunnelException {
return buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig), instrumentation, maxComputeMetrics);
}
};
return new StreamingSessionManager(CacheBuilder.newBuilder()
.maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount())
.build(cacheLoader));
CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader = partitionSpecKey -> buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig), instrumentation, maxComputeMetrics);
return buildCache(maxComputeSinkConfig, cacheLoader);
}

/**
Expand All @@ -66,18 +61,11 @@ public static StreamingSessionManager createPartitioned(TableTunnel tableTunnel,
MaxComputeSinkConfig maxComputeSinkConfig,
Instrumentation instrumentation,
MaxComputeMetrics maxComputeMetrics) {
CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader = new CacheLoader<String, TableTunnel.StreamUploadSession>() {
@Override
public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws TunnelException {
return buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig)
.setCreatePartition(true)
.setPartitionSpec(partitionSpecKey),
instrumentation, maxComputeMetrics);
}
};
return new StreamingSessionManager(CacheBuilder.newBuilder()
.maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount())
.build(cacheLoader));
CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader = partitionSpecKey -> buildStreamSession(getBaseStreamSessionBuilder(tableTunnel, maxComputeSinkConfig)
.setCreatePartition(true)
.setPartitionSpec(partitionSpecKey),
instrumentation, maxComputeMetrics);
return buildCache(maxComputeSinkConfig, cacheLoader);
}

/**
Expand All @@ -89,7 +77,7 @@ public TableTunnel.StreamUploadSession load(String partitionSpecKey) throws Tunn
* @return StreamUploadSession
*/
public TableTunnel.StreamUploadSession getSession(String partitionSpec) {
return sessionCache.getUnchecked(partitionSpec);
return sessionCache.get(partitionSpec);
}

/**
Expand All @@ -116,6 +104,7 @@ public void refreshAllSessions() {
private static TableTunnel.StreamUploadSession buildStreamSession(TableTunnel.StreamUploadSession.Builder streamUploadSessionBuilder,
Instrumentation instrumentation,
MaxComputeMetrics maxComputeMetrics) throws TunnelException {
log.info("Creating new streaming insert session for partition spec {}", streamUploadSessionBuilder.getPartitionSpec());
Instant start = Instant.now();
TableTunnel.StreamUploadSession streamUploadSession = streamUploadSessionBuilder.build();
instrumentation.captureDurationSince(maxComputeMetrics.getMaxComputeStreamingInsertSessionInitializationLatency(), start);
Expand All @@ -129,4 +118,12 @@ private static TableTunnel.StreamUploadSession.Builder getBaseStreamSessionBuild
.setSlotNum(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession());
}

private static StreamingSessionManager buildCache(MaxComputeSinkConfig maxComputeSinkConfig, CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader) {
return new StreamingSessionManager(Caffeine.newBuilder()
.expireAfterAccess(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration())
.maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount())
.evictionListener((key, value, cause) -> log.info("Evicting session for partition spec {} due to {}", key, cause))
.build(cacheLoader));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -91,6 +93,8 @@ public void shouldReturnSameInstanceIfCacheNotEmpty() throws TunnelException {
.thenReturn("test_project");
when(maxComputeSinkConfig.getMaxComputeTableName())
.thenReturn("test_table");
when(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration())
.thenReturn(Duration.parse("PT15M"));
when(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount())
.thenReturn(1);
when(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession())
Expand Down Expand Up @@ -206,6 +210,8 @@ public void shouldReturnSameNonPartitionedSessionIfCacheNotEmpty() throws Tunnel
.thenReturn(1);
when(maxComputeSinkConfig.getStreamingInsertTunnelSlotCountPerSession())
.thenReturn(1L);
when(maxComputeSinkConfig.getStreamingInsertSessionExpirationTimeAfterAccessDuration())
.thenReturn(Duration.parse("PT15M"));
StreamingSessionManager nonPartitionedStreamingSessionManager =
StreamingSessionManager.createNonPartitioned(tableTunnel, maxComputeSinkConfig, instrumentation, maxComputeMetrics);

Expand Down Expand Up @@ -248,8 +254,8 @@ public void shouldReturnRefreshTheSession() throws TunnelException {
nonPartitionedStreamingSessionManager.getSession("test_session");
nonPartitionedStreamingSessionManager.refreshSession("test_session");

verify(tableTunnel, Mockito.times(2))
.buildStreamUploadSession("test_project", "test_table");
verify(tableTunnel, Mockito.atLeastOnce())
.buildStreamUploadSession(Mockito.anyString(), Mockito.anyString());
}

}