카프카 스키마 레지스트리

카프카 스키마 레지스트리

스키마 레스지트리란 스키마를 등록하고 관리하는 카프카와 별도로 구성된 독립형 애플리케이션입니다. 비상업적 용도로 쓸 경우 무료로 사용 가능합니다. 스키마 레지스트리에 대해 알아보고 코드 레벨 단에서의 적용 방법을 소개해드리겠습니다.

1. 카프카 스키마 레지스트리

1) 스키마 레지스트리 활용

  • 사용 절차
    • 프로듀서는 스키마 레지스트리에 스키마를 등록합니다.
    • 스키마 레지스트리는 프로듀서에 의해 등록된 스키마 정보를 카프카의 내부 토픽에 저장합니다.
    • 프로듀서는 스키마 레지스트리에 등록됙 스키마 ID와 메시지를 카프카로 전송합니다.
    • 컨슈머는 스키마 ID를 스키마 레지스트리로부터 읽어옵니다. 프로듀서가 전송한 스미카 ID와 메시지를 조합해서 읽을 수 있습니다.
    • 스키마 레지스트리를 이용하기 위해서는 스키마 레지스트리가 지원하는 데이터 포맷 중에 에이브로를 사용합니다.

2) 카프카 스키마 레지스트리의 에이브로 지원

  • 에이브로는 시스템, 프로그래밍 언어, 프로세싱 프레임워크 사이에서 데이터 교환을 도와주는 오픈소스 직력화 시스템.
  • 장점
    • JSON과 매핑됩니다.
    • 매우 간결한 데이터 포맷입니다.
    • JSON은 메시지마다 필드 네임들이 포함되어 전송되므로 효율이 떨어진다. 에이브로는 값만 보내므로 데이타 사이즈가 작아진다.
    • 에이브로는 바이너리 형태이므로 매우 빠릅니다.
    • 데이터 필드마다 데이터 타입을 정의할 수 있고, doc를 이용해 각 필드의 의미를 데이터를 사용하고자 하는 사용자들에게 정확하게 전달할 수 있습니다.
    Specification

3) 카프카 스키마 레지스트리 설치

  • 카프카 클러스터 새로 설치
# hosts
[zkhosts]
peter-zk01.foo.bar
peter-zk02.foo.bar
peter-zk03.foo.bar

[kafkahosts]
peter-kafka01.foo.bar
peter-kafka02.foo.bar
peter-kafka03.foo.bar

[kerberoshosts]
peter-zk01.foo.bar
# kafka.yml

---
- hosts: kafkahosts
  become: true
  connection: ssh
  roles:
    - common
    - kafka
  • 설치
ansible-playbook -i hosts kafka.yml
  • 잘 설치되지 않는다면 주키퍼(zookeeper.yml), 카프카(kafka.yml)를 순서대로 설치합니다. site.yml을 이용합니다.
# zoekepper.yml
---
- hosts: zkhosts
  become: true
  connection: ssh
  roles:
    - common
    - zookeeper
# site.yml
---
- import_playbook: zookeeper.yml
- import_playbook: kafka.yml

sudo wget <http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz> -0 /opt/confluent-community-6.1.0.tar.gz
sudo tar zxf /opt/confluent-community-6.1.0.tar.gz =C usr/local
sudo ln -s /usr/local/confluent-6.1.0 /usr/local/confluent
  • 스키마 레지스트리 설정
vi /usr/local/confluent/etc/schema-registry/schema-resistry.properties
  • 카프카 스키마 레지스트리 옵션 설정
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
kafkastore.topic=_schemas
schema.compatibility.level=full
  • 운영 환경에서는 로드밸러서 등을 이용해 이중화로 구성함.
  • 카프카 스키마 레지스트리의 system 설정
vi /etc/systemd/system/schema-registry.service
[Unit]
Description=schema registry
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties
Restart=always

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl start schema-registry

curl -X GET <http://pf-kafka03.sample.test:8081/config>
  • 카프카 스키마 레지스트리 API
    • GET /schemas
      • 현재 스키마 레지스트리에 등록된 전체 스키마 리스트 조회
      GET /schemas/ids/id
      • 스키마 ID로 조회
      GET /schemas/ids/id/versions
      • 스키마 ID의 버전
      GET /subjects
      • 스키마 레지스트리에 등록된 subject 리스트subject는 토픽이름-key, 토픽이름-value 형태로 쓰임
      GET /subjects/서브젝트 이름/versions
      • 특정 서브젝트의 버전 리스트 조회
      GET /config
      • 전역으로 설정된 레벨 조회
      GET /config/서브젝트 이름
      • 서브젝트에 설정된 호환성 조회
      DELETE /subjects/서브젝트 이름
      • 특정 서브젝트 전체 삭제
      DELETE /subjects/서브젝트 이름/versions/버전
      • 특정 서브젝트에서 특정 버전만 삭제

2. 스키마 레지스트리 실습

1) 파이썬을 이용한 스키마 레지스트리 활용

sudo yum -y install python3
python3 -m venv venv10
source venv10/bin/activate
pip install confluent-kafka[avro]

  • 에이브로 전송 프로듀서(python-avro-producer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{"namespace": "student.avro",
 "type": "record",
 "doc": "This is an example of Avro.",
 "name": "Student",
 "fields": [
     {"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
     {"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
 ]
}
"""

value_schema = avro.loads(value_schema_str)
value = {"name": "Peter", "class": 1} # 전송할 메시지

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

avroProducer = AvroProducer({
    'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
    'on_delivery': delivery_report,
    'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'
    }, default_value_schema=value_schema)

avroProducer.produce(topic='peter-avro2', value=value)
avroProducer.flush()
  • 메시지 전송
python python-avro-producer.py
  • 에이브로 컨슈머(python-avro-consumer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

value_schema_str = """
{"namespace": "student.avro",
 "type": "record",
 "doc": "This is an example of Avro.",
 "name": "Student",
 "fields": [
     {"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
     {"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
 ]
}
"""

value_schema = avro.loads(value_schema_str)

c = AvroConsumer({
    'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
    'group.id': 'python-groupid01',
    'auto.offset.reset': 'earliest',
    'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'},reader_value_schema=value_schema)

c.subscribe(['peter-avro2'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

Back to top