카프카 커넥트 활용법

카프카 커넥트 활용법

카프카 커넥트 활용법에 대해 알아보도록 하겠습니다. 카프카 커넥트는 데이터 베이스 같은 외부 시스템과 카프카를 연결하기 위한 프레임워크입니다. 코딩을 작성하지 않고도 대용량의 데이터를 카프카의 안팎으로 손쉽게 이동시킬 수 있음.

카프카 커넥트 장점

  • 데이터 중심 파이프라인 : 커넥트를 이용해 카프카로 데이터를 보내거나, 카프카로부터 데이터를 가져옵니다.
  • 유연성과 확장성
    • 커넥트는 테스트 및 일회성 작업을 위한 단독 모드 (standaione mode) 실행
    • 대규모 운영 환경을 위한 분산 모드(distributed mode)(클러스터형) 실행
  • 재사용성과 기능 확장
    • 기존 커넥트들을 활용 가능하고 운영 환경에서의 요구사항에 맞춰 빠르게 확장 가능
    • 손쉬운 확장으로 운영 오버헤드를 낮출 수 있음.
  • 장애 및 복구
    • 카프카 커넥트를 부산 모드로 실행하면 워커 노드(worker node)의 장애 상황에도 유연하게 대응 가능함.

1. 카프카 커넥트의 핵심 개념

  • 일반적인 구성
    • 소스 → 소스 커넥트 → 카프카 → 싱크 커넥트 → 싱크
    • 소스 커넥트 : 프로듀서의 역할.
    • 싱크 커넥트 : 컨슈머의 역할.
  • 워커
    • 분산 모드에서 커넥트는 다수의 워커로 구성할 수 있음.
    • 워커는 카프카 커넥트 프로세스가 실행되는 서버 또는 인스턴스 등을 의미함.
    • 커넥터나 태스크들이 워커에서 실행됨.
  • 커넥트
    • 직접 데이터를 복사하지 않고, 데이터를 어디에서 어디로 복사해야 하는지 작업을 정의하고 관리하는 역할
    • JDBC 소스 커넥터 : 관계형 데이터 베이스 → 카프카
    • HDFS 싱크 커넥터 : 카프카 → HDFS
  • 태스크
    • 커넥터가 정의한 작업을 직접 수행하는 역할.
    • 커넥터는 데이터 전송에 관한 작업을 정의한 후 각 태스크들을 워커에 분산함. 그 후 태스크들은 커넥터가 정의한 작업대로 데이터를 복사함.
    • 소스 태스크와 싱크 태스크로 분류됨.

2. 카프카 커넥트의 내부 동작

  • 카프카 커넥트의 내부 동작은 관련 공식 문서를 참고하시기 바랍니다.

Guide for Kafka Connector Developers | Confluent Documentation

3. 단독 모드 카프카 커넥트

  • 새로운 클러스터 구성
ansible-playbook -i hosts kafka1.yml
  • 각 ansible, zookeeper, kafka 서버는 2장 참고

1) 파일 소스 커넥터 실행

  • test.txt → 파일 소스 커넥터 → 카프카 → 파일 싱크 커넥터 → test-link.txt
  • text.txt 파일 생성
echo "hello-1" > test.txt
echo "hello-2" >> test.txt
echo "hello-3" >> test.txt
cat test.txt
  • 소스 커넥터 설정
sudo vi /usr/local/kafka/config/connect-file-source.properties
name=local-file-source # 커낵터 식별 이름
connector.class=FileStreamSource # 커넥터에서 사용하는 클래서
tasks.max=1 # 실제 작업을 처리하는 태스크의 최대 수 1로 지정
file=/home/ec2-user/test.txt # 파일 소스 커넥터가 읽을 파일
topic=connect-test # 파일 소스 커넥터가 읽은 내용을 카프카의 connect-test 토픽으로 전송
  • 단독 모드 카프카 커넥트 설정
sudo vi /usr/local/kafka/config/connect-standalone.properties
bootstrap.servers=localhost:9092  #브로커 주소 localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter # 송수신 데이타 포맷
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false # 스키마 포함 구조 사용 여부
value.converter.schemas.enable=false 
offset.storage.file.filename=/tmp/connect.offsets # 재처리를 위한 오프셋 파일 저장 위치
offset.flush.interval.ms=10000  # 오프셋 플러수 주기 설정
  • 단독 모드 커넥트 실행
