diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java b/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java index fd53d1b7c..083747dc6 100644 --- a/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java +++ b/datastream-common/src/main/java/com/linkedin/datastream/common/ReflectionUtils.java @@ -8,6 +8,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.stream.IntStream; import org.apache.commons.lang.Validate; @@ -98,6 +99,34 @@ public static T setField(Object object, String field, T value) throws Except } } + /** + * Write a private final field with reflection. + * @param object instance whose field is to be accessed + * @param field name of private field + * @param value value to be set to the field + * @param type of the field + * @return thr new value just set or null if failed + */ + public static T setFinalField(Object object, String field, T value) { + Validate.notNull(object, "null target object"); + Validate.notNull(field, "null field name"); + + try { + Field fieldObj = object.getClass().getDeclaredField(field); + fieldObj.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(fieldObj, fieldObj.getModifiers() & ~Modifier.FINAL); + + fieldObj.set(object, value); + return value; + } catch (Exception e) { + LOG.warn(String.format("Failed to set field, object = %s field = %s value = %s", object, field, value), e); + return null; + } + } + /** * Read a private field with reflection. * @param object instance whose field is to be accessed diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index bd507a0e1..a46dac2c3 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -935,6 +935,8 @@ public void testDatastreamWithoutConnectorManagedDestination() throws Exception Assert.assertEquals(transportProviderAdminFactory._createDestinationCount, 1, "Create destination count should have been 1, since Datastream does not have connector-managed destination"); + ReflectionUtils.setFinalField(coordinator, "TOPIC_DELETION_DELAY", Duration.ofSeconds(1)); + resource.delete(datastreamName); String path = KeyBuilder.datastream(testCluster, datastreamName); Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS)); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index d286bde0f..7b1e383d7 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -79,7 +79,6 @@ import static com.linkedin.datastream.common.DatastreamUtils.hasValidDestination; import static com.linkedin.datastream.common.DatastreamUtils.isReuseAllowed; - /** * * Coordinator is the object that bridges ZooKeeper with Connector implementations. There is one instance @@ -168,6 +167,7 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware { private static final String NUM_ORPHAN_CONNECTOR_TASKS = "numOrphanConnectorTasks"; private static final String MAX_PARTITION_COUNT_IN_TASK = "maxPartitionCountInTask"; private static final String IS_LEADER = "isLeader"; + private static final Duration TOPIC_DELETION_DELAY = Duration.ofHours(1); // Connector common metrics private static final String NUM_DATASTREAMS = "numDatastreams"; @@ -863,6 +863,33 @@ private void handleDatastreamAddOrDelete() { _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent()); } + private void scheduleTopicForDeletion(Datastream ds) { + try { + if (DatastreamUtils.isUserManagedDestination(ds)) { + _log.info("BYOT(bring your own topic topic will not be deleted"); + } else if (DatastreamUtils.isConnectorManagedDestination(ds)) { + _log.info("Datastream contains connector-managed destinations, topic will not be deleted"); + } else { + _executor.schedule(() -> { + deleteTopic(ds); + }, TOPIC_DELETION_DELAY.toHours(), TimeUnit.HOURS); + _log.info("Destination topic for {} scheduled for deletion.", ds); + } + } catch (Exception e) { + _log.error("Failed to retrieve metadata for {}", ds); + } + } + + private void deleteTopic(Datastream ds) { + _log.info("Deleting destination topic for {}", ds); + + try { + _transportProviderAdmins.get(ds.getTransportProviderName()).dropDestination(ds); + } catch (Exception e) { + _log.error("Runtime Exception while delete topic", e); + } + } + private void hardDeleteDatastream(Datastream ds, List allStreams) { String taskPrefix; if (DatastreamUtils.containsTaskPrefix(ds)) { @@ -881,7 +908,7 @@ private void hardDeleteDatastream(Datastream ds, List allStreams) { "No datastream left in the datastream group with taskPrefix {}. Deleting all tasks corresponding to the datastream.", taskPrefix); _adapter.deleteTasksWithPrefix(ds.getConnectorName(), taskPrefix); - deleteTopic(ds); + scheduleTopicForDeletion(ds); } else { _log.info("Found duplicate datastream {} for the datastream to be deleted {}. Not deleting the tasks.", duplicateStream.get().getName(), ds.getName()); @@ -914,20 +941,6 @@ private String createTopic(Datastream datastream) throws TransportException { return datastream.getDestination().getConnectionString(); } - private void deleteTopic(Datastream datastream) { - try { - if (DatastreamUtils.isUserManagedDestination(datastream)) { - _log.info("BYOT(bring your own topic), topic will not be deleted"); - } else if (DatastreamUtils.isConnectorManagedDestination(datastream)) { - _log.info("Datastream contains connector-managed destinations, topic will not be deleted"); - } else { - _transportProviderAdmins.get(datastream.getTransportProviderName()).dropDestination(datastream); - } - } catch (Exception e) { - _log.error("Runtime Exception while delete topic", e); - } - } - private List fetchDatastreamGroups() { // Get all streams that are assignable. Assignable datastreams are the ones: // 1) has a valid destination