This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Description
Describe the bug
if checkpoint not turn on,the offset will never got commit, when we restart the job it will consumer from earliest offset again
can i fix it like this ? by commit it peroid
At ReaderThread run() method
After emitRecord(message) is call
// auto commit if checkpoint is not enable
if (!isCheckpointingEnabled && (SystemClock.now() - ackPeroid > lastAckTime)) {
offset.put(topicRange, message.getMessageId());
metadataReader.commitCursorToOffset(offset);
lastAckTime = SystemClock.now();
}