스키마 레스지트리란 스키마를 등록하고 관리하는 카프카와 별도로 구성된 독립형 애플리케이션입니다. 비상업적 용도로 쓸 경우 무료로 사용 가능합니다. 스키마 레지스트리에 대해 알아보고 코드 레벨 단에서의 적용 방법을 소개해드리겠습니다.
목차
Toggle1. 카프카 스키마 레지스트리
1) 스키마 레지스트리 활용
- 사용 절차
- 프로듀서는 스키마 레지스트리에 스키마를 등록합니다.
- 스키마 레지스트리는 프로듀서에 의해 등록된 스키마 정보를 카프카의 내부 토픽에 저장합니다.
- 프로듀서는 스키마 레지스트리에 등록됙 스키마 ID와 메시지를 카프카로 전송합니다.
- 컨슈머는 스키마 ID를 스키마 레지스트리로부터 읽어옵니다. 프로듀서가 전송한 스미카 ID와 메시지를 조합해서 읽을 수 있습니다.
- 스키마 레지스트리를 이용하기 위해서는 스키마 레지스트리가 지원하는 데이터 포맷 중에 에이브로를 사용합니다.
2) 카프카 스키마 레지스트리의 에이브로 지원
- 에이브로는 시스템, 프로그래밍 언어, 프로세싱 프레임워크 사이에서 데이터 교환을 도와주는 오픈소스 직력화 시스템.
- 장점
- JSON과 매핑됩니다.
- 매우 간결한 데이터 포맷입니다.
- JSON은 메시지마다 필드 네임들이 포함되어 전송되므로 효율이 떨어진다. 에이브로는 값만 보내므로 데이타 사이즈가 작아진다.
- 에이브로는 바이너리 형태이므로 매우 빠릅니다.
- 데이터 필드마다 데이터 타입을 정의할 수 있고, doc를 이용해 각 필드의 의미를 데이터를 사용하고자 하는 사용자들에게 정확하게 전달할 수 있습니다.
3) 카프카 스키마 레지스트리 설치
- 카프카 클러스터 새로 설치
# hosts
[zkhosts]
peter-zk01.foo.bar
peter-zk02.foo.bar
peter-zk03.foo.bar
[kafkahosts]
peter-kafka01.foo.bar
peter-kafka02.foo.bar
peter-kafka03.foo.bar
[kerberoshosts]
peter-zk01.foo.bar
# kafka.yml
---
- hosts: kafkahosts
become: true
connection: ssh
roles:
- common
- kafka
- 설치
ansible-playbook -i hosts kafka.yml
- 잘 설치되지 않는다면 주키퍼(zookeeper.yml), 카프카(kafka.yml)를 순서대로 설치합니다. site.yml을 이용합니다.
# zoekepper.yml
---
- hosts: zkhosts
become: true
connection: ssh
roles:
- common
- zookeeper
# site.yml
---
- import_playbook: zookeeper.yml
- import_playbook: kafka.yml
sudo wget <http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz> -0 /opt/confluent-community-6.1.0.tar.gz
sudo tar zxf /opt/confluent-community-6.1.0.tar.gz =C usr/local
sudo ln -s /usr/local/confluent-6.1.0 /usr/local/confluent
- 스키마 레지스트리 설정
vi /usr/local/confluent/etc/schema-registry/schema-resistry.properties
- 카프카 스키마 레지스트리 옵션 설정
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
kafkastore.topic=_schemas
schema.compatibility.level=full
- 운영 환경에서는 로드밸러서 등을 이용해 이중화로 구성함.
- 카프카 스키마 레지스트리의 system 설정
vi /etc/systemd/system/schema-registry.service
[Unit]
Description=schema registry
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl start schema-registry
curl -X GET <http://pf-kafka03.sample.test:8081/config>
- 카프카 스키마 레지스트리 API
- GET /schemas
- 현재 스키마 레지스트리에 등록된 전체 스키마 리스트 조회
- GET /schemas/ids/id
- 스키마 ID로 조회
- GET /schemas/ids/id/versions
- 스키마 ID의 버전
- GET /subjects
- 스키마 레지스트리에 등록된 subject 리스트
- subject는 토픽이름-key, 토픽이름-value 형태로 쓰임
- GET /subjects/서브젝트 이름/versions
- 특정 서브젝트의 버전 리스트 조회
- GET /config
- 전역으로 설정된 레벨 조회
- GET /config/서브젝트 이름
- 서브젝트에 설정된 호환성 조회
- DELETE /subjects/서브젝트 이름
- 특정 서브젝트 전체 삭제
- DELETE /subjects/서브젝트 이름/versions/버전
- 특정 서브젝트에서 특정 버전만 삭제
- GET /schemas
2. 스키마 레지스트리 실습
1) 파이썬을 이용한 스키마 레지스트리 활용
sudo yum -y install python3
python3 -m venv venv10
source venv10/bin/activate
pip install confluent-kafka[avro]
https://github.com/confluentinc/confluent-kafka-python
- 에이브로 전송 프로듀서(python-avro-producer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema_str = """
{"namespace": "student.avro",
"type": "record",
"doc": "This is an example of Avro.",
"name": "Student",
"fields": [
{"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
{"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
]
}
"""
value_schema = avro.loads(value_schema_str)
value = {"name": "Peter", "class": 1} # 전송할 메시지
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
avroProducer = AvroProducer({
'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
'on_delivery': delivery_report,
'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'
}, default_value_schema=value_schema)
avroProducer.produce(topic='peter-avro2', value=value)
avroProducer.flush()
- 메시지 전송
python python-avro-producer.py
- 에이브로 컨슈머(python-avro-consumer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
value_schema_str = """
{"namespace": "student.avro",
"type": "record",
"doc": "This is an example of Avro.",
"name": "Student",
"fields": [
{"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
{"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
]
}
"""
value_schema = avro.loads(value_schema_str)
c = AvroConsumer({
'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
'group.id': 'python-groupid01',
'auto.offset.reset': 'earliest',
'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'},reader_value_schema=value_schema)
c.subscribe(['peter-avro2'])
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()
python python-avro-consumer.py
- 스키마 확인
curl <http://pf-kafka03.sample.test:8081/schemas> | python -m json.tool
- 메시지 형식이 바뀔 때마다 스키마를 변경하면 되므로 쉽게 메시지 형식 변경이 가능.
이상으로 카프카 스키마 레지스트리에 대한 설명을 마치겠습니다.
카프카 도입 사례에 대한 설명이 필요하신 분은 여기 링크를 참고바랍니다.
좀더 자세한 설명을 원하시는 분은 실전 카프카 개발부터 운영까지 도서를 참고바립니다.
참고 : 실전 카프카 개발부터 운영까지
아래는 카프카 관련 국내 도서입니다.
“이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.”