카프카 스키마 레지스트리

카프카 스키마 레지스트리

스키마 레스지트리란 스키마를 등록하고 관리하는 카프카와 별도로 구성된 독립형 애플리케이션입니다. 비상업적 용도로 쓸 경우 무료로 사용 가능합니다. 스키마 레지스트리에 대해 알아보고 코드 레벨 단에서의 적용 방법을 소개해드리겠습니다.

1. 카프카 스키마 레지스트리

1) 스키마 레지스트리 활용

  • 사용 절차
    • 프로듀서는 스키마 레지스트리에 스키마를 등록합니다.
    • 스키마 레지스트리는 프로듀서에 의해 등록된 스키마 정보를 카프카의 내부 토픽에 저장합니다.
    • 프로듀서는 스키마 레지스트리에 등록됙 스키마 ID와 메시지를 카프카로 전송합니다.
    • 컨슈머는 스키마 ID를 스키마 레지스트리로부터 읽어옵니다. 프로듀서가 전송한 스미카 ID와 메시지를 조합해서 읽을 수 있습니다.
    • 스키마 레지스트리를 이용하기 위해서는 스키마 레지스트리가 지원하는 데이터 포맷 중에 에이브로를 사용합니다.

2) 카프카 스키마 레지스트리의 에이브로 지원

  • 에이브로는 시스템, 프로그래밍 언어, 프로세싱 프레임워크 사이에서 데이터 교환을 도와주는 오픈소스 직력화 시스템.
  • 장점
    • JSON과 매핑됩니다.
    • 매우 간결한 데이터 포맷입니다.
    • JSON은 메시지마다 필드 네임들이 포함되어 전송되므로 효율이 떨어진다. 에이브로는 값만 보내므로 데이타 사이즈가 작아진다.
    • 에이브로는 바이너리 형태이므로 매우 빠릅니다.
    • 데이터 필드마다 데이터 타입을 정의할 수 있고, doc를 이용해 각 필드의 의미를 데이터를 사용하고자 하는 사용자들에게 정확하게 전달할 수 있습니다.
    Specification

3) 카프카 스키마 레지스트리 설치

  • 카프카 클러스터 새로 설치
# hosts
[zkhosts]
peter-zk01.foo.bar
peter-zk02.foo.bar
peter-zk03.foo.bar

[kafkahosts]
peter-kafka01.foo.bar
peter-kafka02.foo.bar
peter-kafka03.foo.bar

[kerberoshosts]
peter-zk01.foo.bar
# kafka.yml

---
- hosts: kafkahosts
  become: true
  connection: ssh
  roles:
    - common
    - kafka
  • 설치
ansible-playbook -i hosts kafka.yml
  • 잘 설치되지 않는다면 주키퍼(zookeeper.yml), 카프카(kafka.yml)를 순서대로 설치합니다. site.yml을 이용합니다.
# zoekepper.yml
---
- hosts: zkhosts
  become: true
  connection: ssh
  roles:
    - common
    - zookeeper
# site.yml
---
- import_playbook: zookeeper.yml
- import_playbook: kafka.yml

site.yml

zookeeper.yml

kafka.yml

hosts.txt

sudo wget <http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz> -0 /opt/confluent-community-6.1.0.tar.gz
sudo tar zxf /opt/confluent-community-6.1.0.tar.gz =C usr/local
sudo ln -s /usr/local/confluent-6.1.0 /usr/local/confluent
  • 스키마 레지스트리 설정
vi /usr/local/confluent/etc/schema-registry/schema-resistry.properties
  • 카프카 스키마 레지스트리 옵션 설정
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
kafkastore.topic=_schemas
schema.compatibility.level=full
  • 운영 환경에서는 로드밸러서 등을 이용해 이중화로 구성함.
  • 카프카 스키마 레지스트리의 system 설정
vi /etc/systemd/system/schema-registry.service
[Unit]
Description=schema registry
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties
Restart=always

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl start schema-registry

