카프카 컨슈머 구성 요소에 대해 알아보겠습니다. 이번 포스팅을 통해 컨슈머 그룹, 오프셋 관리, 컨슈머들의 파티션 할당 정책, 트랜잭션 컨슈머의 내부 동작에 대해 이해하고 안정적인 운영을 위한 컨슈머 설정 방법을 소개하도록 하겠습니다.
목차
Toggle목표
- 카프카 컨슈머 그룹
- 오프셋 관리
- 카프카 컨슈머들의 파티션 할당 정책
- 트랜잭션 컨슈머의 내부 동작
1. 카프카 컨슈머 오프셋 관리
- 오프셋(offset) : 카프카에서 메시지의 위치를 나타내는 숫자값.
- 카프카 컨슈머 그룹은 자신의 오프셋 정보를 토픽에 저장함. 즉, __consumer_offsets 토픽에 각 컨슈머 그룹별로 오프셋 위치 정보가 기록됨. 컨슈머 그룹, 토픽, 파티션 등의 내용이 통합되어 기록.
- 오프셋값은 현재 읽은 위치가 아닌 다음으로 읽어야 할 위치를 말함.
- __consumer_offsets 은 내부 토픽으로 파티션 수와 리플리케이션 팩터수를 설정할 수 있음. server.propeties 설정 파일에 저장.
- offsets.topic.num.partitions : 기본값 50
- offsets.topic.replication.factor : 기본값 3
2. 그룹 코디네이터
- 리밸런싱 : 컨슈머 그룹에서 각 컨슈머들에게 작업을 균등하게 분해하는 동작
- 그룹 코디네이터 : 컨슈머 그룹 관리르 위한 코디네이터
- 컨슈머 그룹이 구독한 토픽의 파티션들과 그룹의 멤버들을 트래킹하는 역할.
- 카프카 컨슈머 하트비트 옵션
- heartbeat.interval.ms
- 기본값 : 3000
- 그룹 코디네이터와 하트비트 인터벌 시간. 해당 시간은 session.timeout.ms보다 낮게 설정함. 1/3 수준이 적절함.
- session.timeout.ms
- 기본값 : 10000
- 어떤 컨슈머가 특정 시간 안에 하트비트를 받지 못하면 문제가 발생했다고 판단해 컨슈머 그룹에서 해당 컨슈머는 제거되고 리밸런싱 동작이 일어남.
- max.poll.interval.ms
- 기본값 : 300000
- 컨슈머는 주기적으로 poll을 호출해 토픽으로부터 레코드들을 가져옴. poll 호출 후 최대 5분간 poll 호출이 없으면 컨슈머에 문제가 있는 것으로 판단해 리밸런싱함.
- heartbeat.interval.ms
- 그룹 코디네이터가 컨슈머의 다운을 빠르게 감지하도록 설정할 경우
- 일시적인 컨슈머의 타임아웃이나 일시적인 TCP 패킷 손실로 인해 원치 않은 리밸런싱이 빈번하게 발생.
- 그룹 코디네이터가 컨슈머의 다운을 느리게 감지하도록 설정할 경우
- 그 시간만큰 해당 파티션의 메시지를 읽지 못하는 현상이 발생할 수 있음.
- 기본값 설정을 유지하길 권장함.
3. 스태틱 멤버십
- 불필요한 리밸러싱 방어 역할.
- 컨슈머 그룹 내에서 컨슈커가 재시작 등으로 그룹에서 나갔다가 다시 합류하더라도 리밸런성이 일어나지 않음.
- 아파치 카프카 2.3 이상에서만 지원.
- 설정 방법
- group.instance.id = consumer-hostname1 → null이 아닌 id 로 설정.
- session.timeout.ms를 기본값보다는 큰 값으로 조정해야 함. 컨슈머의 재시작 시간을 고려해서 적절한 시간값으로 설정. 재시작 시간이 2분이면 2분보다 큰 값으로 설정함.
- 파이썬 컨플루언트 실습
#아마존 리눅스 2
sudo yum -y install python3
python3 -m venv venv6
source venv6/bin/activate
pip install confluent-kafka
sudo yum -y install git
git clone <https://github.com/onlybooks/kafka2>
cd kafka2/chapter6/
python consumer_standard.py
# 파이션 가상환경 나갈 때
deactivate
- 컨플루언트 파이썬 코드 예제
https://github.com/confluentinc/confluent-kafka-python
- 스태틱 컨슈머 설정
from confluent_kafka import Consumer
import socket
hostname = socket.gethostname()
broker = 'peter-kafka01.foo.bar'
group = 'peter-consumer02'
topic = 'peter-test06'
c = Consumer({
'bootstrap.servers': broker,
'group.id': group,
'session.timeout.ms': 30000,
'group.instance.id': 'consumer-' + hostname,
'auto.offset.reset': 'earliest'
})
c.subscribe([topic])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Topic: {}, '
'Partition: {}, '
'Offset: {}, '
'Received message: {}'.format(msg.topic(),
msg.partition(),
msg.offset(),
msg.value().decode('utf-8')))
c.close()
- 스태틱 멤버십 기능 적용 추천함!
4. 카프카 컨슈머 파티션 할당 전략
- 파티션 할당 전략 : partition.assignment.strategy
- 레인지 파티션 할당 전략 : RangeAssignor
- 파티션 할당 전략의 기본값으로서 토픽별로 할당 전략을 사용함. 동일한 키를 이용하는 2개 이상의 토픽을 컨슘할 때 유용함
- 라운드 로빈 파티션 할달 전략 : RoundRobinAssignor
- 사용 가능한 파티션과 컨슈머들을 라운드 로빈으로 할당함. 균등한 분배 가능.
- 스티커 파티션 할당 전략 : StickyAssignor
- 컨슈머가 컨슘하고 있는 파티션을 계속 유지할 수 있음.
- 협력적 스티커 전략 : CooperativeStickyAssignor
- 스티키 방식과 유사하지만, 전체 일시 정지가 아닌 연속적인 재조정 방식임.
- 레인지 파티션 할당 전략 : RangeAssignor
1) 레인지 파티션 할당 전략
- 기본값으로 먼저 구독하는 토픽에 대한 파티션을 순서대로 나열한 후 컨슈머를 순서대로 정렬, 토픽 별로 균등하게 파타션에 컨슈머를 할당함. 남으면 앞 쪽에 할당함.
- 토픽 1에 파티션 3, 토픽 2에 파티션 3이 있고 컨슈머가 2개 있다고 가정
- 3/2 = 1.5 이므로 토픽 1에 대해 컨슈머 1은 2개 파티션 할당, 컨슈머 2는 1개 할당 됨.
- 총 컨슈머 1은 4개 파티션 할당, 컨슈머 2는 2개 파티션 할당됨.
- 토픽 1의 파티션 0에 키 abc, 토픽 2의 파티션 1에 키 abc가 저장되면 컨슈머 1은 abc 를 처리하게 됨. 따라서 동일한 키를 이용하는 2개 이상의 토픽을 컨슘할 때 유용함.
- 토픽 1에 파티션 3, 토픽 2에 파티션 3이 있고 컨슈머가 2개 있다고 가정
- 카프카 컨슈머에 균등하게 파티션이 분배되지 않으므로 컨슈머 그룹은 불균형한 상태로 운영될 수 있음.
2) 라운드 로빈 파티션 할당 전략
- 전체 파티션과 컨슈머를 나열하고 순차적으로 할당함.
- 할당 예시
- 토픽 2개에 각 토픽 당 파티션 3개, 컨슈머 2개일 경우
- 토픽1-파티션0 : 컨슈머 1
- 토픽1-파티션1 : 컨슈머 2
- 토픽1-파티션2 : 컨슈머 1
- 토픽2-파티션0 : 컨슈머 2
- 토픽2-파티션1 : 컨슈머 1
- 토픽2-파티션2 : 컨슈머 2
- 토픽 2개에 각 토픽 당 파티션 3개, 컨슈머 2개일 경우
- 파티션과 컨슈머를 더욱 균등하게 매핑함.
3) 스티커 파티션 할당 전략
- 이 전략의 목적은 가능한 한 균형 잡힌 파티션 할당을 우선시하며 재할당이 발생할 때 되도록 기존의 할당된 파티션 정보를 보장하는 것임.
- 라운드 로빈 방식에서는 컨슈머가 죽으면 나머지 리밸런싱됨. 기존과 비교해 다시 할당됨.
- 이 전략은 기존은 유지되고 죽은 컨슈머의 파티션들을 나머지 컨슈머가 균등하게 나눠갖게 됨.
4) 협력적 스티커 파티션 할당 전략
- 스티키 파티션 할당 전략과 유사하게 더 고도화된 방식임.
- 리밸런싱될 때 전체 컨슈머의 동작이 멈추는 것을 방지하기 위함.
- 동작 중인 컨슈머에는 영향을 주지 않고 안전하게 파티션의 소유권을 이동하기 위한 방법임.
- 이 방식으로 컨슈머의 확장 및 축소, 롤링 지시작 등의 작업 진행 시 컨슈머 리밸런싱에 대한 부담이 감소.
- 기존 리밸런싱 방식인 EAGER와 COOPERATIVE 프로토콜 방식의 성능 비교 결과
Incremental Cooperative Rebalancing in Apache Kafka
5) 정확히 한 번 컨슈머 동작
- 예제 코드
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
public class ExactlyOnceConsumer {
public static void main(String[] args) {
String bootstrapServers = "peter-kafka01.foo.bar:9092";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "peter-consumer-01");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 정확히 한번 전송을 위한 설정
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("peter-test05"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
- ISOLATION_LEVEL_CONFIG 설정만 추가하면 트랜잭션 컨슈머로 동작함.
- read_uncommitted : 기본값, 모든 메시지를 읽을 수 있음.
- read_committed : 트랜잭션이 완료된 메시지만 읽을 수 있음.
- 카프카 컨슈머의 동작까지 정확히 한 번 처리가 가능해지려면 ‘컨슘-메시지 처리- 프로듀싱’ 동작이 모두 하나의 트랜잭션으로 처리돼야 함.
이상으로 카프카 컨슈머에 대한 내용이었습니다.
좀더 자세한 설명을 원하시는 분은 실전 카프카 개발부터 운영까지 도서를 참고바립니다.
카프카 컨슈머 설정에 대한 내용이 궁금하신 분은 여기 링크를 참고바랍니다.
참고 : 실전 카프카 개발부터 운영까지
아래는 카프카 관련 국내 도서입니다.
“이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.”