# 실행
sudo /usr/local/kafka/bin/connect-standalone.sh -daemon /usr/local/kafka/config/connect-standalone.properties /usr/local/kafka/config/connect-file-source.properties

# 카프카 커넥트 상태 확인
curl <http://localhost:8083/connectors/local-file-source> | python -m json.tool

2) 파일 싱크 커넥터 실행

  • 카프카 커넥트의 REST API를 이용한 실행
curl --header "Content-Type: application/json" --header "Accept: application/json" --request PUT --data '{
 "name":"local-file-sink", #커낵터 식별 이름
 "connector.class":"FileStreamSink", # 커낵터 사용 클래스
 "tasks.max":"1", # 실제 작업을 처리하는 태스크의 최대 수
 "file":"/home/ec2-user/test.sink.txt", # 파일 싱크 커낵터가 카프카의 connect-test에서 가져온 메시지를 이 경로에 저장
 "topics":"connect-test" # 파일 싱크 커넥터가 카프카의 connect-test에서 메시지를 가져옴.
}' <http://localhost:8083/connectors/local-file-sink/config>
# 파일 싱크 커넥터 실행 확인
curl <http://localhost:8083/connectors/local-file-sink> | python -m json.tool
# 실행 결과 확인
cat test.sink.txt
  • 커넥터 종료
sudo pkill -f connect
  • 카프카 커넥트 REST API
API 옵션설명
GET /커넥트의 버전과 클러스터 ID 확인
GET /connectors커넥터 리스트 확인
GET /connectors/커넥터 이름커넥터 이름의 상세 내용 확인
GET /connectors/커넥터 이름/config커넥터 이름의 config 정보 확인
GET /connectors/커넥터 이름/status커넥터 이름의 상태 확인
PUT /connectors/커넥터 이름/config커넥터 config 설정
PUT /connectors/커넥터 이름/pause커넥터의 일시 정지
PUT /connectors/커넥터 이름/resume커넥터의 다시 시작
DELETE /connectors/커넥터 이름커넥터의 삭제
GET /connectors/커넥터 이름/tasks커넥터의 태스크 정보 확인
GET /connectors/커넥터 이름/task/태스크 ID/status커넥터에서 특정 태스크의 상태 확인
POST /connectors/커넥터 이름/task/태스크 ID/restart커넥터에서 특정 태스크 재시작

4. 분산 모드 카프카 커넥트

  • 안정적인 운영을 위해 리플리케이션 팩터 수를 3으로 설정.
  • 커넥터 종류

Home

sudo cat /usr/local/kafka/config/connect-distributed.properties
bootstrap.servers=peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
group.id=peter-connect-cluster # 분산 모드의 그룹 아이디
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets # 커넥터들의 오프셋 추적을 위해 저장하는 카프카 내부 토픽 
offset.storage.replication.factor=3 # 리플리케이션 수 3
offset.storage.partitions=25 # 파티션 수 25
config.storage.topic=connect-configs # 커넥터들의 설정을 저장하는 카프카 내부 토픽
config.storage.replication.factor=3 # 리플리케이션 수 3
config.storage.partitions=1 # 파티션 수 1
status.storage.topic=connect-status # 커넥터들의 상태를 저장하는 카프카 내부 토픽
status.storage.replication.factor=3 # 리플리케니션 수 3
status.storage.partitions=5 # 파티션 수 5
offset.flush.interval.ms=10000
sudo systemctl start kafka-connect
sudo systemctl status kafka-connect

5.커넥터 기반의 미러 메이커 2.0

  • 원격 토픽과 에일리어스 기능
    • 에일리어스 : 양방향 미러 시 무한 반복으로 꼬이는 문제를 방지

KIP-382: MirrorMaker 2.0 – Apache Kafka – Apache Software Foundation

  • 카프카 클러스터 통합
    • 다수의 클러스터에 있는 토픽들을 다운스트림 컨슈머가 통합 가능.
  • 무한 루프 방지
    • 에일리어스 기능으로 무한 루프 방지
  • 토픽 설정 동기화
    • 소스 토픽에서의 설정 변경을 전파함.
  • 안전한 장소로 내부 토픽 활용
  • 카프카 커넥트 지원

KIP-382: MirrorMaker 2.0 – Apache Kafka – Apache Software Foundation

  • 미러 메이커 2.0 실행 방법
    • 전용 dedicated 미러 메이커 클러스터
    • 분산 커넥트 클러스터에서 미러 메이커 커넥터 이용
    • 독립형인 커넥트 워커
    • 레거시 방식인 스크립트 사용
  • 미러 클러스터 설치
