Implementing Event-Driven Architecture with Kafka

A practical guide to building event-driven microservices using Apache Kafka and Spring Boot for scalable, decoupled systems.

Event-driven architecture (EDA) enables services to communicate asynchronously through events, creating more scalable and resilient systems. In this post, I'll show you how to implement EDA using Apache Kafka and Spring Boot.

Why Event-Driven Architecture?

Traditional synchronous communication has several drawbacks:

  • Tight coupling: A caller can only succeed while every service it calls is available
  • Poor scalability: A slow service becomes a bottleneck for everything upstream of it
  • Cascading failures: One service going down can take the whole call chain with it

Event-driven architecture addresses these problems through:

  • Decoupled services: Producers don't need to know who consumes their events
  • Independent scaling: Consumers can be scaled out without touching producers
  • Better resilience: Kafka retains events, so a consumer that fails or falls behind can catch up or replay them later

Kafka Fundamentals

Core Concepts

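  • Topics: Named, append-only logs that events are published to
  • Partitions: Ordered subdivisions of a topic that enable parallelism; ordering is guaranteed only within a partition
  • Producers: Applications that publish events to topics
  • Consumers: Applications that read and process events
  • Consumer Groups: Consumers that split a topic's partitions among themselves; each partition is read by exactly one consumer in the group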
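
To make partitions and consumer groups concrete, here is a minimal, self-contained sketch of the two mappings involved: a record key is hashed to a partition, and a topic's partitions are spread across the members of a group. It is an illustration only. Kafka's default partitioner uses a murmur2 hash and the group coordinator performs the real assignment, so the `String.hashCode` hash, the round-robin assignment, and the class name `PartitioningSketch` are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: Kafka itself hashes keys with murmur2 and assigns
// partitions through its group coordinator. All names here are hypothetical.
class PartitioningSketch {

    // Same key -> same partition, which is what preserves per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Spread partitions 0..numPartitions-1 over the consumers round-robin.
    // Each partition ends up with exactly one owner within the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println("order 42 -> partition " + partitionFor("42", 6));
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 6));
    }
}
```

With six partitions and four consumers, two consumers own two partitions each and two own one. A seventh consumer would receive nothing, which is why the partition count caps how far a group can scale out.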

Setting Up Kafka

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Producing Events

Spring Kafka Producer Configuration

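import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration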
@EnableKafka
public class KafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // Reliability settings
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Event Producer Service

@Service
public class OrderEventProducer {
    
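    private static final String ORDER_TOPIC = "orders";
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    // Constructor injection: Spring supplies the KafkaTemplate bean
    public OrderEventProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }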
    
    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(order.getId())
            .userId(order.getUserId())
            .total(order.getTotal())
            .items(order.getItems())
            .createdAt(Instant.now())
            .build();
        
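        // Keying by order ID sends all events for one order to the same
        // partition, which preserves their relative order
        kafkaTemplate.send(ORDER_TOPIC, order.getId().toString(), event);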
    }
    
    public void publishOrderCancelled(Long orderId) {
        OrderCancelledEvent event = OrderCancelledEvent.builder()
            .orderId(orderId)
            .cancelledAt(Instant.now())
            .build();
        
        kafkaTemplate.send(ORDER_TOPIC, orderId.toString(), event);
    }
}

Event DTOs

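// Jackson needs a no-args constructor to deserialize these on the consumer
// side; @Builder alone does not generate one.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderCreatedEvent {
    private Long orderId;
    private Long userId;
    private BigDecimal total;
    private List<OrderItem> items;
    private Instant createdAt;
}

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderCancelledEvent {
    private Long orderId;
    private Instant cancelledAt;
}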

Consuming Events

Spring Kafka Consumer Configuration

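import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration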
@EnableKafka
public class KafkaConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        
        // Consumer group
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "notification-service");
        
        // Offset management
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
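        // JsonDeserializer trusts only java.util and java.lang by default, so the
        // package holding the event classes must be added
        // ("com.example.events" is a placeholder for your own package)
        JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>(Object.class);
        valueDeserializer.addTrustedPackages("com.example.events");
        
        // Deserializer instances passed here take precedence over the class settings above
        return new DefaultKafkaConsumerFactory<>(
            config,
            new StringDeserializer(),
            valueDeserializer
        );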
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // Manual commit for reliability
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        
        // Concurrency
        factory.setConcurrency(3);
        
        return factory;
    }
}

Event Consumer Service

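@Slf4j
@Service
public class OrderEventConsumer {
    
    private final NotificationService notificationService;
    private final InventoryService inventoryService;
    
    public OrderEventConsumer(NotificationService notificationService,
                              InventoryService inventoryService) {
        this.notificationService = notificationService;
        this.inventoryService = inventoryService;
    }
    
    // Note: the "orders" topic also carries OrderCancelledEvent. A method typed
    // to OrderCreatedEvent cannot accept those records, so route by type with a
    // class-level @KafkaListener plus @KafkaHandler methods, or filter them out.
    @KafkaListener(
        topics = "orders",
        groupId = "notification-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderCreated(
        OrderCreatedEvent event,
        Acknowledgment acknowledgment
    ) {
        try {
            notificationService.sendOrderConfirmation(event);
            // Commit the offset only after the notification has been sent
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process order created event: {}", event.getOrderId(), e);
            // Rethrow so the container's error handler can redeliver the record;
            // swallowing the exception here would silently skip it
            throw e;
        }
    }
    
    @KafkaListener(
        topics = "orders",
        groupId = "inventory-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderCreatedForInventory(
        OrderCreatedEvent event,
        Acknowledgment acknowledgment
    ) {
        try {
            inventoryService.reserveItems(event);
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to reserve inventory for order: {}", event.getOrderId(), e);
            throw e; // let the error handler decide whether to retry
        }
    }
}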
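
One detail worth spelling out: a committed offset is a position in the partition, not a per-record receipt. Acknowledging a later record therefore also covers every earlier one, including any whose failure was only logged. The sketch below simulates this in plain Java. `OffsetSketch` and `committedPosition` are hypothetical names, and the simulation assumes a single partition processed in order.

```java
// Hypothetical simulation of manual acknowledgment on one partition.
class OffsetSketch {

    // succeeded[i] says whether the handler acknowledged the record at offset i.
    // Returns the committed position: the offset the group would resume from.
    static long committedPosition(boolean[] succeeded) {
        long committed = 0;
        for (int offset = 0; offset < succeeded.length; offset++) {
            if (succeeded[offset]) {
                // Acknowledging offset N commits N + 1, i.e. "done up to N".
                committed = offset + 1;
            }
            // A failure that is only logged leaves `committed` untouched,
            // but the loop still moves on to the next record.
        }
        return committed;
    }

    public static void main(String[] args) {
        // Offset 1 fails, offset 2 succeeds: the group resumes from 3,
        // so the failed record at offset 1 is never redelivered.
        System.out.println(committedPosition(new boolean[] {true, false, true})); // 3
    }
}
```

This is why a failed record should be handed to the container's error handler, or routed to a dead letter topic, rather than swallowed inside the listener.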

Error Handling and Retries

Retry Strategy

@Configuration
public class KafkaRetryConfig {
    
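    // Not applied to @KafkaListener methods automatically; wrap the call to
    // retry explicitly, e.g. retryTemplate.execute(ctx -> ...)
    @Bean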
    public RetryTemplate retryTemplate() {
        return RetryTemplate.builder()
            .maxAttempts(3)
            .exponentialBackoff(1000, 2, 10000)
            .retryOn(Exception.class)
            .build();
    }
}
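
For reference, the template above makes at most three attempts, waiting 1 s after the first failure and 2 s after the second; the 10 s cap only comes into play with more attempts. The sketch below reproduces that schedule in plain Java so the numbers are easy to check. The class and method names (`BackoffSchedule`, `delaysMillis`) are hypothetical and not part of Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the waits between attempts for an
// exponential backoff with a cap. It does not call Spring Retry.
class BackoffSchedule {

    static List<Long> delaysMillis(int maxAttempts, long initialMillis,
                                   double multiplier, long maxMillis) {
        List<Long> delays = new ArrayList<>();
        double next = initialMillis;
        // N attempts leave N - 1 gaps to wait in.
        for (int gap = 1; gap < maxAttempts; gap++) {
            delays.add(Math.min((long) next, maxMillis));
            next *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(delaysMillis(3, 1000, 2, 10000)); // [1000, 2000]
        System.out.println(delaysMillis(6, 1000, 2, 10000)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```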

Dead Letter Queue

@Configuration
public class KafkaDlqConfig {
    
    @Bean
    public ProducerFactory<String, Object> dlqProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, Object> dlqKafkaTemplate() {
        return new KafkaTemplate<>(dlqProducerFactory());
    }
}
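
The configuration above only creates a template for the dead letter topic; something still has to decide when a record goes there. Below is a minimal in-memory sketch of that decision, assuming a fixed attempt budget: retry the handler, and once the budget is spent, park the record together with the failure reason instead of blocking the partition. `DeadLetterRouter`, `process`, and the in-memory list are hypothetical stand-ins; in a real service the parked record would be sent with `dlqKafkaTemplate` to a dedicated topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of retry-then-dead-letter routing, with an in-memory
// list standing in for the dead letter topic.
class DeadLetterRouter<T> {

    private final int maxAttempts;
    private final List<String> deadLetters = new ArrayList<>();

    DeadLetterRouter(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // Returns true if the handler succeeded, false if the record was parked.
    boolean process(T record, Consumer<T> handler) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(record);
                return true;
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // Budget exhausted: keep the payload and the reason for later inspection.
        deadLetters.add(record + " | " + last.getMessage());
        return false;
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        DeadLetterRouter<String> router = new DeadLetterRouter<>(3);
        router.process("order-1", r -> { });
        router.process("order-2", r -> {
            throw new IllegalStateException("inventory unavailable");
        });
        System.out.println(router.deadLetters());
    }
}
```

Spring Kafka ships this pattern as `DefaultErrorHandler` combined with `DeadLetterPublishingRecoverer`, which is usually preferable to hand-rolled routing.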

Event Sourcing

Event Store

@Repository
public interface EventRepository extends JpaRepository<Event, Long> {
    
    List<Event> findByAggregateIdOrderByCreatedAtAsc(Long aggregateId);
    
    // Replay depends on event order, so sort explicitly by version
    @Query("SELECT e FROM Event e WHERE e.aggregateId = :aggregateId AND e.version <= :version ORDER BY e.version ASC")
    List<Event> findEventsUpToVersion(@Param("aggregateId") Long aggregateId, @Param("version") Long version);
}

Aggregate Rebuilder

@Service
public class OrderAggregateService {
    
    private final EventRepository eventRepository;
    
    public Order rebuildOrder(Long orderId) {
        List<Event> events = eventRepository.findByAggregateIdOrderByCreatedAtAsc(orderId);
        Order order = new Order();
        
        for (Event event : events) {
            applyEvent(order, event);
        }
        
        return order;
    }
    
    private void applyEvent(Order order, Event event) {
        if (event.getType() == EventType.ORDER_CREATED) {
            OrderCreatedEvent data = event.getData();
            order.setId(data.getOrderId());
            order.setUserId(data.getUserId());
            order.setTotal(data.getTotal());
        } else if (event.getType() == EventType.ORDER_CANCELLED) {
            order.setStatus(OrderStatus.CANCELLED);
        }
    }
}
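
Because state is derived purely from the event history, replaying only a prefix of that history yields the aggregate as it was at that point, which is what `findEventsUpToVersion` is for. A minimal in-memory sketch of such a point-in-time replay follows. The `ReplaySketch` class, its `Event` type, and the string statuses are assumptions for illustration, not the entity classes used above.

```java
import java.util.List;

// Hypothetical, in-memory version of the replay: fold events in version
// order into a state, stopping at the requested version.
class ReplaySketch {

    enum Type { ORDER_CREATED, ORDER_CANCELLED }

    static final class Event {
        final long version;
        final Type type;

        Event(long version, Type type) {
            this.version = version;
            this.type = type;
        }
    }

    // Returns the order status after applying events with version <= upTo.
    static String statusAt(List<Event> eventsInVersionOrder, long upTo) {
        String status = "NONE";
        for (Event e : eventsInVersionOrder) {
            if (e.version > upTo) {
                break; // later events are not part of this snapshot
            }
            if (e.type == Type.ORDER_CREATED) {
                status = "CREATED";
            } else if (e.type == Type.ORDER_CANCELLED) {
                status = "CANCELLED";
            }
        }
        return status;
    }

    public static void main(String[] args) {
        List<Event> history = List.of(
            new Event(1, Type.ORDER_CREATED),
            new Event(2, Type.ORDER_CANCELLED));
        System.out.println(statusAt(history, 1)); // CREATED
        System.out.println(statusAt(history, 2)); // CANCELLED
    }
}
```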

Monitoring and Observability

Kafka Metrics

@Configuration
public class KafkaMetricsConfig {
    
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(config);
    }
    
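    // Admin client built from the same settings; Spring closes it on shutdown
    @Bean
    public AdminClient kafkaAdminClient(KafkaAdmin kafkaAdmin) {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
    
    // KafkaClientMetrics (io.micrometer.core.instrument.binder.kafka) publishes
    // the client's metrics to Micrometer; it also accepts a Producer or Consumer
    @Bean
    public MeterBinder kafkaMetricsBinder(AdminClient kafkaAdminClient) {
        return new KafkaClientMetrics(kafkaAdminClient);
    }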
}

Best Practices

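  1. Idempotent Consumers: With offsets committed after processing, delivery is at least once, so handle duplicate events without repeating side effects
  2. Event Versioning: Evolve event schemas compatibly (for example, only add optional fields) so existing consumers keep working
  3. Backpressure: Bound how much work a consumer takes on per poll (for example with max.poll.records) and pause consumption when downstream systems are slow
  4. Monitoring: Track consumer lag, throughput, and error rates
  5. Testing: Use embedded Kafka (spring-kafka-test) for integration tests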
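
As an illustration of the first practice, here is a minimal sketch of an idempotent handler that remembers which event IDs it has already processed. Several things are assumptions: the events in this post carry no dedicated event ID, so the string IDs are invented for the example, and the in-memory set would need to be replaced by durable storage (ideally updated in the same transaction as the side effect) in a real service. `IdempotentHandler` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: remembers which event IDs were already handled so a
// redelivered event has no further effect.
class IdempotentHandler {

    private final Set<String> processedIds = new HashSet<>();
    private int confirmationsSent = 0;

    // Returns true if the event was handled now, false if it was a duplicate.
    boolean handle(String eventId) {
        if (!processedIds.add(eventId)) {
            return false; // already seen: skip the side effect
        }
        confirmationsSent++; // stands in for the real side effect
        return true;
    }

    int confirmationsSent() {
        return confirmationsSent;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle("order-42-created");
        handler.handle("order-42-created"); // redelivery of the same event
        System.out.println(handler.confirmationsSent()); // 1
    }
}
```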

Conclusion

Event-driven architecture with Kafka provides a robust foundation for scalable microservices. By decoupling services through events, you can build systems that are more resilient, scalable, and maintainable.

In future posts, I'll cover CQRS patterns, saga transactions, and advanced Kafka features like exactly-once semantics.