Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.stream.IntStream;

import org.apache.commons.lang.Validate;
Expand Down Expand Up @@ -98,6 +99,34 @@ public static <T> T setField(Object object, String field, T value) throws Except
}
}

/**
* Write a private final field with reflection.
* @param object instance whose field is to be accessed
* @param field name of private field
* @param value value to be set to the field
* @param <T> type of the field
* @return thr new value just set or null if failed
*/
public static <T> T setFinalField(Object object, String field, T value) {
Validate.notNull(object, "null target object");
Validate.notNull(field, "null field name");

try {
Field fieldObj = object.getClass().getDeclaredField(field);
fieldObj.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(fieldObj, fieldObj.getModifiers() & ~Modifier.FINAL);

fieldObj.set(object, value);
return value;
} catch (Exception e) {
LOG.warn(String.format("Failed to set field, object = %s field = %s value = %s", object, field, value), e);
return null;
}
}

/**
* Read a private field with reflection.
* @param object instance whose field is to be accessed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,8 @@ public void testDatastreamWithoutConnectorManagedDestination() throws Exception
Assert.assertEquals(transportProviderAdminFactory._createDestinationCount, 1,
"Create destination count should have been 1, since Datastream does not have connector-managed destination");

ReflectionUtils.setFinalField(coordinator, "TOPIC_DELETION_DELAY", Duration.ofSeconds(1));

resource.delete(datastreamName);
String path = KeyBuilder.datastream(testCluster, datastreamName);
Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import static com.linkedin.datastream.common.DatastreamUtils.hasValidDestination;
import static com.linkedin.datastream.common.DatastreamUtils.isReuseAllowed;


/**
*
* Coordinator is the object that bridges ZooKeeper with Connector implementations. There is one instance
Expand Down Expand Up @@ -168,6 +167,7 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware {
private static final String NUM_ORPHAN_CONNECTOR_TASKS = "numOrphanConnectorTasks";
private static final String MAX_PARTITION_COUNT_IN_TASK = "maxPartitionCountInTask";
private static final String IS_LEADER = "isLeader";
private static final Duration TOPIC_DELETION_DELAY = Duration.ofHours(1);

// Connector common metrics
private static final String NUM_DATASTREAMS = "numDatastreams";
Expand Down Expand Up @@ -863,6 +863,33 @@ private void handleDatastreamAddOrDelete() {
_eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent());
}

private void scheduleTopicForDeletion(Datastream ds) {
try {
if (DatastreamUtils.isUserManagedDestination(ds)) {
_log.info("BYOT(bring your own topic topic will not be deleted");
} else if (DatastreamUtils.isConnectorManagedDestination(ds)) {
_log.info("Datastream contains connector-managed destinations, topic will not be deleted");
} else {
_executor.schedule(() -> {
deleteTopic(ds);
}, TOPIC_DELETION_DELAY.toHours(), TimeUnit.HOURS);
_log.info("Destination topic for {} scheduled for deletion.", ds);
}
} catch (Exception e) {
_log.error("Failed to retrieve metadata for {}", ds);
}
}

private void deleteTopic(Datastream ds) {
_log.info("Deleting destination topic for {}", ds);

try {
_transportProviderAdmins.get(ds.getTransportProviderName()).dropDestination(ds);
} catch (Exception e) {
_log.error("Runtime Exception while delete topic", e);
}
}

private void hardDeleteDatastream(Datastream ds, List<Datastream> allStreams) {
String taskPrefix;
if (DatastreamUtils.containsTaskPrefix(ds)) {
Expand All @@ -881,7 +908,7 @@ private void hardDeleteDatastream(Datastream ds, List<Datastream> allStreams) {
"No datastream left in the datastream group with taskPrefix {}. Deleting all tasks corresponding to the datastream.",
taskPrefix);
_adapter.deleteTasksWithPrefix(ds.getConnectorName(), taskPrefix);
deleteTopic(ds);
scheduleTopicForDeletion(ds);
} else {
_log.info("Found duplicate datastream {} for the datastream to be deleted {}. Not deleting the tasks.",
duplicateStream.get().getName(), ds.getName());
Expand Down Expand Up @@ -914,20 +941,6 @@ private String createTopic(Datastream datastream) throws TransportException {
return datastream.getDestination().getConnectionString();
}

private void deleteTopic(Datastream datastream) {
try {
if (DatastreamUtils.isUserManagedDestination(datastream)) {
_log.info("BYOT(bring your own topic), topic will not be deleted");
} else if (DatastreamUtils.isConnectorManagedDestination(datastream)) {
_log.info("Datastream contains connector-managed destinations, topic will not be deleted");
} else {
_transportProviderAdmins.get(datastream.getTransportProviderName()).dropDestination(datastream);
}
} catch (Exception e) {
_log.error("Runtime Exception while delete topic", e);
}
}

private List<DatastreamGroup> fetchDatastreamGroups() {
// Get all streams that are assignable. Assignable datastreams are the ones:
// 1) has a valid destination
Expand Down