Skip to content

Commit df790f5

Browse files
committed
Update Kafka Post " CHAPTER 4. 카프카 컨슈머: 컨슈머 설정 및 오프셋과 커밋 "
- 컨슈머 관련 설정값 추가 및 내용 보완
1 parent 1e6561d commit df790f5

File tree

1 file changed

+75
-36
lines changed

1 file changed

+75
-36
lines changed

_posts/2025-08-14-Kafka-Consumer-Config-And-Commit.md

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -172,28 +172,43 @@ public class KafkaConsumerConfig {
172172

173173
@Bean
174174
public ConsumerFactory<String, CouponIssueMessage> consumerFactory() {
175-
// ... JsonDeserializer 설정 ...
176-
177-
Map<String, Object> config = new HashMap<>();
178-
// 필수 설정
179-
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
180-
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
181-
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
182-
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
183-
184-
// 1. 신뢰성: 오프셋 수동 커밋 설정
185-
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
186-
187-
// 2. 성능: 한 번의 poll() 호출로 가져올 최대 레코드 수를 지정
188-
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
189-
190-
// 3. 안정성: poll() 간 최대 시간. 메시지 처리 시간보다 넉넉하게 10분으로 설정
191-
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
192-
193-
// 4. 안정성: 리밸런싱 시 중단을 최소화하는 협력적 할당 전략 사용
194-
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
195-
196-
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
175+
JsonDeserializer<CouponIssueMessage> deserializer = new JsonDeserializer<>(CouponIssueMessage.class);
176+
deserializer.setRemoveTypeHeaders(false);
177+
deserializer.addTrustedPackages("*");
178+
deserializer.setUseTypeMapperForKey(true);
179+
180+
// 공통 필수 설정
181+
Map<String, Object> config = new HashMap<>();
182+
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
183+
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
184+
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
185+
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
186+
187+
// 1. 신뢰성 관련 설정
188+
// 1-1) 오프셋 수동 커밋 설정 -> 메시지 처리 완료 후 애플리케이션이 직접 커밋 시점을 제어하여, 메시지 유실을 방지.
189+
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
190+
// 1-2) 새로운 컨슈머 그룹이 처음 토픽을 읽을 때, 가장 오래된 메시지부터 읽도록 설정
191+
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
192+
193+
// 2. 안정성 관련 설정
194+
// 2-1) 컨슈머가 브로커(그룹 코디네이터)에게 하트비트를 보내지 않고 버틸 수 있는 최대 시간(기본값: 10,000 ms = 10초)
195+
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
196+
// 2-2) 컨슈머가 얼마자 자주 하트비트를 보낼지 결정하는 주기. (기본값: 3,000ms = 3초)
197+
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
198+
// 2-3) poll() 간 최대 시간. 메시지 처리 시간보다 넉넉하게 설정. (기본값: 300,000ms = 5분)
199+
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 10분
200+
// 2-4) 리밸런싱 시 중단을 최소화하는 '협력적 리밸런스' 전략 사용.
201+
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
202+
203+
// 3. 성능 관련 설정
204+
// 3-1) poll() 요청 시 브로커가 응답해야 할 최소 데이터 크기. (기본값: 1 byte)
205+
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 2);
206+
// 3-2) poll() 요청 시 브로커에서 대기할 최대 시간. (기본값: 500ms)
207+
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
208+
// 3-3) 컨슈머가 브로커로부터 한 번의 poll() 호출로 가져올 최대 메시지 개수. (기본값: 500)
209+
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
210+
211+
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
197212
}
198213

