
스키마 레스지트리란 스키마를 등록하고 관리하는 카프카와 별도로 구성된 독립형 애플리케이션입니다. 비상업적 용도로 쓸 경우 무료로 사용 가능합니다. 스키마 레지스트리에 대해 알아보고 코드 레벨 단에서의 적용 방법을 소개해드리겠습니다.
목차
Toggle1. 카프카 스키마 레지스트리
1) 스키마 레지스트리 활용
- 사용 절차
- 프로듀서는 스키마 레지스트리에 스키마를 등록합니다.
- 스키마 레지스트리는 프로듀서에 의해 등록된 스키마 정보를 카프카의 내부 토픽에 저장합니다.
- 프로듀서는 스키마 레지스트리에 등록됙 스키마 ID와 메시지를 카프카로 전송합니다.
- 컨슈머는 스키마 ID를 스키마 레지스트리로부터 읽어옵니다. 프로듀서가 전송한 스미카 ID와 메시지를 조합해서 읽을 수 있습니다.
- 스키마 레지스트리를 이용하기 위해서는 스키마 레지스트리가 지원하는 데이터 포맷 중에 에이브로를 사용합니다.
2) 카프카 스키마 레지스트리의 에이브로 지원
- 에이브로는 시스템, 프로그래밍 언어, 프로세싱 프레임워크 사이에서 데이터 교환을 도와주는 오픈소스 직력화 시스템.
- 장점
- JSON과 매핑됩니다.
- 매우 간결한 데이터 포맷입니다.
- JSON은 메시지마다 필드 네임들이 포함되어 전송되므로 효율이 떨어진다. 에이브로는 값만 보내므로 데이타 사이즈가 작아진다.
- 에이브로는 바이너리 형태이므로 매우 빠릅니다.
- 데이터 필드마다 데이터 타입을 정의할 수 있고, doc를 이용해 각 필드의 의미를 데이터를 사용하고자 하는 사용자들에게 정확하게 전달할 수 있습니다.
3) 카프카 스키마 레지스트리 설치
- 카프카 클러스터 새로 설치
# hosts
[zkhosts]
peter-zk01.foo.bar
peter-zk02.foo.bar
peter-zk03.foo.bar
[kafkahosts]
peter-kafka01.foo.bar
peter-kafka02.foo.bar
peter-kafka03.foo.bar
[kerberoshosts]
peter-zk01.foo.bar
# kafka.yml
---
- hosts: kafkahosts
become: true
connection: ssh
roles:
- common
- kafka
- 설치
ansible-playbook -i hosts kafka.yml
- 잘 설치되지 않는다면 주키퍼(zookeeper.yml), 카프카(kafka.yml)를 순서대로 설치합니다. site.yml을 이용합니다.
# zoekepper.yml
---
- hosts: zkhosts
become: true
connection: ssh
roles:
- common
- zookeeper
# site.yml
---
- import_playbook: zookeeper.yml
- import_playbook: kafka.yml
sudo wget <http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz> -0 /opt/confluent-community-6.1.0.tar.gz
sudo tar zxf /opt/confluent-community-6.1.0.tar.gz =C usr/local
sudo ln -s /usr/local/confluent-6.1.0 /usr/local/confluent
- 스키마 레지스트리 설정
vi /usr/local/confluent/etc/schema-registry/schema-resistry.properties
- 카프카 스키마 레지스트리 옵션 설정
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
kafkastore.topic=_schemas
schema.compatibility.level=full
- 운영 환경에서는 로드밸러서 등을 이용해 이중화로 구성함.
- 카프카 스키마 레지스트리의 system 설정
vi /etc/systemd/system/schema-registry.service
[Unit]
Description=schema registry
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl start schema-registry
curl -X GET <http://pf-kafka03.sample.test:8081/config>
- 카프카 스키마 레지스트리 API
- GET /schemas
- 현재 스키마 레지스트리에 등록된 전체 스키마 리스트 조회
- 스키마 ID로 조회
- 스키마 ID의 버전
- 스키마 레지스트리에 등록된 subject 리스트subject는 토픽이름-key, 토픽이름-value 형태로 쓰임
- 특정 서브젝트의 버전 리스트 조회
- 전역으로 설정된 레벨 조회
- 서브젝트에 설정된 호환성 조회
- 특정 서브젝트 전체 삭제
- 특정 서브젝트에서 특정 버전만 삭제
- GET /schemas
2. 스키마 레지스트리 실습
1) 파이썬을 이용한 스키마 레지스트리 활용
sudo yum -y install python3
python3 -m venv venv10
source venv10/bin/activate
pip install confluent-kafka[avro]
- 에이브로 전송 프로듀서(python-avro-producer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema_str = """
{"namespace": "student.avro",
"type": "record",
"doc": "This is an example of Avro.",
"name": "Student",
"fields": [
{"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
{"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
]
}
"""
value_schema = avro.loads(value_schema_str)
value = {"name": "Peter", "class": 1} # 전송할 메시지
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
avroProducer = AvroProducer({
'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
'on_delivery': delivery_report,
'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'
}, default_value_schema=value_schema)
avroProducer.produce(topic='peter-avro2', value=value)
avroProducer.flush()
- 메시지 전송
python python-avro-producer.py
- 에이브로 컨슈머(python-avro-consumer.py)
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
value_schema_str = """
{"namespace": "student.avro",
"type": "record",
"doc": "This is an example of Avro.",
"name": "Student",
"fields": [
{"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
{"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
]
}
"""
value_schema = avro.loads(value_schema_str)
c = AvroConsumer({
'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
'group.id': 'python-groupid01',
'auto.offset.reset': 'earliest',
'schema.registry.url': '<http://peter-kafka03.foo.bar:8081>'},reader_value_schema=value_schema)
c.subscribe(['peter-avro2'])
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()