파이썬에서는 kafka-python이라는 라이브러리를 통해 카프카를 활용할 수 있다.
kafka-python 라이브러리를 설치 명령어
pip install kafka-python
다음은 kafka producer 예제이다.
producer.send() 함수를 사용하여 토픽에 메시지를 전송할 수 있다.
pythonCopy code
from kafka import KafkaProducer
# 카프카 서버 정보 설정
bootstrap_servers = ['localhost:9092']
# 프로듀서 생성
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# 메시지 전송
producer.send('my_topic', b'message from python producer')
다음은 카프카 consumer예제이다.
consumer.poll() 함수를 사용하여 토픽에서 메시지를 가져올 수 있다.
pythonCopy code
from kafka import KafkaConsumer
# 카프카 서버 정보 설정
bootstrap_servers = ['localhost:9092']
# 컨슈머 생성
consumer = KafkaConsumer('my_topic', bootstrap_servers=bootstrap_servers)
# 메시지 수신
for message in consumer:
print(message.value.decode())
위 예제 코드에서 **my_topic**은 전송된 메시지를 수신할 토픽 이름이다.
**b'message from python producer'**는 전송할 메시지 내용이다.
이 예제를 실행하면 프로듀서가 메시지를 전송하고, 컨슈머가 해당 토픽에서 메시지를 수신하여 출력한다.
파이썬으로 작성된 이벤트 기반 마이크로서비스 아키텍처를 구현하기 위한 예제이다.
이 예제에서는 Kafka를 이용하여 이벤트를 발생시키고, 이벤트를 처리하는 마이크로서비스를 구현했다..
이 예제를 실행하기 위해서는 Kafka 클러스터를 구성하고,
Python 언어와 함께 confluent-kafka-python 패키지를 설치해야 한다.
이 패키지는 파이썬으로 Kafka를 사용할 수 있도록 도와주는 라이브러리 다.
예제 설명
이 예제는 **producer.py**와 consumer.py 두 개의 파이썬 파일로 이루어져 있다.
**producer.py**는 Kafka 토픽으로 이벤트를 발생시키는 producer를 구현하고,
**consumer.py**는 이벤트를 처리하는 consumer를 구현한다.
producer.py 코드
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for data in range(1, 6):
p.produce('my_topic', key=str(data), value=str(data), callback=delivery_report)
p.flush()
confluent-kafka 패키지의 Producer 클래스를 사용하여 프로듀서 객체를 생성하고,
for 루프를 통해 토픽으로 이벤트를 발생시킨다.
이벤트의 키 값은 **str(data)**이고, 이벤트의 값은 **str(data)**이다.
callback 함수는 Kafka 프로듀서가 메시지를 전송한 후 호출되는 함수이다.
p.flush() 함수는 프로듀서가 모든 메시지를 전송하고 나서 종료될 때까지 대기하는 함수이다.
consumer.py 예제코드
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my_group',
'auto.offset.reset': 'earliest'
})
c.subscribe(['my_topic'])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {}/{}'.format(msg.topic(), msg.partition()))
else:
print('Error while receiving message: {}'.format(msg.error()))
else:
print('Received message: {}:{}:{}:{}: key={} value={}'.format(
msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.key(), msg.value()))
c.close()
KafkaConsumer 클래스는 Kafka 서버에서 데이터를 가져올 때 사용된다.
여기서는 bootstrap_servers 인자로 지정한 Kafka 서버에 연결하여,
topic_name 인자로 지정한 토픽에서 데이터를 가져오도록 한다.
pythonCopy code
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092']
)
consumer.poll() 메서드는 Kafka Topic으로부터 데이터를 가져온다.
가져온 데이터는 message 변수에 저장되고 이때 message 변수는 Kafka Consumer Record 객체이다.
pythonCopy code
for message in consumer.poll(timeout_ms=1000):
**message.value()**는 Kafka Consumer Record에서 실제 데이터 값을 가져온다.
이 예제에서는 가져온 데이터를 단순히 출력하는 코드를 작성하였다.
pythonCopy code
print("Received message: {}".format(message.value()))
마지막으로, consumer.close() 메서드를 호출하여 Kafka Consumer와의 연결을 종료한다.
pythonCopy code
consumer.close()
이렇게 작성된 consumer.py 코드는 Kafka Topic에서 데이터를 가져와 처리하는 Consumer 역할을 한다.
'DevOps > kafka' 카테고리의 다른 글
[Kafka] Kafka Manager API를 사용하여 Kafka 클러스터 모니터링하는 법: 카프카 클러스터의 고가용성확보 (0) | 2023.05.08 |
---|---|
[Kafka] Kafka를 활용한 대규모 데이터 실시간 전송 예제 코드 (2) | 2023.05.02 |