카프카 프로듀서 구성에 대해 알아보도록 하겠습니다. 파티셔너 역할, 프로듀서의 배치 전송 기능, 프로듀서의 신뢰성있는 전송 방식에 대해 소개하도록 하겠습니다.
목차
Toggle1. 파티셔너
- 카프카 프로듀서가 토픽으로 메세지를 보낼 때 어떤 파티션으로 보내야 할 지 결정하는 역할을 담담.
- 메시지의 키를 해시처리해서 파티션을 결정함. 키가 동일하면 같은 파티션에 할당됨.
1) 라운드 로빈 전략
- 전송시 키값을 지정하지 않으면 키값이 null이 되고 기본값인 라운드 로빈(round-robin)알고리즘을 사용해 카프카 프로듀서는 목적지 토픽의 파티션들로 레코드를 랜덤 전송함.
- 배치 전송에 지연을 발생시킬 수 있음. 동일한 키값이 3개(설정 변경 가능) 이상 모였을 때 배치 전송되기 때문에 3개가 모이지 않으면 전송하지 않고 대기하게 됨.
2) 스티키 파티셔닝 전략
- 하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송하는 전략.
- 키값에 상관없이 전송하기 위한 배치수를 채우는 방식.
- 라운드 로빈 전략에 비해 약 30% 지연시간이 감소, 프로듀서의 CPU 사용률도 줄어드는 효과.
- 전송 메시지의 순서가 중요하지 않다면 이 방식을 사용할 것을 추천합니다.
Apache Kafka Producer Improvements: Sticky Partitioner
2. 카프카 프로듀서의 배치 전송 기능
- 토픽의 처리량 증가 방법 : 토픽을 파티션으로 나눠 처리, 배치 전송.
- 배치 전송 옵션
- buffer.memory : 카프카로 메세지를 전송하기 위해 담아두는 프로듀서 전체 메모리 크기 옵션. 파티션이 3개라면 이 3개의 메모리 크기를 의미함. 기본값 32MB, 조정 가능.
- batch.size : 전송을 위한 배치 크기 옵션. 하나의 파티션 안에 배치 크기를 의미함.기본값 16KB로 설정. 조정 가능. buffer.memory > batch.size
- linger.ms : 배치 전송을 위해 메모리에서 대기하는 메시지들의 최대 대기시간. 기본값 0ms. 0이면 배치 전송을 기다리지 않고 바로 전송됨.
- 파티션이 3개라면 batch.size(16KB) * 3 이므로 buffer.memory를 48KB 보다 큰 값으로 설정한다.
- 배치 전송은 I/O를 줄일 수 있어 매유 효율적이고, 카프카의 요청 수를 줄여주는 효과가 있음.
- 대용량 처리를 위해서는 배치 설정, 지연 없는 처리를 위해서는 배치 설정 제거.
- 압축 기능을 같이 사용
- gzip, snappy. lz4, zstd 등 압축 포맷 지원
- 압축률 선호 시 gzip, zstd
- 낮은 지연 시간 선호 시 lz4, snappy
3. 중복 없는 전송
- 멱등성이란 동일한 작업을 여러 번 수행하더라도 결과가 달라지지 않는 것을 의미.
- 자세한 내용은 아래 링크를 참조.
- 메시지 전송 방식
- 적어도 한 번 전송 at-least-once
- 네트워크 회선이나 기타 장애 상황 시 일부 메시지 중복이 발생할 수 있지만 최소한 하나의 메시지는 반드시 보장된다라는 법칙. 카프카의 기본 전송 방식
- 최대 한 번 전송 at-most-once
- ACK를 받지 못하더라도 재전송하지 않음. 일부 메시지 손실은 발생하지만 중복을 피할 수 있음.
- 높은 처리량을 필요로 하는 대량의 로그 수집이나 IoT 같은 환경에서 사용
- 정확히 한 번 전송 exactly-once
- PID(카프카 프로듀서 고유번호)와 메시지 번호로 중복 없는 전송이 가능함.
- 메시지에 PID와 메시지 번호를 추가하여 전송.
- 이 값들은 프로듀서에서 자동 생성.
- 오버헤드 발생으로 기존 대비 20% 성능 감소함.
- 관련 내용은 아래를 참고
- 중복없는 전송을 위한 프로듀서 설정
- enable.idempotence
- 값 : true
- 프로듀서가 중복없는 전송을 허용할 지 결정하는 옵션. 기본값은 false.
- true 설정 시 아래 옵셥은 반드시 변경해야 함. 그렇지 않으면 ConfigException 발성
- max.in.flight.requests.per.connection
- 값 : 1~5
- ACK를 받지 않은 상태에서 하나의 커넥션에 보낼 수 있는 최대 요청 수
- 기본값은 5, 5이하로 설정해야 함.
- acks
- 값 : all
- 카프카 프로듀서 acks와 관련된 옵션으로서, 기본값은 1이며 all로 설정해야 함.
- retries
- 값 : 5
- ACK를 받지 못한 경우 재시도를 해야 함. 0보다 큰 값으로 설정해야 함.
- enable.idempotence
- 적어도 한 번 전송 at-least-once
- 중복없는 전송을 위한 카프카 프로듀서 설정 파일 셋팅 방법
#producer.config 파일 내용
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=5
acks=all
- 콘솔 프로듀서를 이용해 중복없는 토픽 전송
kafka-console-producer.sh --bootstrap-server pf-kafka01.sample.test:9092 --topic test04 \\\\
producer.config /home/ec2-user/producer.config
- snapshot 확인 : 브로커가 PID와 시컨스 번호를 주기적으로 저장하는 파일
cd /data/kafka-logs/test02-@/
ls
# 파일 존재하지 않으면 카프카 클러스터의 다른 브로커로 ssh 접속한 후 동일한 경로에서 확인함.
# 그래도 존재하지 않으면 test04 토픽의 파티션 리더 브로커를 강제 종료
kafka-dump-log.sh --print-data-log --files /data/kafka-logs/test04-@/000000001.snapshot
4. 정확히 한 번 전송
- 중복 없는 전송 방식이 정확히 한 번 전송한다는 의미는 아님.
- 이건 트랙젹션과 같은 전체적인 프로세스 처리를 의미함.
- 카프카에서는 트랙잭션 API로 구현함.
- 트랙잭션에 대한 설명은 아래를 참고바랍니다.
1) 디자인
- 카프카 프로듀서가 정확히 한 번 방식으로 메시지를 전송할 때 원자적으로(atomic) 처리됨.
- atomic : 트랙잭션의 성질 중 하나로 전체 실행 또는 전체 실패
- 트랙잭션 코디네이터(transaction coordinator)가 서버 측에 존재함.
- 프로듀서에 의해 전송된 메시지를 관리하며 커밋 또는 중단 등을 표시.
- 트랜잭션도 트랜잭션 로그를 카프카의 내부 토픽인 __transaction_state에 저정함.
- __transaction_state 설정값
- tranaction.state.log.num.partition=50
- tranaction.state.log.replication.factor=3
2) 정확히 한 번 전송 Java 코드 예제
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ExactlyOnceProducer {
public static void main(String[] args) {
String bootstrapServers = "peter-kafka01.foo.bar:9092";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.RETRIES_CONFIG, "5"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "peter-transaction-01"); // 정확히 한번 전송을 위한 설정
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 프로듀서 트랜잭션 초기화
producer.beginTransaction(); // 프로듀서 트랜잭션 시작
try {
for (int i = 0; i < 1; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("peter-test05", "Apache Kafka is a distributed streaming platform - " + i);
producer.send(record);
producer.flush();
System.out.println("Message sent successfully");
}
} catch (Exception e){
producer.abortTransaction(); // 프로듀서 트랜잭션 중단
e.printStackTrace();
} finally {
producer.commitTransaction(); // 프로듀서 트랜잭션 커밋
producer.close();
}
}
}
카프카 프로듀서의 설정 관련 정보가 필요하신 분은 여기 링크를 참고부탁드립니다.
좀더 자세한 설명을 원하시는 분은 실전 카프카 개발부터 운영까지 도서를 참고바립니다.
참고 : 실전 카프카 개발부터 운영까지
아래는 카프카 관련 국내 도서입니다.
“이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.”