DevOps/kafka

[Kafka] Python에 kafka 활용한 간단예제 ft.Event-driven microservices

Sophie소피 2023. 5. 8. 19:02

파이썬에서는 kafka-python이라는 라이브러리를 통해 카프카를 활용할 수 있다.

 

kafka-python 라이브러리를 설치 명령어 

pip install kafka-python

 

다음은 kafka producer 예제이다.

producer.send() 함수를 사용하여 토픽에 메시지를 전송할 수 있다.

pythonCopy code
from kafka import KafkaProducer

# 카프카 서버 정보 설정
bootstrap_servers = ['localhost:9092']

# 프로듀서 생성
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# 메시지 전송
producer.send('my_topic', b'message from python producer')

 

다음은 카프카 consumer예제이다.

consumer.poll() 함수를 사용하여 토픽에서 메시지를 가져올 수 있다.

pythonCopy code
from kafka import KafkaConsumer

# 카프카 서버 정보 설정
bootstrap_servers = ['localhost:9092']

# 컨슈머 생성
consumer = KafkaConsumer('my_topic', bootstrap_servers=bootstrap_servers)

# 메시지 수신
for message in consumer:
    print(message.value.decode())

위 예제 코드에서 **my_topic**은 전송된 메시지를 수신할 토픽 이름이다.

**b'message from python producer'**는 전송할 메시지 내용이다.

이 예제를 실행하면 프로듀서가 메시지를 전송하고, 컨슈머가 해당 토픽에서 메시지를 수신하여 출력한다.

파이썬으로 작성된 이벤트 기반 마이크로서비스 아키텍처를 구현하기 위한 예제이다.

 

이 예제에서는 Kafka를 이용하여 이벤트를 발생시키고, 이벤트를 처리하는 마이크로서비스를 구현했다..

이 예제를 실행하기 위해서는 Kafka 클러스터를 구성하고,

Python 언어와 함께 confluent-kafka-python 패키지를 설치해야 한다.

이 패키지는 파이썬으로 Kafka를 사용할 수 있도록 도와주는 라이브러리 다.

예제 설명

이 예제는 **producer.py**와 consumer.py 두 개의 파이썬 파일로 이루어져 있다.

**producer.py**는 Kafka 토픽으로 이벤트를 발생시키는 producer를 구현하고,

**consumer.py**는 이벤트를 처리하는 consumer를 구현한다.

producer.py 코드

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for data in range(1, 6):
    p.produce('my_topic', key=str(data), value=str(data), callback=delivery_report)

p.flush()

confluent-kafka 패키지의 Producer 클래스를 사용하여 프로듀서 객체를 생성하고,

for 루프를 통해 토픽으로 이벤트를 발생시킨다.

이벤트의 키 값은 **str(data)**이고, 이벤트의 값은 **str(data)**이다.

callback 함수는 Kafka 프로듀서가 메시지를 전송한 후 호출되는 함수이다.

p.flush() 함수는 프로듀서가 모든 메시지를 전송하고 나서 종료될 때까지 대기하는 함수이다.

 

consumer.py 예제코드

from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['my_topic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {}/{}'.format(msg.topic(), msg.partition()))
        else:
            print('Error while receiving message: {}'.format(msg.error()))
    else:
        print('Received message: {}:{}:{}:{}: key={} value={}'.format(
            msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.key(), msg.value()))

c.close()

KafkaConsumer 클래스는 Kafka 서버에서 데이터를 가져올 때 사용된다.
여기서는 bootstrap_servers 인자로 지정한 Kafka 서버에 연결하여,

topic_name 인자로 지정한 토픽에서 데이터를 가져오도록 한다.

pythonCopy code
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=['localhost:9092']
)

consumer.poll() 메서드는 Kafka Topic으로부터 데이터를 가져온다. 
가져온 데이터는 message 변수에 저장되고 이때 message 변수는 Kafka Consumer Record 객체이다.

pythonCopy code
for message in consumer.poll(timeout_ms=1000):

**message.value()**는 Kafka Consumer Record에서 실제 데이터 값을 가져온다.
이 예제에서는 가져온 데이터를 단순히 출력하는 코드를 작성하였다.

pythonCopy code
    print("Received message: {}".format(message.value()))

마지막으로, consumer.close() 메서드를 호출하여 Kafka Consumer와의 연결을 종료한다.

pythonCopy code
consumer.close()

이렇게 작성된 consumer.py 코드는 Kafka Topic에서 데이터를 가져와 처리하는 Consumer 역할을 한다.