DesignPattern

Architecture EDA(Event Driven Architecture) 마틴 파울러의 4가지 구현 패턴에 대해

Sophie소피 2023. 5. 8. 13:23

Event Driven Architecture는 애플리케이션에서 발생하는

모든 이벤트(상태 변화, 액션, 메시지 등)를 중심으로 설계된 아키텍처이다.

이벤트 드리븐 아키텍처는 이벤트를 통해 시스템을 제어하고, 이벤트가 발생했을 때

필요한 로직을 실행하는 방식으로 동작한다

 

이벤트 드리븐 아키텍처는 이벤트가 발생했을 때 이벤트 핸들러를 호출하여 이벤트를 처리하도록 설계된다.

이벤트 핸들러는 이벤트의 내용을 분석하여 필요한 로직을 수행하고, 다른 이벤트를 발생시킬 수도 있다.

이렇게 이벤트가 중심이 되는 아키텍처는 유연하고 확장성이 높으며,

비동기적인 방식으로 동작하여 처리 속도가 빠르다.

 

이벤트 드리븐 아키텍처는 분산 시스템에서도 유용하게 사용될 수 있다.

분산 시스템에서는 이벤트가 다른 노드로 전파되어 처리되기 때문에,

시스템 전체의 상태를 빠르게 파악할 수 있으며, 장애가 발생했을 때도 신속하게 대응할 수 있다.

 

이벤트 드리븐 아키텍처는 분산 환경에서의 유연성과 확장성을 강조하는 아키텍처 패턴 중 하나이다.

이를 위해 이벤트 또는 메시지를 전달하여 각 컴포넌트 간의 결합도를 낮추고, 확장성과 유연성을 높인다.

 

이를 구현하기 위해서는 보통 Message Broker를 사용하고.

Message Broker는 이벤트 또는 메시지를 수신하고 발신하는 역할을 하며,

이를 통해 각 컴포넌트 간의 통신을 비동기적으로 처리할 수 있다.

대표적인 Message Broker로는 Kafka, RabbitMQ, ActiveMQ 등이 있다. 

 

마틴 파울러의 
Event Driven Architecture를 구현하는 데 사용할 수 있는  4 가지 패턴을 소개하겠다.

 

 

 

이벤트 소싱 (Event Sourcing)

이벤트 소싱 패턴은 시스템의 모든 변경 사항을 이벤트 스트림으로 저장하고,

이전 이벤트를 캡처하여 시스템 상태를 복원하는 데 사용하는 패턴이다.

이벤트 소싱 패턴은 불변성과 복원력을 보장하여 시스템을 더욱 견고하게 만들어준다..

아래는 파이썬으로 이벤트 소싱 패턴을 구현한 예제코드이다. 

class Event:
    def __init__(self, event_name, event_data):
        self.name = event_name
        self.data = event_data

class EventStore:
    def __init__(self):
        self.events = []

    def add_event(self, event):
        self.events.append(event)

    def get_events(self):
        return self.events

class Order:
    def __init__(self, event_store):
        self.event_store = event_store
        self.status = None

    def create(self, customer_name):
        event_data = {"customer_name": customer_name}
        event = Event("order_created", event_data)
        self.event_store.add_event(event)
        self.status = "created"

    def ship(self):
        if self.status == "created":
            event = Event("order_shipped", {})
            self.event_store.add_event(event)
            self.status = "shipped"
        else:
            raise Exception("Order must be in 'created' state to ship it.")

event_store = EventStore()
order = Order(event_store)

order.create("John Smith")
order.ship()

events = event_store.get_events()
for event in events:
    print(event.name, event.data)

 

CQRS (Command and Query Responsibility Segregation)

CQRS 패턴은 읽기 작업과 쓰기 작업을 분리하여 시스템을 설계하는 패턴이다.

이 패턴을 사용하면 시스템의 일관성, 확장성 및 성능을 높일 수 있다.

CQRS 패턴은 쿼리 모델과 커맨드 모델을 분리하고,

서로 다른 데이터 저장소를 사용하여 이 둘을 분리한다.

아래는 파이썬으로 CQRS 패턴을 구현한 예시이다.

 

class CommandHandler:
    def __init__(self):
        self.orders = []

    def handle(self, command):
        if command["type"] == "create_order":
            self.orders.append({"customer_name": command["customer_name"], "status": "created"})
        elif command["type"] == "ship_order":
            for order in self.orders:
                if order["customer_name"] == command["customer_name"]:
                    if order["status"] == "created":
                        order["status"] = "shipped"
                    else:
                        raise Exception("Order must be in 'created' state to ship it.")
        else:
            raise Exception("Unknown command type.")

class QueryHandler:
    def __init__(self):
        self.orders = []

    def handle(self, query):
        if query["type"] == "get_orders":
            return self.orders
        elif query["type"] == "get_order_status":
            for order in self.orders:

 

