Первая и фундаментальная практика — проектирование событий как фактов. Событие — это неизменяемая запись о чем-то, что уже произошло в прошлом. Его имя должно отражать это и быть в прошедшем времени: `OrderPlaced`, `PaymentProcessed`, `UserEmailChanged`. Событие несет в себе все необходимые данные (payload) для понимания этого факта, но не должно содержать инструкций о том, что делать получателю. Это обеспечивает слабую связность.
Пример структуры события в Java:
```
public class OrderPlacedEvent {
private final String eventId;
private final Instant timestamp;
private final String orderId;
private final String customerId;
private final BigDecimal amount;
private final List items;
// Конструктор, геттеры (поля final для неизменяемости)
}
```
Вторая критически важная практика — обеспечение идемпотентности обработчиков. В распределенных системах доставка сообщений «как минимум один раз» (at-least-once) — это норма. Обработчик может получить одно и то же событие несколько раз. Его реакция должна быть идемпотентной: повторная обработка того же события не должна менять состояние системы или вызывать побочные эффекты (например, двойная отправка email). Простейший способ — ведение журнала обработанных `eventId`.
Пример идемпотентного обработчика с использованием Spring и JPA:
```
@Service
public class OrderEventHandler {
@PersistenceContext
private EntityManager entityManager;
@Transactional
public void handle(OrderPlacedEvent event) {
// Проверяем, не обрабатывали ли мы это событие ранее
boolean isProcessed = entityManager.createQuery(
"SELECT COUNT(p) FROM ProcessedEvent p WHERE p.eventId = :eventId", Long.class)
.setParameter("eventId", event.getEventId())
.getSingleResult() > 0;
if (!isProcessed) {
// Логика обработки заказа...
// Сохраняем факт обработки
entityManager.persist(new ProcessedEvent(event.getEventId()));
}
}
}
```
Третья практика — использование паттернов компенсирующих транзакций (Saga) для управления распределенными долгими процессами. Классическая транзакция ACID в распределенной среде невозможна. Saga разбивает процесс на последовательность локальных транзакций, каждая из которых публикует событие для запуска следующего шага. Если какой-то шаг fails, запускается серия компенсирующих событий (откатов).
Пример упрощенной Saga для оформления заказа:
- Сервис заказов: создает заказ в статусе `PENDING`, публикует `OrderCreatedEvent`.
- Сервис платежей (обработчик): получает событие, списывает деньги, публикует `PaymentSucceededEvent` или `PaymentFailedEvent`.
- Сервис заказов (обработчик): при успехе — меняет статус заказа на `CONFIRMED`; при неудаче — публикует `OrderCancelledEvent`, инициируя компенсацию (например, отмену резервирования на складе).
Пример конфигурации продюсера Kafka с Avro-схемой в Spring:
```
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory producerFactory() {
Map config = new HashMap();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Использование Avro-сериализатора для значения
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put("schema.registry.url", "http://localhost:8081");
return new DefaultKafkaProducerFactory(config);
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
```
Пятая практика — реализация отказоустойчивости и retry-логики с dead letter queue (DLQ). Не все ошибки временные. Обработчик должен уметь отличать временный сбой сети от фатальной бизнес-ошибки. Для временных сбоев используйте экспоненциальную задержку (exponential backoff) при повторных попытках. События, которые не удалось обработать после всех попыток, должны отправляться в специальный топик DLQ для последующего анализа вручную.
Пример обработчика с аннотацией `@Retryable` и отправкой в DLQ:
```
@Service
public class ResilientEventHandler {
private final KafkaTemplate kafkaTemplate;
@Retryable(
value = {TransientDataAccessException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2.0)
)
@KafkaListener(topics = "orders")
public void listen(OrderPlacedEvent event) {
try {
// Попытка обработки...
} catch (BusinessRuleViolationException e) {
// Фатальная бизнес-ошибка — сразу в DLQ
kafkaTemplate.send("orders.DLQ", event.getOrderId(), event);
}
// При других исключениях сработает retry, а затем, если не поможет, — также DLQ
}
}
```
Следование этим практикам — проектирование событий как фактов, обеспечение идемпотентности, использование Saga, управление схемами и реализация отказоустойчивости — позволяет построить событийно-ориентированную систему, которая не только масштабируется, но и остается надежной, понятной и простой в поддержке по мере роста ее сложности.
Комментарии (7)