Skip to content

Conversation

@SongChujun
Copy link
Member

@SongChujun SongChujun commented Oct 26, 2025

Description

Motivation behind this change is described in #27121

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`issuenumber`)

Summary by Sourcery

Enable scheduling of additional leaf drivers when existing drivers are blocked by tracking blocked splits and adjusting concurrency and scheduling logic.

Bug Fixes:

  • Allow additional leaf drivers to be scheduled when others are blocked instead of leaving capacity idle

Enhancements:

  • Track blocked leaf splits in TaskEntry and notify block/unblock events via SplitBlockedStateChangeListener
  • Adjust effective running concurrency by subtracting blocked splits when updating the ConcurrencyController
  • Include blocked splits in the global scheduling calculation to fill available driver slots
  • Add validation to ensure currentConcurrency is non-negative in ConcurrencyController

@cla-bot cla-bot bot added the cla-signed label Oct 26, 2025
@sourcery-ai
Copy link

sourcery-ai bot commented Oct 26, 2025

Reviewer's Guide

Introduce tracking of blocked leaf drivers to free up scheduling slots when splits are blocked: SplitProcessor now notifies TaskEntry of blocked/running state, TaskEntry tracks blocked splits and updates concurrency accordingly, ThreadPerDriverTaskExecutor includes blocked splits in target calculation, and concurrency controller adds basic argument validation.

Class diagram for updated SplitProcessor and TaskEntry

classDiagram
class SplitProcessor {
  +TaskId taskId
  +int splitId
  +SplitRunner split
  +Tracer tracer
  +SplitBlockedStateChangeListener blockStateChangeListener
  +run(SchedulerContext context)
  <<interface>> SplitBlockedStateChangeListener
  +splitBlocked()
  +splitRunning()
}
class TaskEntry {
  -int runningLeafSplits
  -int blockedLeafSplits
  -Set<SplitRunner> blockedLeafSplitSet
  +runSplit(SplitRunner split, boolean trackLeafSplit)
  +leafSplitDone(QueuedSplit split)
  +blockedLeafSplits()
  +updateConcurrency()
  +onSplitBlocked(SplitRunner split)
  +onSplitRunning(SplitRunner split)
}
SplitProcessor --> SplitProcessor.SplitBlockedStateChangeListener
TaskEntry --> SplitProcessor.SplitBlockedStateChangeListener
Loading

Class diagram for ThreadPerDriverTaskExecutor and ConcurrencyController changes

classDiagram
class ThreadPerDriverTaskExecutor {
  -scheduleMoreLeafSplits()
}
class ConcurrencyController {
  +update(double utilization, int currentConcurrency)
}
ThreadPerDriverTaskExecutor --> TaskEntry
TaskEntry --> ConcurrencyController
Loading

File-Level Changes

Change Details Files
Track blocked leaf splits in TaskEntry
  • Add blockedLeafSplits counter and blockedLeafSplitSet
  • Clear blocked split state on destroy
  • Overload runSplit to attach blocking listener
  • Implement onSplitBlocked and onSplitRunning to update state
TaskEntry.java
Propagate split-blocked/running events in SplitProcessor
  • Introduce SplitBlockedStateChangeListener interface and NOOP default
  • Add listener field to SplitProcessor constructor
  • Invoke splitBlocked/splitRunning at appropriate points
SplitProcessor.java
Adjust concurrency and scheduling to account for blocked splits
  • Subtract blockedLeafSplits in TaskEntry.updateConcurrency
  • Include total blockedLeafSplits in global scheduling target
TaskEntry.java
ThreadPerDriverTaskExecutor.java
Validate concurrency input
  • Add checkArgument for non-negative currentConcurrency
ConcurrencyController.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@SongChujun SongChujun marked this pull request as ready for review October 27, 2025 22:24
Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • In scheduleMoreLeafSplits, consider tracking a global blocked leaf split count rather than iterating tasks on every scheduling pass to avoid potential performance overhead when many tasks are present.
  • Add a comment explaining why blockedLeafSplits are subtracted from runningLeafSplits in the concurrency update logic to clarify the intended scheduling impact for future maintainers.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In scheduleMoreLeafSplits, consider tracking a global blocked leaf split count rather than iterating tasks on every scheduling pass to avoid potential performance overhead when many tasks are present.
- Add a comment explaining why blockedLeafSplits are subtracted from runningLeafSplits in the concurrency update logic to clarify the intended scheduling impact for future maintainers.

## Individual Comments

### Comment 1
<location> `core/trino-main/src/main/java/io/trino/execution/executor/dedicated/TaskEntry.java:238-99` </location>
<code_context>

     private record QueuedSplit(SplitRunner split, SettableFuture<Void> done) {}

