KStreams PAPI test for TimestampedWindowStoreWithHeaders#4212
KStreams PAPI test for TimestampedWindowStoreWithHeaders#4212Lucy Liu (lucliu1108) wants to merge 3 commits intoconfluentinc:masterfrom
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
Adds a new Streams integration test class to validate TimestampedWindowStoreWithHeaders behavior (including header-based schema ID transport) across a variety of store operations and query paths.
Changes:
- Introduces
TimestampedWindowStoreWithHeadersIntegrationTestcovering PAPI-style store ops, IQv1 querying, delete/tombstone behavior, and iterator method behavior. - Adds helper utilities for topic creation, Streams lifecycle, producing/consuming records, and schema-id header assertions.
- Fixes trailing indentation/formatting at the end of
streams-integration-tests/pom.xml.
Reviewed changes
Copilot reviewed 1 out of 2 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/TimestampedWindowStoreWithHeadersIntegrationTest.java |
New, comprehensive integration test suite for window-store-with-headers operations and IQ querying. |
streams-integration-tests/pom.xml |
Minor formatting change (project closing tag alignment). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| "Tombstone record should have key schema ID header"); | ||
| } | ||
| } | ||
| System.out.println("Tombstone count: " + tombstoneCount); |
There was a problem hiding this comment.
Avoid printing to stdout from tests. This adds noise to CI logs and can hide relevant output; if you need diagnostics, use assertions/messages or a test logger that can be enabled when debugging.
| System.out.println("Tombstone count: " + tombstoneCount); |
| assertTrue(startedLatch.await(30, TimeUnit.SECONDS), "KafkaStreams should reach RUNNING state"); | ||
| return streams; |
There was a problem hiding this comment.
If the RUNNING-state await times out, startStreamsAndAwaitRunning throws an AssertionError after starting KafkaStreams, but the instance is never closed (call site won't assign it due to the exception). Consider closing streams before failing (e.g., in a try/finally around the await) to avoid thread/resource leaks and noisy CI failures when the startup is flaky.
| assertTrue(startedLatch.await(30, TimeUnit.SECONDS), "KafkaStreams should reach RUNNING state"); | |
| return streams; | |
| boolean running = false; | |
| try { | |
| running = startedLatch.await(30, TimeUnit.SECONDS); | |
| assertTrue(running, "KafkaStreams should reach RUNNING state"); | |
| return streams; | |
| } finally { | |
| if (!running) { | |
| closeStreams(streams); | |
| } | |
| } |
86f9cf8 to
fb6a067
Compare
TengYao Chi (frankvicky)
left a comment
There was a problem hiding this comment.
Thanks for the PR
| } | ||
|
|
||
| /** | ||
| * Fetch range for event-2 from 6-11min (240000ms to 660000ms). |
There was a problem hiding this comment.
seems the comment not accurate?
| @Override | ||
| public void process(Record<GenericRecord, GenericRecord> record) { | ||
| String operation = record.value().get("operation").toString(); | ||
| long windowStart = (record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis(); |
There was a problem hiding this comment.
Could we have a helper method for window start/end calculation?
There was a problem hiding this comment.
Edit: added helper method calculateWindowStartTime to calculate window start time
| } | ||
| } | ||
|
|
||
| private List<ConsumerRecord<GenericRecord, GenericRecord>> consumeRecords( |
There was a problem hiding this comment.
Should we need add a check when received count > expected count?
There was a problem hiding this comment.
Edit: added a number check after the poll loop completed
address comments address comments
fb6a067 to
d3c244e
Compare
Summary
This PR adds:
What
Checklist
Please answer the questions with Y, N or N/A if not applicable.
References
JIRA:
Test & Review
Open questions / Follow-ups