Implementing Event-Driven Architecture with Kafka
A practical guide to building event-driven microservices using Apache Kafka and Spring Boot for scalable, decoupled systems.
Event-driven architecture (EDA) enables services to communicate asynchronously through events, creating more scalable and resilient systems. In this post, I'll show you how to implement EDA using Apache Kafka and Spring Boot.
Why Event-Driven Architecture?
Traditional synchronous communication has several drawbacks:
- Tight coupling: Services depend on each other's availability
- Poor scalability: Bottlenecks propagate through the system
- Single point of failure: One service down affects the entire chain
Event-driven architecture solves these problems by:
- Decoupling services: Producers don't need to know about consumers
- Improved scalability: Consumers can scale independently
- Better resilience: Events can be replayed and processed at different rates
Kafka Fundamentals
Core Concepts
- Topics: Logical channels for events
- Partitions: Divisions within topics for parallelism
- Producers: Applications that send events
- Consumers: Applications that process events
- Consumer Groups: Groups of consumers that share load
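These concepts translate directly into configuration. As a minimal sketch (assuming Spring Kafka's `TopicBuilder`; a `KafkaAdmin` bean then creates declared topics on startup), an `orders` topic with three partitions can be registered as a bean:

```java
@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin picks up NewTopic beans and creates the topics at startup.
    // Three partitions let up to three consumers in one group read in parallel.
    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name("orders")
                .partitions(3)
                .replicas(1) // single-broker dev setup; use >= 3 in production
                .build();
    }
}
```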
Setting Up Kafka
```yaml
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
Producing Events
Spring Kafka Producer Configuration
```java
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability settings
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
Event Producer Service
```java
@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private static final String ORDER_TOPIC = "orders";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderId(order.getId())
                .userId(order.getUserId())
                .total(order.getTotal())
                .items(order.getItems())
                .createdAt(Instant.now())
                .build();

        // Keying by order id routes all events for one order to the same
        // partition, preserving their relative order.
        kafkaTemplate.send(ORDER_TOPIC, order.getId().toString(), event);
    }

    public void publishOrderCancelled(Long orderId) {
        OrderCancelledEvent event = OrderCancelledEvent.builder()
                .orderId(orderId)
                .cancelledAt(Instant.now())
                .build();
        kafkaTemplate.send(ORDER_TOPIC, orderId.toString(), event);
    }
}
```
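`send()` is asynchronous: the call returns before the broker acknowledges the record. A sketch of reacting to the outcome, assuming Spring Kafka 3.x, where `send()` returns a `CompletableFuture<SendResult<K, V>>`:

```java
kafkaTemplate.send(ORDER_TOPIC, order.getId().toString(), event)
        .whenComplete((result, ex) -> {
            if (ex != null) {
                // Delivery failed even after the producer's own retries.
                log.error("Failed to publish event for order {}", order.getId(), ex);
            } else {
                log.debug("Published to {}-{} at offset {}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }
        });
```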
Event DTOs
```java
@Data
@Builder
public class OrderCreatedEvent {
    private Long orderId;
    private Long userId;
    private BigDecimal total;
    private List<OrderItem> items;
    private Instant createdAt;
}

@Data
@Builder
public class OrderCancelledEvent {
    private Long orderId;
    private Instant cancelledAt;
}
```
Consuming Events
Spring Kafka Consumer Configuration
```java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Consumer group
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "notification-service");

        // Offset management: start from the earliest offset for a new group,
        // and commit manually after successful processing.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Deserializer instances passed to the factory take precedence over
        // *_DESERIALIZER_CLASS_CONFIG map entries. JsonDeserializer rejects
        // unknown packages by default, so trust the event package explicitly.
        JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>(Object.class);
        valueDeserializer.addTrustedPackages("com.example.events"); // adjust to your package

        return new DefaultKafkaConsumerFactory<>(
                config,
                new StringDeserializer(),
                valueDeserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Manual commit for reliability: offsets are committed only when the
        // listener calls Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // Up to 3 listener threads, bounded by the topic's partition count.
        factory.setConcurrency(3);
        return factory;
    }
}
```
Event Consumer Service
```java
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final NotificationService notificationService;
    private final InventoryService inventoryService;

    @KafkaListener(
            topics = "orders",
            groupId = "notification-service",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderCreated(
            OrderCreatedEvent event,
            Acknowledgment acknowledgment
    ) {
        try {
            notificationService.sendOrderConfirmation(event);
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Not acknowledged: the offset is not committed, so the record is
            // redelivered after a rebalance or restart, or handled by the
            // container's error handler.
            log.error("Failed to process order created event: {}", event.getOrderId(), e);
        }
    }

    @KafkaListener(
            topics = "orders",
            groupId = "inventory-service",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderCreatedForInventory(
            OrderCreatedEvent event,
            Acknowledgment acknowledgment
    ) {
        try {
            inventoryService.reserveItems(event);
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to reserve inventory for order: {}", event.getOrderId(), e);
        }
    }
}
```

Because the two listeners use different consumer groups, each receives its own copy of every order event: the same `OrderCreatedEvent` fans out to both the notification and inventory flows independently.
Error Handling and Retries
Retry Strategy
```java
@Configuration
public class KafkaRetryConfig {

    @Bean
    public RetryTemplate retryTemplate() {
        return RetryTemplate.builder()
                .maxAttempts(3)
                .exponentialBackoff(1000, 2, 10000) // 1s initial, x2, capped at 10s
                .retryOn(Exception.class)
                .build();
    }
}
```
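A sketch of wrapping a handler call in this template (reusing the `notificationService` and event names from the consumer example above):

```java
// Invokes the callback up to 3 times, backing off 1s then 2s between
// attempts, and rethrows the last exception if all attempts fail.
retryTemplate.execute(context -> {
    notificationService.sendOrderConfirmation(event);
    return null;
});
```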
Dead Letter Queue
```java
@Configuration
public class KafkaDlqConfig {

    @Bean
    public ProducerFactory<String, Object> dlqProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> dlqKafkaTemplate() {
        return new KafkaTemplate<>(dlqProducerFactory());
    }
}
```
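On its own, this template just produces to Kafka. To actually route poison messages to a dead-letter topic, Spring Kafka's `DeadLetterPublishingRecoverer` can be plugged into the listener container's error handler. A sketch (by default the recoverer publishes failed records to `<original-topic>.DLT`, e.g. `orders.DLT`):

```java
@Bean
public DefaultErrorHandler dlqErrorHandler(KafkaTemplate<String, Object> dlqKafkaTemplate) {
    // Publishes the failed record to <topic>.DLT once retries are exhausted.
    DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(dlqKafkaTemplate);

    // Retry the listener twice, 1 second apart, before recovering to the DLT.
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
}
```

Register it on the `ConcurrentKafkaListenerContainerFactory` from the consumer configuration via `factory.setCommonErrorHandler(dlqErrorHandler)`.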
Event Sourcing
Event Store
```java
@Repository
public interface EventRepository extends JpaRepository<Event, Long> {

    List<Event> findByAggregateIdOrderByCreatedAtAsc(Long aggregateId);

    @Query("SELECT e FROM Event e WHERE e.aggregateId = :aggregateId AND e.version <= :version")
    List<Event> findEventsUpToVersion(@Param("aggregateId") Long aggregateId,
                                      @Param("version") Long version);
}
```
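The repository assumes an `Event` entity that this post doesn't show. A minimal JPA sketch with field names matching the queries above (the payload column is an assumption; use whatever serialization your event store prefers):

```java
@Entity
@Table(name = "events")
@Data
public class Event {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long aggregateId;   // id of the order this event belongs to
    private Long version;       // per-aggregate sequence number

    @Enumerated(EnumType.STRING)
    private EventType type;     // ORDER_CREATED, ORDER_CANCELLED, ...

    @Lob
    private String payload;     // serialized event data, e.g. JSON

    private Instant createdAt;
}
```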
Aggregate Rebuilder
```java
@Service
@RequiredArgsConstructor
public class OrderAggregateService {

    private final EventRepository eventRepository;

    public Order rebuildOrder(Long orderId) {
        // Replay the aggregate's events in order to reconstruct current state.
        List<Event> events = eventRepository.findByAggregateIdOrderByCreatedAtAsc(orderId);
        Order order = new Order();
        for (Event event : events) {
            applyEvent(order, event);
        }
        return order;
    }

    private void applyEvent(Order order, Event event) {
        if (event.getType() == EventType.ORDER_CREATED) {
            // Assumes getData() deserializes the stored payload.
            OrderCreatedEvent data = (OrderCreatedEvent) event.getData();
            order.setId(data.getOrderId());
            order.setUserId(data.getUserId());
            order.setTotal(data.getTotal());
        } else if (event.getType() == EventType.ORDER_CANCELLED) {
            order.setStatus(OrderStatus.CANCELLED);
        }
    }
}
```
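The `findEventsUpToVersion` query defined on the repository is not used above; it enables point-in-time reconstruction. A sketch of an additional method for the same service:

```java
// Rebuild the order as it looked at a given version by replaying
// only the events up to (and including) that version.
public Order rebuildOrderAtVersion(Long orderId, Long version) {
    List<Event> events = eventRepository.findEventsUpToVersion(orderId, version);
    Order order = new Order();
    for (Event event : events) {
        applyEvent(order, event);
    }
    return order;
}
```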
Monitoring and Observability
Kafka Metrics
```java
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(config);
    }

    // Spring Kafka ships Micrometer listeners that publish per-client metrics
    // (send/receive rates, errors, latency) to the MeterRegistry. Attach them
    // where the client factories are built:
    //
    //   producerFactory.addListener(new MicrometerProducerListener<>(meterRegistry));
    //   consumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));
}
```
Best Practices
- Idempotent Consumers: Handle duplicate events gracefully
- Event Versioning: Use schema evolution strategies
- Backpressure: Implement rate limiting for consumers
- Monitoring: Track lag, throughput, and error rates
- Testing: Use embedded Kafka for integration tests
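The first point is worth a sketch: manual commits give at-least-once delivery, so a redelivered event must not trigger a duplicate side effect. A minimal in-memory dedup store (`ProcessedEventStore` is a name invented here; a real service would back this with a database table keyed by event id):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Remembers which event ids have been handled so a redelivered event
// can be acknowledged and skipped instead of processed twice.
public class ProcessedEventStore {

    private final Set<Long> processed = ConcurrentHashMap.newKeySet();

    /** Returns true the first time an id is seen, false for duplicates. */
    public boolean markProcessed(Long eventId) {
        return processed.add(eventId);
    }
}
```

In a listener, a duplicate is then acknowledged without reprocessing: `if (!store.markProcessed(event.getOrderId())) { acknowledgment.acknowledge(); return; }`.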
Conclusion
Event-driven architecture with Kafka provides a robust foundation for scalable microservices. By decoupling services through events, you can build systems that are more resilient, scalable, and maintainable.
In future posts, I'll cover CQRS patterns, saga transactions, and advanced Kafka features like exactly-once semantics.