+    private synchronized void onSplitBlocked(SplitRunner split)
+    {
+        if (blockedLeafSplitSet.add(split)) {
+            blockedLeafSplits++;
+        }
+    }
+
</code_context>

<issue_to_address>
**issue (bug_risk):** Blocked split tracking may not handle repeated state transitions correctly.

Repeated calls to splitBlocked or splitRunning for the same SplitRunner may lead to incorrect blockedLeafSplits counts. Please review if additional state validation is needed to maintain accurate tracking.
</issue_to_address>

### Comment 2
<location> `core/trino-main/src/main/java/io/trino/execution/executor/dedicated/SplitProcessor.java:100-107` </location>
<code_context>
                             }
                         }
                         else {
+                            blockStateChangeListener.splitBlocked();
                             processSpan.addEvent("blocked");
                             processSpan.end();
                             if (!context.block(blocked)) {
                                 processSpan = null;
                                 return;
                             }
+                            blockStateChangeListener.splitRunning();
                         }
                         processSpan = newSpan(splitSpan, processSpan);
</code_context>

<issue_to_address>
**suggestion:** Listener callbacks for splitBlocked and splitRunning may not cover all transition scenarios.

Consider cases where splits transition repeatedly or encounter errors; ensure the listener is notified for all relevant state changes and add hooks if necessary.

Suggested implementation:

```java
    interface SplitBlockedStateChangeListener
    {
        void splitBlocked();

        void splitRunning();

        void splitUnblocked();

        void splitError(Throwable error);

        void splitCompleted();

```

```java
                        else {
                            blockStateChangeListener.splitBlocked();
                            processSpan.addEvent("blocked");
                            processSpan.end();
                            if (!context.block(blocked)) {
                                processSpan = null;
                                blockStateChangeListener.splitUnblocked();
                                return;
                            }
                            blockStateChangeListener.splitRunning();
                        }
                        processSpan = newSpan(splitSpan, processSpan);

```

```java
                    }
                    // Example: handle error scenario
                    catch (Throwable error) {
                        blockStateChangeListener.splitError(error);
                        throw error;
                    }
                    // Example: handle completion scenario
                    finally {
                        blockStateChangeListener.splitCompleted();
                    }

```

- You will need to implement the new methods (`splitUnblocked`, `splitError`, `splitCompleted`) in all classes that implement `SplitBlockedStateChangeListener`.
- Review all split state transitions in the file and ensure the appropriate listener method is called for each transition (blocked, running, unblocked, error, completed).
- If there are other places in the file where splits transition state (not shown in the provided code), add the relevant listener calls there as well.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.


for (SplitRunner split : running) {
split.close();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Blocked split tracking may not handle repeated state transitions correctly.

Repeated calls to splitBlocked or splitRunning for the same SplitRunner may lead to incorrect blockedLeafSplits counts. Please review if additional state validation is needed to maintain accurate tracking.

Comment on lines +100 to +107
blockStateChangeListener.splitBlocked();
processSpan.addEvent("blocked");
processSpan.end();
if (!context.block(blocked)) {
processSpan = null;
return;
}
blockStateChangeListener.splitRunning();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Listener callbacks for splitBlocked and splitRunning may not cover all transition scenarios.

Consider cases where splits transition repeatedly or encounter errors; ensure the listener is notified for all relevant state changes and add hooks if necessary.

Suggested implementation:

    interface SplitBlockedStateChangeListener
    {
        void splitBlocked();

        void splitRunning();

        void splitUnblocked();

        void splitError(Throwable error);

        void splitCompleted();
                        else {
                            blockStateChangeListener.splitBlocked();
                            processSpan.addEvent("blocked");
                            processSpan.end();
                            if (!context.block(blocked)) {
                                processSpan = null;
                                blockStateChangeListener.splitUnblocked();
                                return;
                            }
                            blockStateChangeListener.splitRunning();
                        }
                        processSpan = newSpan(splitSpan, processSpan);
                    }
                    // Example: handle error scenario
                    catch (Throwable error) {
                        blockStateChangeListener.splitError(error);
                        throw error;
                    }
                    // Example: handle completion scenario
                    finally {
                        blockStateChangeListener.splitCompleted();
                    }
  • You will need to implement the new methods (splitUnblocked, splitError, splitCompleted) in all classes that implement SplitBlockedStateChangeListener.
  • Review all split state transitions in the file and ensure the appropriate listener method is called for each transition (blocked, running, unblocked, error, completed).
  • If there are other places in the file where splits transition state (not shown in the provided code), add the relevant listener calls there as well.

@github-actions
Copy link

This pull request has gone a while without any activity. Ask for help on #core-dev on Trino slack.

@github-actions github-actions bot added the stale label Nov 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

1 participant