카프카 컨슈머 설정 – 13가지 옵션 정리

카프카 컨슈머 설정 – 13가지 옵션 정리

카프카 컨슈머 설정 옵션들에 대한 내용입니다. 카프카 컨슈머 메시지를 보내는 역할을 하는 클라이언트를 의미합니다. 안정적인 데이타 파이프라인 구축을 위해서는 서비스 요구사항에 맞는 컨슈머 설정이 매우 중요하므로 각 설정에 대한 명확한 이해가 필요합니다.

카프카 프로듀서 옵션 설명이 필요하신 분은 여기 링크를 참고바랍니다.


카프카 컨슈머 설정 옵션

  • bootstrap.servers : 컨슈머가 연결할 브로커의 정보를 입력합니다.

  • fetch.min.bytes : 한 번에 가져올 수 최소 데이터 크기를 의미합니다. 만약 지정한 크기보다 작은 경우, 요청에 응답하지 않고 데이터가 누적될 때까지 기다립니다.
  • heartbeat.interval.ms : 컨슈머의 상태가 active임을 의미합니다. 정상 동작을 우해서는 sesson.timeout.ms의 1/3으로 설정해야 합니다.
  • max.partition.fetch.bytes : 파티션 당 가져올 수 있는 최대 크기입니다.
  • session.timeout.ms : 해당 주기마다 컨슈머가 종료된 것인지 판단합니다. 이 시간 전까지 하트비트를 보내지 않으면 컨슈머는 종료된 것으로 간주하고 컨슈머그룹에서 제외하고 리밸런싱을 시작합니다.
  • enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋합니다.
  • auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않을 경우 다음 옵션으로 reset합니다.
  • fetch.max.bytes : 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기입니다.
  • group.instance.id : 컨슈머의 고유한 식별자입니다. 만약 이 값이 설정되어 있다면 static 멤버로 간주되어 불필요한 리밸런싱을 하지 않습니다.
  • isolation.level : 트랙잭션 컨슈머에서 사용되는 옵션입니다. read_uncomitted는 기본값으로 모든 메시지를 읽습니다. 하지만 read_committed는 트랜잭션이 완료된 메시지만 읽습니다.
  • max.poll.records : 한 번의 poll() 요청으로 가죠오는 최대 메시지 수입니다.
  • partition.assignment.strategy : 파티션 할당 전략이며, 기본값은 range입니다.
  • fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간입니다.


카프카 컨슈머 설정 샘플 코드

아래는 위에서 설명한 설정 중에 JAVA 언어로 생성하는 컨슈머의 예시입니다. 간략히 설명하겠습니다.

  1. bootstrap.servers, group.id, enable.auto.commit, auto.offset.reset, key.deserializer, value.deserializer 옵션을 설정합니다.
  2. 해당 옵션으로 컨슈머를 생성합니다.
  3. polling 방식으로 1초마다 브로셔에서 배치형태로 토픽을 가져옵니다.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSample {
    public static void main(String[] args) {
        Properties propts = new Properties();  //Properties 생성.
        propts.put("bootstrap.servers", "kafka01.sample.test:9092,kafka02.sample.test:9092,kafka03.sample.test:9092"); // bootstrap.servers -> 연결할 브로커 설정.
        propts.put("group.id", "consumer_01"); // 컨슈머 그룹 아이디 설정.
        propts.put("enable.auto.commit", "true"); //오토 커밋을 사용 설정.
        propts.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화하여 가장 최근부터 메시지를 가져옵니다.
        propts.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정합니다.
        propts.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(propts);  //Properties 오브젝트로 컨슈머를 생성합니다.
        consumer.subscribe(Arrays.asList("topic01")); // 구독할 토픽을 지정합니다.

        try {
            while (true) { // 메시지를 가져오기 위해 카프카에 계속 poll()을 합니다.
                ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 1초마다 폴링합니다. 해당 시간만큼 블럭됩니다.
                for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴합니다, 하나의 메시지만 가져오는 것이 아닌 배치형태로 가져오므로 반복문으로 하나씩 메세지를 처리합니다.
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close(); //컨슈머를 종료합니다.
        }
    }
}

이상으로 간략한 카프카 컨슈머 설정 정리를 마치겠습니다.

카프카 컨슈머 설정에 대한 자세한 내용은 여기 링크를 참고바랍니다.

좀더 자세한 설명을 원하시는 분은 여기 링크로 ‘실전 카프카 개발부터 운영까지’ 책을 사서 보시길 추천드립니다.


참고 : 실전 카프카 개발부터 운영까지

실전 카프카 개발부터 운영까지:데이터플랫폼의 중추 아파치 카프카의 내부동작과 개발 운영 보안의 모든것, 책만 아파치 카프카의 모든 것 세트 : 카프카 데이터 플랫폼의 최강자+실전 카프카 개발부터 운영까지, 책만 (서점추천) 헤드 퍼스트 디자인 패턴 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 24단계 실습으로 정복하는 쿠버네티스 + 실전 카프카 개발부터 운영까지 (전2권), 위키북스 (서점추천) 데이터 파이프라인 핵심 가이드 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 몽고DB 완벽 가이드 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 클린 코드의 기술 + 실전 카프카 개발부터 운영까지 (전2권), 영진닷컴 (서점추천) 카프카 핵심 가이드 + 디지털 플랫폼 전략 수립을 위한 쿠버네티스 실전 활용서 (전2권), 제이펍 아파치 카프카 애플리케이션 프로그래밍 with 자바:카프카의 개념부터 스트림즈 커넥트 스프링 카프카까지, 비제이퍼블릭 카프카 핵심 가이드 개정증보판, 제이펍 실전 아파치 카프카:애플리케이션 개발부터 파이프라인 사물인터넷 데이터 허브 구축까지, 한빛미디어 카프카 데이터 플랫폼의 최강자:실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지, 책만 카프카 스트림즈와 ksqlDB 정복:실시간 데이터 처리, 에이콘출판 카프카 핵심가이드:실시간 데이터와 스트림 프로세싱, 제이펍

“이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.”

Back to top