카프카 컨슈머 설정 옵션들에 대한 내용입니다. 카프카 컨슈머 메시지를 보내는 역할을 하는 클라이언트를 의미합니다. 안정적인 데이타 파이프라인 구축을 위해서는 서비스 요구사항에 맞는 컨슈머 설정이 매우 중요하므로 각 설정에 대한 명확한 이해가 필요합니다.
카프카 프로듀서 옵션 설명이 필요하신 분은 여기 링크를 참고바랍니다.
목차
Toggle카프카 컨슈머 설정 옵션
- bootstrap.servers : 컨슈머가 연결할 브로커의 정보를 입력합니다.
- fetch.min.bytes : 한 번에 가져올 수 최소 데이터 크기를 의미합니다. 만약 지정한 크기보다 작은 경우, 요청에 응답하지 않고 데이터가 누적될 때까지 기다립니다.
- heartbeat.interval.ms : 컨슈머의 상태가 active임을 의미합니다. 정상 동작을 우해서는 sesson.timeout.ms의 1/3으로 설정해야 합니다.
- max.partition.fetch.bytes : 파티션 당 가져올 수 있는 최대 크기입니다.
- session.timeout.ms : 해당 주기마다 컨슈머가 종료된 것인지 판단합니다. 이 시간 전까지 하트비트를 보내지 않으면 컨슈머는 종료된 것으로 간주하고 컨슈머그룹에서 제외하고 리밸런싱을 시작합니다.
- enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋합니다.
- auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않을 경우 다음 옵션으로 reset합니다.
- fetch.max.bytes : 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기입니다.
- group.instance.id : 컨슈머의 고유한 식별자입니다. 만약 이 값이 설정되어 있다면 static 멤버로 간주되어 불필요한 리밸런싱을 하지 않습니다.
- isolation.level : 트랙잭션 컨슈머에서 사용되는 옵션입니다. read_uncomitted는 기본값으로 모든 메시지를 읽습니다. 하지만 read_committed는 트랜잭션이 완료된 메시지만 읽습니다.
- max.poll.records : 한 번의 poll() 요청으로 가죠오는 최대 메시지 수입니다.
- partition.assignment.strategy : 파티션 할당 전략이며, 기본값은 range입니다.
- fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간입니다.
카프카 컨슈머 설정 샘플 코드
아래는 위에서 설명한 설정 중에 JAVA 언어로 생성하는 컨슈머의 예시입니다. 간략히 설명하겠습니다.
- bootstrap.servers, group.id, enable.auto.commit, auto.offset.reset, key.deserializer, value.deserializer 옵션을 설정합니다.
- 해당 옵션으로 컨슈머를 생성합니다.
- polling 방식으로 1초마다 브로셔에서 배치형태로 토픽을 가져옵니다.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerSample {
public static void main(String[] args) {
Properties propts = new Properties(); //Properties 생성.
propts.put("bootstrap.servers", "kafka01.sample.test:9092,kafka02.sample.test:9092,kafka03.sample.test:9092"); // bootstrap.servers -> 연결할 브로커 설정.
propts.put("group.id", "consumer_01"); // 컨슈머 그룹 아이디 설정.
propts.put("enable.auto.commit", "true"); //오토 커밋을 사용 설정.
propts.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화하여 가장 최근부터 메시지를 가져옵니다.
propts.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정합니다.
propts.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(propts); //Properties 오브젝트로 컨슈머를 생성합니다.
consumer.subscribe(Arrays.asList("topic01")); // 구독할 토픽을 지정합니다.
try {
while (true) { // 메시지를 가져오기 위해 카프카에 계속 poll()을 합니다.
ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 1초마다 폴링합니다. 해당 시간만큼 블럭됩니다.
for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴합니다, 하나의 메시지만 가져오는 것이 아닌 배치형태로 가져오므로 반복문으로 하나씩 메세지를 처리합니다.
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
consumer.close(); //컨슈머를 종료합니다.
}
}
}
이상으로 간략한 카프카 컨슈머 설정 정리를 마치겠습니다.
카프카 컨슈머 설정에 대한 자세한 내용은 여기 링크를 참고바랍니다.
좀더 자세한 설명을 원하시는 분은 여기 링크로 ‘실전 카프카 개발부터 운영까지’ 책을 사서 보시길 추천드립니다.
참고 : 실전 카프카 개발부터 운영까지
“이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.”