Архитектура, основанная на событиях, предполагает асинхронное взаимодействие между компонентами системы. Она позволяет эффективно распределять системные ресурсы, что способствует масштабируемости и повышению производительности приложений и программ, используемых в различных областях.
Событие в этой архитектуре — это сообщение, которое может быть отправлено в разных точках кода, когда выполняются определённые условия. Такой подход используется при разработке масштабируемых приложений, а также более простых программных продуктов.
Ключевые концепции EDA
Событие (Event) — это значимое изменение состояния системы или важное действие, которое произошло. Например:
- Пользователь создал заказ
- Температура достигла критического значения
- Произошла оплата
Компоненты EDA:
- Event Producers (генераторы событий) — создают события
- Event Consumers (потребители событий) — обрабатывают события
- Event Channels (каналы событий) — транспортируют события
- Event Processing (обработка событий) — механизмы реакции на события
Основные характеристики EDA
- Декомпозиция по времени. Компоненты системы не работают синхронно, а взаимодействуют через асинхронные события.
- Слабая связанность (Loose Coupling). Компоненты не знают друг о друге, знают только о событиях.
- Ориентация на изменения. Система реагирует на изменения, а не выполняет предопределённую последовательность действий.
Преимущества EDA
- Масштабируемость
- Гибкость и расширяемость. Новые функции добавляются как новые подписчики на события.
- Отказоустойчивость. Если один компонент упал, другие продолжают работать, а сообщения накапливаются в брокере.
- Поддержка распределённых систем. Компоненты могут быть разнесены географически.
- Реактивность. Система мгновенно реагирует на изменения.
Основные компоненты EDA
События (Events)
События - это уведомления о том, что что-то произошло в системе. Они содержат минимально необходимую информацию о произошедшем изменении.
Пример события:
from pydantic import BaseModel
class OrderCreatedEvent(BaseModel):
order_id: str
user_id: str
total_amount: float
timestamp: str
Продюсеры (Producers)
Компоненты, которые генерируют события и публикуют их в брокер сообщений.
Пример продюсера на FastStream:
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
@broker.publisher("order_created")
async def create_order(order_data: dict) -> OrderCreatedEvent:
order = OrderCreatedEvent(**order_data)
return order
Брокеры сообщений (Message Brokers)
Промежуточное ПО, которое принимает события от продюсеров и доставляет их консьюмерам.
1.4. Консьюмеры (Consumers)
Компоненты, которые подписываются на события и реагируют на них.
Пример консьюмера на FastStream:
@broker.subscriber("order_created")
async def handle_order_created(event: OrderCreatedEvent):
print(f"Order {event.order_id} created by user {event.user_id}")
# Дополнительная обработка...
Паттерны EDA в FastStream
Публикация/Подписка (Pub/Sub)
Множество консьюмеров могут получать одно и то же событие.
Пример:
# Продюсер
@broker.publisher("notifications")
async def send_notification(message: str):
return {"message": message}
# Консьюмер 1
@broker.subscriber("notifications")
async def log_notification(message: dict):
print(f"Logging notification: {message}")
# Консьюмер 2
@broker.subscriber("notifications")
async def send_email_notification(message: dict):
print(f"Sending email with: {message}")
Точка-точка (Point-to-Point)
Событие доставляется только одному консьюмеру из группы.
Пример с очередью:
@broker.subscriber("processing_queue")
async def process_item(item: dict):
print(f"Processing item: {item}")
# Длительная обработка...
Маршрутизация событий (Event Routing)
Маршрутизация событий на основе их содержимого.
Пример:
from faststream.rabbit import RabbitExchange, RabbitQueue
exchange = RabbitExchange("orders", auto_delete=True)
high_priority = RabbitQueue("high_priority", routing_key="high")
normal_priority = RabbitQueue("normal_priority", routing_key="normal")
@broker.subscriber(high_priority, exchange)
async def handle_high_priority(order: dict):
print(f"Processing HIGH priority order: {order}")
@broker.subscriber(normal_priority, exchange)
async def handle_normal_priority(order: dict):
print(f"Processing normal priority order: {order}")
@broker.publisher(exchange)
async def create_order(order: dict):
routing_key = "high" if order.get("priority") == "high" else "normal"
return {"message": order, "routing_key": routing_key}
Обработка ошибок в EDA
Dead Letter Queues (DLQ)
Очереди для сообщений, которые не удалось обработать.
Пример:
from faststream.rabbit import RabbitQueue
queue = RabbitQueue(
"main_queue",
dead_letter_exchange="dlx",
dead_letter_routing_key="dlq",
)
@broker.subscriber(queue)
async def process_item(item: dict):
try:
# Обработка, которая может вызвать ошибку
process(item)
except Exception:
raise # Сообщение будет перенаправлено в DLQ
Повторные попытки (Retries)
Настройка повторных попыток обработки сообщения.
Пример:
from faststream import Retry
@broker.subscriber("orders", retry=Retry(max_calls=3, delay=5.0))
async def handle_order(order: dict):
# Может вызвать временную ошибку
await process_order(order)
Мониторинг и трассировка
Логирование
import logging
logger = logging.getLogger(__name__)
@broker.subscriber("orders")
async def handle_order(order: dict):
logger.info(f"Processing order {order['id']}")
# ...
Метрики
Интеграция с Prometheus:
from prometheus_client import Counter
ORDERS_PROCESSED = Counter('orders_processed', 'Total orders processed')
@broker.subscriber("orders")
async def handle_order(order: dict):
ORDERS_PROCESSED.inc()
# ...