curl -X GET <http://pf-kafka03.sample.test:8081/config>
  • 카프카 스키마 레지스트리 API
    • GET /schemas
      • 현재 스키마 레지스트리에 등록된 전체 스키마 리스트 조회
    • GET /schemas/ids/id
      • 스키마 ID로 조회
    • GET /schemas/ids/id/versions
      • 스키마 ID의 버전
    • GET /subjects
      • 스키마 레지스트리에 등록된 subject 리스트
      • subject는 토픽이름-key, 토픽이름-value 형태로 쓰임
    • GET /subjects/서브젝트 이름/versions
      • 특정 서브젝트의 버전 리스트 조회
    • GET /config
      • 전역으로 설정된 레벨 조회
    • GET /config/서브젝트 이름
      • 서브젝트에 설정된 호환성 조회
    • DELETE /subjects/서브젝트 이름
      • 특정 서브젝트 전체 삭제
    • DELETE /subjects/서브젝트 이름/versions/버전
      • 특정 서브젝트에서 특정 버전만 삭제
    Schema Registry Overview | Confluent Documentation

2. 스키마 레지스트리 실습

1) 파이썬을 이용한 스키마 레지스트리 활용

sudo yum -y install python3
python3 -m venv venv10
source venv10/bin/activate
pip install confluent-kafka[avro]

https://github.com/confluentinc/confluent-kafka-python

  • 에이브로 전송 프로듀서(python-avro-producer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{"namespace": "student.avro",
 "type": "record",
 "doc": "This is an example of Avro.",
 "name": "Student",
 "fields": [
     {"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
     {"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
 ]
}
"""

value_schema = avro.loads(value_schema_str)
value = {"name": "Peter", "class": 1} # 전송할 메시지

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

avroProducer = AvroProducer({
    'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
    'on_delivery': delivery_report,
    'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'
    }, default_value_schema=value_schema)

avroProducer.produce(topic='peter-avro2', value=value)
avroProducer.flush()
  • 메시지 전송
python python-avro-producer.py
  • 에이브로 컨슈머(python-avro-consumer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

value_schema_str = """
{"namespace": "student.avro",
 "type": "record",
 "doc": "This is an example of Avro.",
 "name": "Student",
 "fields": [
     {"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
     {"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
 ]
}
"""

value_schema = avro.loads(value_schema_str)

c = AvroConsumer({
    'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
    'group.id': 'python-groupid01',
    'auto.offset.reset': 'earliest',
    'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'},reader_value_schema=value_schema)

c.subscribe(['peter-avro2'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()
python python-avro-consumer.py
  • 스키마 확인
curl <http://pf-kafka03.sample.test:8081/schemas> | python -m json.tool
  • 메시지 형식이 바뀔 때마다 스키마를 변경하면 되므로 쉽게 메시지 형식 변경이 가능.

이상으로 카프카 스키마 레지스트리에 대한 설명을 마치겠습니다.

카프카 도입 사례에 대한 설명이 필요하신 분은 여기 링크를 참고바랍니다.

좀더 자세한 설명을 원하시는 분은 실전 카프카 개발부터 운영까지 도서를 참고바립니다.


참고 : 실전 카프카 개발부터 운영까지

아래는 카프카 관련 국내 도서입니다.

실전 카프카 개발부터 운영까지:데이터플랫폼의 중추 아파치 카프카의 내부동작과 개발 운영 보안의 모든것, 책만 아파치 카프카의 모든 것 세트 : 카프카 데이터 플랫폼의 최강자+실전 카프카 개발부터 운영까지, 책만 (서점추천) 헤드 퍼스트 디자인 패턴 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 24단계 실습으로 정복하는 쿠버네티스 + 실전 카프카 개발부터 운영까지 (전2권), 위키북스 (서점추천) 데이터 파이프라인 핵심 가이드 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 몽고DB 완벽 가이드 + 실전 카프카 개발부터 운영까지 (전2권) (서점추천) 클린 코드의 기술 + 실전 카프카 개발부터 운영까지 (전2권), 영진닷컴 (서점추천) 카프카 핵심 가이드 + 디지털 플랫폼 전략 수립을 위한 쿠버네티스 실전 활용서 (전2권), 제이펍 아파치 카프카 애플리케이션 프로그래밍 with 자바:카프카의 개념부터 스트림즈 커넥트 스프링 카프카까지, 비제이퍼블릭 카프카 핵심 가이드 개정증보판, 제이펍 실전 아파치 카프카:애플리케이션 개발부터 파이프라인 사물인터넷 데이터 허브 구축까지, 한빛미디어 카프카 데이터 플랫폼의 최강자:실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지, 책만 카프카 스트림즈와 ksqlDB 정복:실시간 데이터 처리, 에이콘출판 카프카 핵심가이드:실시간 데이터와 스트림 프로세싱, 제이펍

“이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.”

Back to top