Event-Driven Architecture (EDA)

Архитектура, основанная на событиях, предполагает асинхронное взаимодействие между компонентами системы. Она позволяет эффективно распределять системные ресурсы, что способствует масштабируемости и повышению производительности приложений и программ, используемых в различных областях.

Событие в этой архитектуре — это сообщение, которое может быть отправлено в разных точках кода, когда выполняются определённые условия. Такой подход используется при разработке масштабируемых приложений, а также более простых программных продуктов.

Ключевые концепции EDA

Событие (Event) — это значимое изменение состояния системы или важное действие, которое произошло. Например:

  • Пользователь создал заказ
  • Температура достигла критического значения
  • Произошла оплата

Компоненты EDA:

  • Event Producers (генераторы событий) — создают события
  • Event Consumers (потребители событий) — обрабатывают события
  • Event Channels (каналы событий) — транспортируют события
  • Event Processing (обработка событий) — механизмы реакции на события

Основные характеристики EDA

  • Декомпозиция по времени. Компоненты системы не работают синхронно, а взаимодействуют через асинхронные события.
  • Слабая связанность (Loose Coupling). Компоненты не знают друг о друге, знают только о событиях.
  • Ориентация на изменения. Система реагирует на изменения, а не выполняет предопределённую последовательность действий.

Преимущества EDA

  • Масштабируемость
  • Гибкость и расширяемость. Новые функции добавляются как новые подписчики на события.
  • Отказоустойчивость. Если один компонент упал, другие продолжают работать, а сообщения накапливаются в брокере.
  • Поддержка распределённых систем. Компоненты могут быть разнесены географически.
  • Реактивность. Система мгновенно реагирует на изменения.

Основные компоненты EDA

События (Events)

События - это уведомления о том, что что-то произошло в системе. Они содержат минимально необходимую информацию о произошедшем изменении.

Пример события:

from pydantic import BaseModel

class OrderCreatedEvent(BaseModel):
    order_id: str
    user_id: str
    total_amount: float
    timestamp: str

Продюсеры (Producers)

Компоненты, которые генерируют события и публикуют их в брокер сообщений.

Пример продюсера на FastStream:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.publisher("order_created")
async def create_order(order_data: dict) -> OrderCreatedEvent:
    order = OrderCreatedEvent(**order_data)
    return order

Брокеры сообщений (Message Brokers)

Промежуточное ПО, которое принимает события от продюсеров и доставляет их консьюмерам.

1.4. Консьюмеры (Consumers)

Компоненты, которые подписываются на события и реагируют на них.

Пример консьюмера на FastStream:

@broker.subscriber("order_created")
async def handle_order_created(event: OrderCreatedEvent):
    print(f"Order {event.order_id} created by user {event.user_id}")
    # Дополнительная обработка...

Паттерны EDA в FastStream

Публикация/Подписка (Pub/Sub)

Множество консьюмеров могут получать одно и то же событие.

Пример:

# Продюсер
@broker.publisher("notifications")
async def send_notification(message: str):
    return {"message": message}

# Консьюмер 1
@broker.subscriber("notifications")
async def log_notification(message: dict):
    print(f"Logging notification: {message}")

# Консьюмер 2
@broker.subscriber("notifications")
async def send_email_notification(message: dict):
    print(f"Sending email with: {message}")

Точка-точка (Point-to-Point)

Событие доставляется только одному консьюмеру из группы.

Пример с очередью:

@broker.subscriber("processing_queue")
async def process_item(item: dict):
    print(f"Processing item: {item}")
    # Длительная обработка...

Маршрутизация событий (Event Routing)

Маршрутизация событий на основе их содержимого.

Пример:

from faststream.rabbit import RabbitExchange, RabbitQueue

exchange = RabbitExchange("orders", auto_delete=True)
high_priority = RabbitQueue("high_priority", routing_key="high")
normal_priority = RabbitQueue("normal_priority", routing_key="normal")

@broker.subscriber(high_priority, exchange)
async def handle_high_priority(order: dict):
    print(f"Processing HIGH priority order: {order}")

@broker.subscriber(normal_priority, exchange)
async def handle_normal_priority(order: dict):
    print(f"Processing normal priority order: {order}")

@broker.publisher(exchange)
async def create_order(order: dict):
    routing_key = "high" if order.get("priority") == "high" else "normal"
    return {"message": order, "routing_key": routing_key}

Обработка ошибок в EDA

Dead Letter Queues (DLQ)

Очереди для сообщений, которые не удалось обработать.

Пример:

from faststream.rabbit import RabbitQueue

queue = RabbitQueue(
    "main_queue",
    dead_letter_exchange="dlx",
    dead_letter_routing_key="dlq",
)

@broker.subscriber(queue)
async def process_item(item: dict):
    try:
        # Обработка, которая может вызвать ошибку
        process(item)
    except Exception:
        raise  # Сообщение будет перенаправлено в DLQ

Повторные попытки (Retries)

Настройка повторных попыток обработки сообщения.

Пример:

from faststream import Retry

@broker.subscriber("orders", retry=Retry(max_calls=3, delay=5.0))
async def handle_order(order: dict):
    # Может вызвать временную ошибку
    await process_order(order)

Мониторинг и трассировка

Логирование

import logging

logger = logging.getLogger(__name__)

@broker.subscriber("orders")
async def handle_order(order: dict):
    logger.info(f"Processing order {order['id']}")
    # ...

Метрики

Интеграция с Prometheus:

from prometheus_client import Counter

ORDERS_PROCESSED = Counter('orders_processed', 'Total orders processed')

@broker.subscriber("orders")
async def handle_order(order: dict):
    ORDERS_PROCESSED.inc()
    # ...