From 9099fdf68511e4aa4bf7b69c68f0d6443043e9c7 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Wed, 11 Mar 2020 10:15:01 -0700 Subject: [PATCH 1/5] Changed destination topic deletion logic so now it doesn't get deletede immediately but is scheduled for deletion later --- .../datastream/server/Coordinator.java | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index d286bde0f..9cbf8f8af 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -76,8 +76,7 @@ import static com.linkedin.datastream.common.DatastreamMetadataConstants.CREATION_MS; import static com.linkedin.datastream.common.DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX; import static com.linkedin.datastream.common.DatastreamMetadataConstants.TTL_MS; -import static com.linkedin.datastream.common.DatastreamUtils.hasValidDestination; -import static com.linkedin.datastream.common.DatastreamUtils.isReuseAllowed; +import static com.linkedin.datastream.common.DatastreamUtils.*; /** @@ -168,6 +167,7 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware { private static final String NUM_ORPHAN_CONNECTOR_TASKS = "numOrphanConnectorTasks"; private static final String MAX_PARTITION_COUNT_IN_TASK = "maxPartitionCountInTask"; private static final String IS_LEADER = "isLeader"; + private static final Duration TOPIC_DELETION_DELAY = Duration.ofHours(1); // Connector common metrics private static final String NUM_DATASTREAMS = "numDatastreams"; @@ -863,6 +863,33 @@ private void handleDatastreamAddOrDelete() { _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent()); } + private void scheduleTopicForDeletion(Datastream ds) { + try { + if (DatastreamUtils.isUserManagedDestination(ds)) { + _log.info("BYOT(bring your own topic topic will not be deleted"); + } else if (DatastreamUtils.isConnectorManagedDestination(ds)) { + _log.info("Datastream contains connector-managed destinations, topic will not be deleted"); + } else { + _executor.schedule(() -> { + deleteTopic(ds); + }, TOPIC_DELETION_DELAY.toHours(), TimeUnit.HOURS); + _log.info("Destination topic for {} scheduled for deletion.", ds); + } + } catch (Exception e) { + _log.error("Failed to retrieve metadata for {}", ds); + } + } + + private void deleteTopic(Datastream ds) { + _log.info("Deleting destination topic for {}", ds); + + try { + _transportProviderAdmins.get(ds.getTransportProviderName()).dropDestination(ds); + } catch (Exception e) { + _log.error("Runtime Exception while delete topic", e); + } + } + private void hardDeleteDatastream(Datastream ds, List allStreams) { String taskPrefix; if (DatastreamUtils.containsTaskPrefix(ds)) { @@ -881,7 +908,7 @@ private void hardDeleteDatastream(Datastream ds, List allStreams) { "No datastream left in the datastream group with taskPrefix {}. Deleting all tasks corresponding to the datastream.", taskPrefix); _adapter.deleteTasksWithPrefix(ds.getConnectorName(), taskPrefix); - deleteTopic(ds); + scheduleTopicForDeletion(ds); } else { _log.info("Found duplicate datastream {} for the datastream to be deleted {}. Not deleting the tasks.", duplicateStream.get().getName(), ds.getName()); @@ -914,20 +941,6 @@ private String createTopic(Datastream datastream) throws TransportException { return datastream.getDestination().getConnectionString(); } - private void deleteTopic(Datastream datastream) { - try { - if (DatastreamUtils.isUserManagedDestination(datastream)) { - _log.info("BYOT(bring your own topic), topic will not be deleted"); - } else if (DatastreamUtils.isConnectorManagedDestination(datastream)) { - _log.info("Datastream contains connector-managed destinations, topic will not be deleted"); - } else { - _transportProviderAdmins.get(datastream.getTransportProviderName()).dropDestination(datastream); - } - } catch (Exception e) { - _log.error("Runtime Exception while delete topic", e); - } - } - private List fetchDatastreamGroups() { // Get all streams that are assignable. Assignable datastreams are the ones: // 1) has a valid destination From 9ebfbe8a92fc28ed0a0c13ea7c7ac6d202738331 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Wed, 11 Mar 2020 10:40:46 -0700 Subject: [PATCH 2/5] Minor fixes to make the build pass --- .../main/java/com/linkedin/datastream/server/Coordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 9cbf8f8af..7b1e383d7 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -76,8 +76,8 @@ import static com.linkedin.datastream.common.DatastreamMetadataConstants.CREATION_MS; import static com.linkedin.datastream.common.DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX; import static com.linkedin.datastream.common.DatastreamMetadataConstants.TTL_MS; -import static com.linkedin.datastream.common.DatastreamUtils.*; - +import static com.linkedin.datastream.common.DatastreamUtils.hasValidDestination; +import static com.linkedin.datastream.common.DatastreamUtils.isReuseAllowed; /** * From 569bb756eb1e949aec4256ab33105e6e648d2b2d Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Wed, 11 Mar 2020 12:50:41 -0700 Subject: [PATCH 3/5] Fixed tests for the coordinator --- .../datastream/common/ReflectionUtils.java | 29 +++++++++++++++++++ .../datastream/server/TestCoordinator.java | 6 ++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java b/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java index fd53d1b7c..083747dc6 100644 --- a/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java +++ b/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java @@ -8,6 +8,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.stream.IntStream; import org.apache.commons.lang.Validate; @@ -98,6 +99,34 @@ public static T setField(Object object, String field, T value) throws Except } } + /** + * Write a private final field with reflection. + * @param object instance whose field is to be accessed + * @param field name of private field + * @param value value to be set to the field + * @param type of the field + * @return thr new value just set or null if failed + */ + public static T setFinalField(Object object, String field, T value) { + Validate.notNull(object, "null target object"); + Validate.notNull(field, "null field name"); + + try { + Field fieldObj = object.getClass().getDeclaredField(field); + fieldObj.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(fieldObj, fieldObj.getModifiers() & ~Modifier.FINAL); + + fieldObj.set(object, value); + return value; + } catch (Exception e) { + LOG.warn(String.format("Failed to set field, object = %s field = %s value = %s", object, field, value), e); + return null; + } + } + /** * Read a private field with reflection. * @param object instance whose field is to be accessed diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index bd507a0e1..cca0a3df5 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -862,8 +862,8 @@ public void testDatastreamWithBYOT() throws Exception { resource.delete(datastreamName); String path = KeyBuilder.datastream(testCluster, datastreamName); Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS)); - Assert.assertEquals(transportProviderAdminFactory._dropDestinationCount, 0, - "Delete destination count should have been 0, since Datastream uses BYOT"); + Assert.assertTrue(PollUtils.poll(() -> transportProviderAdminFactory._dropDestinationCount == 0, + 2000, WAIT_TIMEOUT_MS)); } /** @@ -935,6 +935,8 @@ public void testDatastreamWithoutConnectorManagedDestination() throws Exception Assert.assertEquals(transportProviderAdminFactory._createDestinationCount, 1, "Create destination count should have been 1, since Datastream does not have connector-managed destination"); + ReflectionUtils.setFinalField(coordinator, "TOPIC_DELETION_DELAY", Duration.ofSeconds(1)); + resource.delete(datastreamName); String path = KeyBuilder.datastream(testCluster, datastreamName); Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS)); From 8b71d2786a65225c9b964e64dd4c8c897ead51e8 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Wed, 11 Mar 2020 13:38:12 -0700 Subject: [PATCH 4/5] Reverted an unneeded change in the test file --- .../java/com/linkedin/datastream/server/TestCoordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index cca0a3df5..a46dac2c3 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -862,8 +862,8 @@ public void testDatastreamWithBYOT() throws Exception { resource.delete(datastreamName); String path = KeyBuilder.datastream(testCluster, datastreamName); Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS)); - Assert.assertTrue(PollUtils.poll(() -> transportProviderAdminFactory._dropDestinationCount == 0, - 2000, WAIT_TIMEOUT_MS)); + Assert.assertEquals(transportProviderAdminFactory._dropDestinationCount, 0, + "Delete destination count should have been 0, since Datastream uses BYOT"); } /** From 67d59b5680e88ab56dfa2d548479efd39f5c9a77 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Wed, 11 Mar 2020 10:15:01 -0700 Subject: [PATCH 5/5] Changed destination topic deletion logic so now it doesn't get deletede immediately but is scheduled for deletion later Minor fixes to make the build pass Fixed tests for the coordinator Reverted an unneeded change in the test file --- .../datastream/common/ReflectionUtils.java | 29 ++++++++++++ .../datastream/server/TestCoordinator.java | 2 + .../datastream/server/Coordinator.java | 45 ++++++++++++------- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java b/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java index fd53d1b7c..083747dc6 100644 --- a/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java +++ b/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java @@ -8,6 +8,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.stream.IntStream; import org.apache.commons.lang.Validate; @@ -98,6 +99,34 @@ public static T setField(Object object, String field, T value) throws Except } } + /** + * Write a private final field with reflection. + * @param object instance whose field is to be accessed + * @param field name of private field + * @param value value to be set to the field + * @param type of the field + * @return thr new value just set or null if failed + */ + public static T setFinalField(Object object, String field, T value) { + Validate.notNull(object, "null target object"); + Validate.notNull(field, "null field name"); + + try { + Field fieldObj = object.getClass().getDeclaredField(field); + fieldObj.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(fieldObj, fieldObj.getModifiers() & ~Modifier.FINAL); + + fieldObj.set(object, value); + return value; + } catch (Exception e) { + LOG.warn(String.format("Failed to set field, object = %s field = %s value = %s", object, field, value), e); + return null; + } + } + /** * Read a private field with reflection. * @param object instance whose field is to be accessed diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index bd507a0e1..a46dac2c3 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -935,6 +935,8 @@ public void testDatastreamWithoutConnectorManagedDestination() throws Exception Assert.assertEquals(transportProviderAdminFactory._createDestinationCount, 1, "Create destination count should have been 1, since Datastream does not have connector-managed destination"); + ReflectionUtils.setFinalField(coordinator, "TOPIC_DELETION_DELAY", Duration.ofSeconds(1)); + resource.delete(datastreamName); String path = KeyBuilder.datastream(testCluster, datastreamName); Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS)); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index d286bde0f..7b1e383d7 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -79,7 +79,6 @@ import static com.linkedin.datastream.common.DatastreamUtils.hasValidDestination; import static com.linkedin.datastream.common.DatastreamUtils.isReuseAllowed; - /** * * Coordinator is the object that bridges ZooKeeper with Connector implementations. There is one instance @@ -168,6 +167,7 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware { private static final String NUM_ORPHAN_CONNECTOR_TASKS = "numOrphanConnectorTasks"; private static final String MAX_PARTITION_COUNT_IN_TASK = "maxPartitionCountInTask"; private static final String IS_LEADER = "isLeader"; + private static final Duration TOPIC_DELETION_DELAY = Duration.ofHours(1); // Connector common metrics private static final String NUM_DATASTREAMS = "numDatastreams"; @@ -863,6 +863,33 @@ private void handleDatastreamAddOrDelete() { _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent()); } + private void scheduleTopicForDeletion(Datastream ds) { + try { + if (DatastreamUtils.isUserManagedDestination(ds)) { + _log.info("BYOT(bring your own topic topic will not be deleted"); + } else if (DatastreamUtils.isConnectorManagedDestination(ds)) { + _log.info("Datastream contains connector-managed destinations, topic will not be deleted"); + } else { + _executor.schedule(() -> { + deleteTopic(ds); + }, TOPIC_DELETION_DELAY.toHours(), TimeUnit.HOURS); + _log.info("Destination topic for {} scheduled for deletion.", ds); + } + } catch (Exception e) { + _log.error("Failed to retrieve metadata for {}", ds); + } + } + + private void deleteTopic(Datastream ds) { + _log.info("Deleting destination topic for {}", ds); + + try { + _transportProviderAdmins.get(ds.getTransportProviderName()).dropDestination(ds); + } catch (Exception e) { + _log.error("Runtime Exception while delete topic", e); + } + } + private void hardDeleteDatastream(Datastream ds, List allStreams) { String taskPrefix; if (DatastreamUtils.containsTaskPrefix(ds)) { @@ -881,7 +908,7 @@ private void hardDeleteDatastream(Datastream ds, List allStreams) { "No datastream left in the datastream group with taskPrefix {}. Deleting all tasks corresponding to the datastream.", taskPrefix); _adapter.deleteTasksWithPrefix(ds.getConnectorName(), taskPrefix); - deleteTopic(ds); + scheduleTopicForDeletion(ds); } else { _log.info("Found duplicate datastream {} for the datastream to be deleted {}. Not deleting the tasks.", duplicateStream.get().getName(), ds.getName()); @@ -914,20 +941,6 @@ private String createTopic(Datastream datastream) throws TransportException { return datastream.getDestination().getConnectionString(); } - private void deleteTopic(Datastream datastream) { - try { - if (DatastreamUtils.isUserManagedDestination(datastream)) { - _log.info("BYOT(bring your own topic), topic will not be deleted"); - } else if (DatastreamUtils.isConnectorManagedDestination(datastream)) { - _log.info("Datastream contains connector-managed destinations, topic will not be deleted"); - } else { - _transportProviderAdmins.get(datastream.getTransportProviderName()).dropDestination(datastream); - } - } catch (Exception e) { - _log.error("Runtime Exception while delete topic", e); - } - } - private List fetchDatastreamGroups() { // Get all streams that are assignable. Assignable datastreams are the ones: // 1) has a valid destination