When using Reliable Delivery with a standard ProducerController backed by a durable queue (e.g., EventSourcedProducerQueue), restarting the producer causes an immediate crash if all previous messages were confirmed before the restart.
The ProducerController successfully reloads its state but crashes with the following error before a ConsumerController can even establish demand:
java.lang.IllegalStateException: Unexpected Msg when no demand, requested true, requestedSeqNr 1, currentSeqNr X (where X is the actual restored sequence number).
Note: The WorkPullingProducerController handles state initialization differently and is unaffected by this issue.
Steps to Reproduce:
I have verified this behavior using an isolated test with Pekko's in-memory journal.
- Start a ProducerController with a durable queue and a ConsumerController.
- Send a message, and allow the consumer to receive and confirm it (clearing the unconfirmed buffer).
- Wait for the ProducerController to receive the next RequestNext, ensuring the confirmed state is fully written to the durable queue.
- Stop and restart the ProducerController to simulate a restart, crash, or deployment.
- The framework emits a RequestNext with seqNr = 1 instead of the restored sequence number. When the producer supplies the next message, it fails the internal demand check and crashes with an IllegalStateException.
(See the attached ProducerControllerBugTest.scala for the full reproducible test case. Note that the test is inverted: it passes when the bug is reproduced.)
ProducerControllerBugTest.txt
Root Cause:
In ProducerControllerImpl.scala, the state recovery logic ignores the loaded sequence number when initializing the demand window and requesting the next message from the local producer:
In createState, requestedSeqNr is hardcoded to 1L instead of adopting loadedState.currentSeqNr.
In becomeActive, if state.unconfirmed.isEmpty is true, it hardcodes 1L and 0L into the RequestNext message and the flight recorder, rather than using state.currentSeqNr and state.confirmedSeqNr.
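To illustrate why the hardcoded value trips the demand check, here is a minimal, dependency-free sketch. It is a simplified model, not the Pekko source and not the attached diff: `LoadedState`, `State`, `createStateBuggy`, `createStateFixed`, and `checkOnMsg` are hypothetical stand-ins, and the condition `currentSeqNr > requestedSeqNr` is an assumption inferred from the error message above. The restored value `2` is only an example of X.

```scala
// Simplified model of the recovery path described above.
// All names are hypothetical stand-ins, NOT the actual Pekko implementation.
final case class LoadedState(currentSeqNr: Long, highestConfirmedSeqNr: Long)

final case class State(
    requested: Boolean,
    currentSeqNr: Long,
    confirmedSeqNr: Long,
    requestedSeqNr: Long)

object DemandCheckModel {
  // Behaviour described in the issue: requestedSeqNr is hardcoded to 1L.
  def createStateBuggy(loaded: LoadedState): State =
    State(
      requested = false,
      currentSeqNr = loaded.currentSeqNr,
      confirmedSeqNr = loaded.highestConfirmedSeqNr,
      requestedSeqNr = 1L)

  // Behaviour suggested in the issue: adopt the restored sequence number.
  def createStateFixed(loaded: LoadedState): State =
    State(
      requested = false,
      currentSeqNr = loaded.currentSeqNr,
      confirmedSeqNr = loaded.highestConfirmedSeqNr,
      requestedSeqNr = loaded.currentSeqNr)

  // Assumed shape of the demand check, inferred from the error message.
  def checkOnMsg(s: State): Unit =
    if (!s.requested || s.currentSeqNr > s.requestedSeqNr)
      throw new IllegalStateException(
        s"Unexpected Msg when no demand, requested ${s.requested}, " +
          s"requestedSeqNr ${s.requestedSeqNr}, currentSeqNr ${s.currentSeqNr}")

  def main(args: Array[String]): Unit = {
    // Example: the restored sequence number X is 2 and nothing is unconfirmed.
    val loaded = LoadedState(currentSeqNr = 2L, highestConfirmedSeqNr = 1L)

    // Model the producer having been sent a RequestNext (requested = true).
    val buggy = createStateBuggy(loaded).copy(requested = true)
    val fixed = createStateFixed(loaded).copy(requested = true)

    // The buggy state rejects the next message; the fixed state accepts it.
    println(scala.util.Try(checkOnMsg(buggy)).isFailure) // prints "true"
    println(scala.util.Try(checkOnMsg(fixed)).isSuccess) // prints "true"
  }
}
```

In this model the buggy state has `requested = true` yet still fails, because the restored `currentSeqNr` is already ahead of the hardcoded `requestedSeqNr`. That matches the "requested true, requestedSeqNr 1, currentSeqNr X" shape of the reported exception.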
(See the attached bugfix.txt, which contains a git diff of a fix that appears to resolve this issue.)
bugfix.txt