블록체인 패턴

블록체인 패턴은 분산 시스템에서 상태 변경을 추적하고 기록하기 위해 사용된다.

이 패턴을 사용하면 시스템 상태의 불변성과 안정성을 보장할 수 있다.

이 패턴을 구현하는 가장 유명한 기술 중 하나는 블록체인이다.
이 예제 코드는 Python 3.7 이상에서 실행됩니다.

import hashlib
import json
import time

class Block:
    def __init__(self, index, transactions, timestamp, previous_hash):
        self.index = index
        self.transactions = transactions
        self.timestamp = timestamp
        self.previous_hash = previous_hash
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        block_string = json.dumps(self.__dict__, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()

class Blockchain:
    def __init__(self):
        self.chain = [self.create_genesis_block()]

    def create_genesis_block(self):
        return Block(0, [], time.time(), "0")

    def get_latest_block(self):
        return self.chain[-1]

    def add_block(self, new_block):
        new_block.previous_hash = self.get_latest_block().hash
        new_block.hash = new_block.calculate_hash()
        self.chain.append(new_block)

    def is_valid(self):
        for i in range(1, len(self.chain)):
            current_block = self.chain[i]
            previous_block = self.chain[i-1]
            if current_block.hash != current_block.calculate_hash():
                return False
            if current_block.previous_hash != previous_block.hash:
                return False
        return True

이 예제 코드에서는 블록과 블록체인 클래스를 정의한다.

블록 클래스는 블록체인에서 사용되는 블록을 나타내며,

이전 블록의 해시 값과 현재 블록의 해시 값을 계산한다.

블록체인 클래스는 블록들의 리스트를 관리하며,

블록을 추가할 수 있고, 블록체인의 유효성을 검사힌다.

다음은 이 예제 코드를 사용하는 방법이다.

# 블록체인 생성
blockchain = Blockchain()

# 블록 추가
blockchain.add_block(Block(1, ["transaction 1"], time.time(), ""))
blockchain.add_block(Block(2, ["transaction 2"], time.time(), ""))

# 블록체인 유효성 검사
print(blockchain.is_valid()) # True

이 예제 코드에서는 블록체인을 생성하고, 블록을 추가한 후 블록체인의 유효성을 검사한다.

블록체인은 블록들의 리스트로 구성되며, 각 블록은 이전 블록의 해시 값을 가지고 있다.

 

 

이벤트 메시지 (Event Messaging) 패턴

이벤트 메시지 패턴은 시스템 간 통신에 이벤트 메시지를 사용한다.

이 패턴을 사용하면 느슨한 결합을 유지하면서 시스템 간 통신을 구현할 수 있다.

파이썬에서는 대표적으로 RabbitMQ, Kafka와 같은 Message Broker를 사용하여 이벤트 메시지를 전달한다.

 

이러한 Message Broker를 사용하면 여러 시스템 간에 이벤트 메시지를 비동기적으로 전송할 수 있으며,

각 시스템은 이벤트 메시지를 수신하여 필요한 처리를 수행한다.

 

아래는 KAFKA를 사용하여 이벤트 메시지를 전달하는 간단한 예제 코드이다.

Kafka는 대용량 실시간 로그 처리 시스템으로, 이벤트 드리븐 아키텍처에서 많이 사용된다.

 

다음은 Python에서 Kafka를 사용하는 예제 코드이다.

먼저, Kafka 패키지를 설치해야 합니다. pip install kafka-python 명령어를 사용하여 설치할 수 있다.

 

from kafka import KafkaProducer, KafkaConsumer
import json

# Kafka 프로듀서 생성
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Kafka 컨슈머 생성
consumer = KafkaConsumer('test-topic',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))

# 메시지 전송
producer.send('test-topic', {'key': 'value'})

# 메시지 수신
for message in consumer:
    print(message.value)

 

위 코드에서는 KafkaProducer와 KafkaConsumer를 사용하여 Kafka 서버에 연결한다.

bootstrap_servers 인자에는 Kafka 서버의 주소를 지정한다.

value_serializer와 value_deserializer는 Kafka에 전송할 메시지를 JSON 형식으로 인코딩하고

디코딩하는 함수를 지정한다.

 

producer.send를 사용하여 test-topic이라는 토픽에 메시지를 전송하고,

consumer 객체를 사용하여 메시지를 수신한다.

consumer는 for문을 사용하여 메시지가 도착할 때마다 메시지를 출력한다

Kafka를 사용하는 예제 중 하나는 로그 데이터를 수집하여 실시간으로 처리하는 것이다.

로그 데이터를 Kafka에 전송하고, 로그 처리를 위한 Kafka 컨슈머를 구현하여 실시간으로 처리할 수 있다.

 

Architecture EDA(Event Driven Architecture