ansible-playbook -i hosts kafka2.yml
#kafka2.yml
---
- hosts: zkhosts
  become: true
  connection: ssh
  vars:
    - brokerid: "{{ inventory_hostname | regex_search('(peter-zk0[1-9]\\\\.foo\\\\.bar)') | regex_replace('\\\\.foo\\\\.bar', '') | regex_replace('^peter-zk0', '') }}"
    - zookeeperinfo: peter-zk01.foo.bar:2181,peter-zk02.foo.bar:2181,peter-zk03.foo.bar:2181/kafka2
    - dir_path: /data/kafka2-logs
  roles:
    - common
    - kafka
#토픽 생성
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-zk01.foo.bar:9092 --create --topic peter-mirror01 --partitions 1 --replication-factor 3

#메세지 전송
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server peter-zk01.foo.bar:9092 --topic peter-mirror01

#미러 메이커 2.0 실행
curl --header "Content-Type: application/json" --header "Accept: application/json" --request PUT --data '{"name": "peter-mirrormaker2","connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector","tasks.max": "1","source.cluster.alias": "src","target.cluster.alias": "dst","source.cluster.bootstrap.servers": "peter-zk01.foo.bar:9092,peter-zk02.foo.bar:9092,peter-zk03.foo.bar:9092","target.cluster.bootstrap.servers": "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092","replication.factor": "3","topics": ".*" }' <http://peter-kafka01.foo.bar:8083/connectors/peter-mirrormaker2/config>

#REST API 이용해 확인
curl <http://peter-kafka01.foo.bar:8083/connectors/peter-mirrormaker2/status> | python -m json.tool

#토픽 리스트 확인
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --list

#메세지 미러링 확인 - 콘솔 컨슈머 이용
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic src.peter-mirror01 --from-beginning

# 커넥트 종료
sudo systemctl stop kafka-connect

# 커넥터 상태 확인
curl <http://peter-kafka02.foo.bar:8083/connectors/peter-mirrormaker2/status> | python -m json.tool

# 상태 확인 
curl <http://peter-kafka02.foo.bar:8083/connectors/peter-mirrormaker2/status> | python -m json.tool

# 종료
sudo systemctl stop kafka-connect
sudo systemctl stop kafka-server
  • 데비지움 debezium에서 MySQL, 몽고DB, PostgreSQL 등 데이터베이스 데이터와 카프카를 연동하기 위한 커넥트 공개하였으니 해당 라이브러리를 활용하셔도 됩니다.

카프카 도입 사례에 대한 설명이 필요하신 분은 여기 링크를 참고바랍니다.

좀더 자세한 설명을 원하시는 분은 실전 카프카 개발부터 운영까지 도서를 참고바립니다.


참고 : 실전 카프카 개발부터 운영까지

아래는 카프카 관련 국내 도서입니다.

실전 카프카 개발부터 운영까지:데이터플랫폼의 중추 아파치 카프카의 내부동작과 개발 운영 보안의 모든것, 책만 아파치 카프카의 모든 것 세트 : 카프카 데이터 플랫폼의 최강자+실전 카프카 개발부터 운영까지, 책만 (서점추천) 헤드 퍼스트 디자인 패턴 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 24단계 실습으로 정복하는 쿠버네티스 + 실전 카프카 개발부터 운영까지 (전2권), 위키북스 (서점추천) 데이터 파이프라인 핵심 가이드 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 몽고DB 완벽 가이드 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 클린 코드의 기술 + 실전 카프카 개발부터 운영까지 (전2권), 영진닷컴 (서점추천) 카프카 핵심 가이드 + 디지털 플랫폼 전략 수립을 위한 쿠버네티스 실전 활용서 (전2권), 제이펍 아파치 카프카 애플리케이션 프로그래밍 with 자바:카프카의 개념부터 스트림즈 커넥트 스프링 카프카까지, 비제이퍼블릭 카프카 핵심 가이드 개정증보판, 제이펍 실전 아파치 카프카:애플리케이션 개발부터 파이프라인 사물인터넷 데이터 허브 구축까지, 한빛미디어 카프카 데이터 플랫폼의 최강자:실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지, 책만 카프카 스트림즈와 ksqlDB 정복:실시간 데이터 처리, 에이콘출판 카프카 핵심가이드:실시간 데이터와 스트림 프로세싱, 제이펍

“이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.”

Back to top