199214
@Bean
@@ -220,41 +235,65 @@ public class KafkaConsumerConfig {
220235

221236
위 코드는 신뢰성, 안정성, 성능을 모두 고려하여 다음과 같이 설정했다.
222237

223-
> 신뢰성 (수동 커밋)
238+
> 신뢰성
239+
240+
메시지를 유실 없이, 최소 한 번 이상(At-least-once) 처리하는 것을 보장하기 위한 설정이다.
224241

225242
* `ENABLE_AUTO_COMMIT_CONFIG``false`로, `AckMode``MANUAL`로 설정했다.
226243

227-
* 이 조합을 통해 카프카가 자동으로 오프셋을 커밋하는 것을 막고, `@KafkaListener`에서 비즈니스 로직이 완전히 성공했을 때만 `ack.acknowledge()`를 호출하여 직접 오프셋을 커밋할 수 있다.
244+
* 이 조합을 통해 카프카가 자동으로 오프셋을 커밋하는 것을 막고, `@KafkaListener`에서 비즈니스 로직이 완전히 성공했을 때만 `ack.acknowledge()`를 호출하여 직접 오프셋을 커밋할 수 있다.
245+
246+
* 이는 메시지 유실을 방지하는 확실한 방법이다.
247+
248+
* `AUTO_OFFSET_RESET_CONFIG`는 earliest로 설정했다.
228249

229-
* 이는 메시지 유실을 방지하는 확실한 방법이다.
250+
* 이를 통해 새로운 컨슈머 그룹이 서비스를 시작하거나 오프셋 정보가 유실되었을 때,
230251

231-
> 안정성 (컨슈머 그룹 운영)
252+
* 토픽의 가장 처음부터 모든 메시지를 처리하도록 하여 의도치 않게 데이터를 건너뛰는 상황을 방지한다.
232253

233-
* `MAX_POLL_INTERVAL_MS_CONFIG`를 10분으로 길게 설정하여, 쿠폰 발급과 같이 외부 시스템 연동으로 인해 메시지 처리가 길어지더라도 컨슈머가 '좀비' 상태로 오인받아 그룹에서 쫓겨나는 상황을 방지한다.
254+
> 안정성
234255
235-
* 또한`PARTITION_ASSIGNMENT_STRATEGY_CONFIG``CooperativeStickyAssignor`로 지정했다.
256+
컨슈머가 예기치 않은 상황에서도 그룹에서 이탈하지 않고 꾸준히 작업을 수행하도록 보장하기 위한 설정이다.
257+
258+
* 컨슈머 생존 및 동작 감지를 위해 3가지 타임아웃을 명시적으로 설정했다.
259+
260+
* `SESSION_TIMEOUT_MS_CONFIG`(30초)와 `HEARTBEAT_INTERVAL_MS_CONFIG`(10초)는 컨슈머 프로세스의 생존(Liveness)을,
261+
262+
* `MAX_POLL_INTERVAL_MS_CONFIG`(10분)은 메시지 처리 로직의 동작(Progress)을 감시한다.
263+
264+
* 이 값들을 넉넉하게 설정하여, 일시적인 부하나 긴 처리 시간으로 인해 컨슈머가 '좀비'로 오인받아 그룹에서 쫓겨나는 상황을 방지한다.
265+
266+
* 또한, `PARTITION_ASSIGNMENT_STRATEGY_CONFIG``CooperativeStickyAssignor`로 지정했다.
236267

237268
* 이는 컨슈머 인스턴스가 추가되거나 제거될 때 발생하는 리밸런싱 과정에서 전체 컨슈머 그룹의 작업 중단('Stop-the-world')을 막고,
238269

239270
* **변경이 필요한 파티션만 재할당**하여 서비스의 가용성을 극대화하는 좋은 선택이다.
240271

241-
> 성능 (병렬 처리)
272+
> 성능
273+
274+
병렬 처리와 효율적인 데이터 가져오기라는 두 가지 전략을 통해 성능을 최적화하는 설정이다.
242275

243276
* 컨슈머의 처리량을 높이는 가장 확실한 방법은 병렬 처리를 활용하는 것이다. 코드에서는 `setConcurrency(3)` 설정을 통해 3개의 컨슈머 스레드가 각각 다른 파티션의 메시지를 동시에 처리하도록 했다.
244277

245-
* 이는 마치 은행에 예금 상품 가입 신청서가 하나의 창구에만 쌓여 업무가 지연되자, 은행 지점장이 가입 전용 창구(컨슈머 스레드)를 3개로 늘리는 것과 같다.
278+
* 이는 마치 은행에 예금 상품 가입 신청서가 하나의 창구에만 쌓여 업무가 지연되자, 은행 지점장이 가입 전용 창구(컨슈머 스레드)를 3개로 늘리는 것과 같다.
279+
280+
* 토픽의 파티션이 3개일 때: 카프카는 3개의 창구에 미리 분류해 둔 신청서 묶음(파티션)을 하나씩 1:1로 배정한다.
281+
282+
* 1번 창구(텔러) → 1번 신청서 묶음 전담
283+
284+
* 2번 창구(텔러) → 2번 신청서 묶음 전담
246285

247-
* 토픽의 파티션이 3개일 때: 카프카는 3개의 창구에 미리 분류해 둔 신청서 묶음(파티션)을 하나씩 1:1로 배정한다.
286+
* 3번 창구(텔러) → 3번 신청서 묶음 전담
248287

249-
* 1번 창구(텔러) → 1번 신청서 묶음 전담
288+
* 그 결과, 3개의 창구에서 동시에 가입 신청(메시지)이 처리되므로, 이론적으로 전체 처리 속도가 3배로 향상된다. 이처럼 최대 성능을 내기 위해서는 보통 `Concurrency` 값을 파티션 개수와 동일하게 맞춘다.
250289

251-
* 2번 창구(텔러) → 2번 신청서 묶음 전담
290+
* 효율적인 데이터 가져오기를 위해 `poll` 관련 설정을 튜닝했다.
252291

253-
* 3번 창구(텔러) → 3번 신청서 묶음 전담
292+
* `MAX_POLL_RECORDS_CONFIG`를 1000으로 높여 한 번에 대량의 메시지를 가져오도록 하고,
254293

255-
* 결과, 3개의 창구에서 동시에 가입 신청(메시지)이 처리되므로, 이론적으로 전체 처리 속도가 3배로 향상된다. 이처럼 최대 성능을 내기 위해서는 보통 `Concurrency` 값을 파티션 개수와 동일하게 맞춘다.
294+
* `FETCH_MAX_WAIT_MS_CONFIG`를 1000ms(1초)로, `FETCH_MIN_BYTES_CONFIG`를 2바이트로 설정하여 트래픽이 적을 때도 **최대 1초간 기다리며 최소 2바이트 이상의 데이터를 최대한 모아** 한 번에 가져오도록 했다.
256295

257-
* 여기서 `MAX_POLL_RECORDS_CONFIG`는 한 번에 처리할 작업의 양을 조절하는 역할을 한다.
296+
* 이러한 설정은 약간의 지연 시간(Latency)을 감수하는 대신, 불필요한 네트워크 통신을 줄여 시스템의 전체적인 **처리량(Throughput)을 극대화**한다.
258297

259298
## References
260299

0 commit comments

Comments
 (0)