Messaging & Async Systems

Synchronous request/response works until it doesn't — a slow downstream makes you slow, a failed one makes you fail. Asynchronous messaging decouples producers from consumers, enables event-driven workflows, and is the backbone of most high-throughput production systems. This section teaches you how to build reliable async systems with Kafka, RabbitMQ, and Spring's async primitives.

Why Asynchronous Messaging?

In a synchronous system, the caller waits for the callee to finish. This creates temporal coupling — both services must be available at the same time, and slow consumers slow producers. Messaging breaks this coupling.
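The decoupling can be sketched with a plain in-process queue — a toy stand-in for a broker, assuming a single consumer thread. The key point: `publish` returns the instant the message is enqueued, whether or not a consumer is even running yet.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueDecouplingDemo {

    static final BlockingQueue<String> broker = new LinkedBlockingQueue<>();
    static final List<String> processed = new CopyOnWriteArrayList<>();

    // Producer: hands the message to the queue and returns immediately —
    // it never waits for (or even knows about) the consumer.
    static void publish(String message) {
        broker.add(message);
    }

    // Consumer: drains the queue at its own pace on a separate thread.
    static Thread startConsumer() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    String msg = broker.poll(200, TimeUnit.MILLISECONDS);
                    if (msg == null) break;   // queue idle — stop, for demo purposes
                    processed.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        publish("order-1");
        publish("order-2");   // both calls return instantly — no consumer running yet
        Thread consumer = startConsumer();
        consumer.join();
        System.out.println(processed); // [order-1, order-2]
    }
}
```

A real broker adds durability, delivery guarantees, and network distribution on top of this idea, but the temporal decoupling is the same.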

Sync vs Async: The Fundamental Difference
⟳ Synchronous (REST call)
Producer → waits → Consumer
Producer blocked for 500ms
If consumer fails → producer fails
Consumer scales tied to producer
Tight temporal coupling
⚡ Asynchronous (Message)
Producer → Message Broker → Consumer
Producer returns immediately
Consumer failure doesn't fail producer
Scale consumers independently
Loose temporal coupling

When to Use Messaging

📧
Fire-and-Forget

Email sending, push notifications, audit logging. You don't need to wait for completion. If it's slow or temporarily down, queue it.

🔄
Work Queues

Image processing, video encoding, PDF generation. Expensive operations that should happen off the request thread, with worker scaling.

📡
Fan-Out

One event, many consumers. OrderPlaced → notifications + inventory + analytics + fraud detection. Each handles it independently.

🌊
Event Streaming

High-volume, ordered stream of facts. Click tracking, financial transactions, IoT sensor data. Kafka is built for this scale.

Apache Kafka with Spring Boot

Kafka is a distributed event streaming platform. Unlike traditional message queues, messages are stored durably on disk in ordered logs (topics). Consumers read at their own pace. The same message can be consumed multiple times by different consumer groups — making it ideal for event-driven architectures where multiple systems react to the same events.

Kafka Core Concepts

📁
Topic

A named log of events. Like a database table, but append-only and immutable. order-events, user-signups. Partitioned for parallelism.

🔢
Partition

A topic is split into N partitions. Messages with the same key always go to the same partition — guaranteeing ordering per key. Different partitions process in parallel.

👥
Consumer Group

Multiple consumers sharing the work. Each partition is assigned to exactly one consumer in the group. Add more consumers to scale — up to the partition count.

📍
Offset

Position of a message in a partition. Consumers commit their offset after processing. On restart, they resume from their last committed offset. Commit-after-processing gives at-least-once delivery — the foundation idempotent consumers build on.
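The same-key-same-partition rule can be sketched in plain Java. Kafka's default partitioner hashes the serialized key with murmur2; this sketch uses a simpler hash, but the property it demonstrates is the same — a deterministic hash modulo the partition count.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Simplified stand-in for Kafka's default partitioner. The real client
    // uses murmur2 over the serialized key bytes; any deterministic hash
    // gives the property we care about: same key -> same partition, always.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(bytes) & 0x7fffffff; // force non-negative
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        // Same key always lands on the same partition -> per-key ordering holds
        System.out.println(partitionFor("customer-123", 3) == partitionFor("customer-123", 3)); // true

        // But the mapping depends on the partition count — changing it later
        // can route the same key to a different partition, breaking ordering
        System.out.println("3 partitions: " + partitionFor("customer-123", 3)
            + ", 6 partitions: " + partitionFor("customer-123", 6));
    }
}
```

This is also why the "partition count mismatch" pitfall below matters: the mapping is `hash % numPartitions`, so the partition count is baked into every routing decision.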

Kafka Topic Partitioning & Consumer Groups
Topic: order-events (3 partitions)
Partition 0
msg1 → msg4 → msg7
Partition 1
msg2 → msg5 → msg8
Partition 2
msg3 → msg6 → msg9
notification-service group
Consumer A → Partition 0
Consumer B → Partition 1
Consumer C → Partition 2
analytics-service group
Consumer X → Partition 0+1
Consumer Y → Partition 2
(same messages, different group)

Kafka Producer & Consumer

application.yml — Kafka config (YAML)
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all          # wait for all replicas to acknowledge
      retries: 3
      properties:
        enable.idempotence: true   # exactly-once producer semantics
        max.in.flight.requests.per.connection: 1   # strict ordering; with idempotence enabled, up to 5 is also safe

    consumer:
      group-id: order-processing-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest  # start from beginning on new group
      enable-auto-commit: false    # manual commit — don't lose messages
      properties:
        spring.json.trusted.packages: "com.company.events"
        max.poll.records: 100      # max messages per poll
        max.poll.interval.ms: 300000  # 5 min max processing time per batch
Kafka Producer (Java)
// Event DTO — must be serializable
public record OrderPlacedEvent(
    String orderId,
    String customerId,
    BigDecimal total,
    List<String> productIds,
    Instant occurredAt
) {}

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public void publishOrderPlaced(Order order) {
        OrderPlacedEvent event = new OrderPlacedEvent(
            order.getId(), order.getCustomerId(),
            order.getTotal(), order.getProductIds(), Instant.now()
        );

        // Key = customerId ensures all events for the same customer
        // go to the same partition — guaranteeing ordering per customer
        CompletableFuture<SendResult<String, OrderPlacedEvent>> future =
            kafkaTemplate.send("order-events", order.getCustomerId(), event);

        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to publish OrderPlacedEvent for order {}: {}",
                    order.getId(), ex.getMessage());
                // In production: store to outbox table and retry
            } else {
                log.info("Published OrderPlacedEvent to partition {} offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
            }
        });
    }
}
Kafka Consumer with manual commit (Java)
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderEventConsumer {

    private final NotificationService notificationService;
    private final AnalyticsService analyticsService;

    @KafkaListener(
        topics = "order-events",
        groupId = "notification-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void onOrderPlaced(
            @Payload OrderPlacedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {

        log.info("Processing OrderPlacedEvent {} from partition {} offset {}",
            event.orderId(), partition, offset);

        try {
            notificationService.sendOrderConfirmation(event.customerId(), event.orderId());
            // Only commit AFTER successful processing
            // If we crash before this line, the message will be reprocessed on restart
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process order event {}", event.orderId(), e);
            // Don't acknowledge — message will be reprocessed
            // After max retries, goes to Dead Letter Topic (DLT)
            throw e;
        }
    }

    // Batch consumer — better throughput for bulk operations
    // (requires a batch-enabled container factory, e.g. factory.setBatchListener(true))
    @KafkaListener(topics = "order-events", groupId = "analytics-service")
    public void onOrdersBatch(List<OrderPlacedEvent> events) {
        log.info("Processing batch of {} order events", events.size());
        analyticsService.recordBatch(events);
    }
}
Dead Letter Topic + error handling (Java)
@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
            kafkaListenerContainerFactory(
                ConsumerFactory<String, Object> cf,
                KafkaTemplate<String, Object> template) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);

        // Manual acknowledgment mode
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // Retry 3 times with backoff, then send to Dead Letter Topic
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(template,
                // DLT topic name: original topic + ".DLT"
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLT", record.partition())),
            new FixedBackOff(1000L, 3)  // 1 second wait, 3 retries
        ));

        return factory;
    }
}

// Consume from DLT for investigation / manual reprocessing (lives in any @Component bean)
@KafkaListener(topics = "order-events.DLT", groupId = "dlt-monitor")
public void onDeadLetter(
        @Payload String rawMessage,
        @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
    log.error("Message landed in DLT: {} | Error: {}", rawMessage, errorMessage);
    alertingService.notifyDeadLetter("order-events", rawMessage, errorMessage);
}

Kafka Production Pitfalls

CRITICAL enable.auto.commit = true (the default)

Auto-commit commits offsets on a timer, regardless of whether your processing succeeded. If the commit fires for messages that were fetched but not yet processed and the consumer then crashes, those messages are permanently skipped. Always use enable-auto-commit: false with AckMode.MANUAL_IMMEDIATE and commit only after successful processing. (The true default is the Kafka client's; Spring Kafka's listener containers disable auto-commit unless you explicitly enable it.)

HIGH Non-idempotent consumers

Kafka provides at-least-once delivery. Your consumer can and will receive the same message more than once (after restarts, rebalances). If processing the same message twice charges a customer twice or sends two emails — your consumer is not idempotent. Fix: use a processed-message-id table, check before processing. Or design operations that are naturally idempotent (upserts instead of inserts).

HIGH Partition count mismatch

If a topic has 3 partitions and you run 5 consumer instances in one group, 2 instances sit idle. Consumers ≤ partitions is the rule. Plan partitions for your expected max parallelism. Increasing partitions later changes the key→partition mapping for new messages — breaking per-key ordering across the change.

MEDIUM max.poll.interval.ms too low

If processing a batch takes longer than max.poll.interval.ms (default 5 min), Kafka assumes the consumer is dead and triggers a rebalance. Your consumer is removed from the group, messages go to another instance — and your "dead" consumer commits, creating duplicate processing. Fix: tune max.poll.interval.ms to be safely above your max processing time, or reduce max.poll.records.
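A back-of-envelope check for the poll budget, with hypothetical numbers: the worst case for one batch is max.poll.records times your slowest per-record time, and it must stay well under max.poll.interval.ms.

```java
public class PollBudget {

    // Worst-case time to process one poll batch, in milliseconds.
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    // Leave headroom: stay under half the interval to absorb GC pauses,
    // slow downstream calls, etc. The factor 2 is a rule of thumb, not a
    // Kafka requirement.
    static boolean safeAgainstRebalance(int maxPollRecords, long perRecordMs,
                                        long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs / 2;
    }

    public static void main(String[] args) {
        // 100 records x 200 ms each = 20 s — comfortably inside the 5 min default
        System.out.println(safeAgainstRebalance(100, 200, 300_000));   // true
        // 500 records x 2 s each = 1000 s — a guaranteed rebalance storm
        System.out.println(safeAgainstRebalance(500, 2_000, 300_000)); // false
    }
}
```

If the check fails, either raise max.poll.interval.ms or lower max.poll.records until it passes with margin.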

RabbitMQ with Spring Boot

RabbitMQ is a traditional message broker using the AMQP protocol. Unlike Kafka, messages are removed from the queue once consumed. It excels at task queues, request-reply patterns, and complex routing via exchanges. Choose RabbitMQ for task distribution; choose Kafka for event streaming and audit logs.

RabbitMQ Core Concepts

🔀
Exchange

Receives messages from producers and routes them to queues. Exchange types: Direct (exact routing key), Topic (wildcard), Fanout (broadcast to all queues), Headers.

📬
Queue

Buffers messages until consumed. Messages are ACKed and removed after processing. Durable queues survive broker restarts.

🔗
Binding

A rule connecting an exchange to a queue. A topic exchange with binding order.# routes all order.created, order.cancelled messages to that queue.

💀
Dead Letter Exchange

Where rejected or expired messages go. Essential for debugging and manual reprocessing. Configure TTL and DLX per queue for automatic dead-lettering after N failures.
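The wildcard semantics of bindings can be sketched as a small matcher — a simplified model of AMQP topic matching (`*` = exactly one word, `#` = zero or more words, words separated by dots), not RabbitMQ's actual implementation.

```java
import java.util.regex.Pattern;

public class TopicBindingMatcher {

    // Simplified AMQP topic matching, compiled down to a regex:
    //   '*'  matches exactly one dot-separated word
    //   '#'  matches zero or more words
    static boolean matches(String pattern, String routingKey) {
        String regex = pattern
            .replace(".", "\\.")           // escape literal dots
            .replace("*", "[^.]+")         // '*' -> exactly one word
            .replace("\\.#", "(\\..+)?")   // trailing '.#' -> optionally more words
            .replace("#", ".*");           // bare '#' -> anything
        return Pattern.matches(regex, routingKey);
    }

    public static void main(String[] args) {
        System.out.println(matches("order.#", "order.created"));            // true
        System.out.println(matches("order.#", "order.created.confirmed"));  // true
        System.out.println(matches("order.#", "order"));                    // true — '#' matches zero words
        System.out.println(matches("order.*", "order.created"));            // true
        System.out.println(matches("order.*", "order.created.v2"));         // false — '*' is one word only
        System.out.println(matches("order.#", "payment.created"));          // false
    }
}
```

This is why the `order.#` binding in the config below catches order.created, order.confirmed, and order.cancelled with a single rule.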

RabbitMQ — complete setup (Java)
// Queue, Exchange, and Binding declarations
@Configuration
public class RabbitMQConfig {

    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String NOTIFICATION_QUEUE = "order.notification.queue";
    public static final String NOTIFICATION_DLQ = "order.notification.dlq";

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }

    @Bean
    public Queue notificationQueue() {
        return QueueBuilder.durable(NOTIFICATION_QUEUE)
            .withArgument("x-dead-letter-exchange", "")   // default exchange
            .withArgument("x-dead-letter-routing-key", NOTIFICATION_DLQ)
            .withArgument("x-message-ttl", 60000)          // 60s TTL
            .build();
    }

    @Bean
    public Queue notificationDlq() {
        return QueueBuilder.durable(NOTIFICATION_DLQ).build();
    }

    @Bean
    public Binding notificationBinding(Queue notificationQueue, TopicExchange orderExchange) {
        // order.# matches order.created, order.confirmed, order.cancelled
        return BindingBuilder.bind(notificationQueue)
            .to(orderExchange)
            .with("order.#");
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper mapper) {
        return new Jackson2JsonMessageConverter(mapper);
    }
}

// Producer
@Service
@RequiredArgsConstructor
public class OrderEventPublisher {
    private final RabbitTemplate rabbitTemplate;

    public void publishOrderCreated(OrderCreatedEvent event) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.ORDER_EXCHANGE,
            "order.created",   // routing key
            event
        );
    }
}

// Consumer
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderNotificationConsumer {

    private final NotificationService notificationService;

    @RabbitListener(queues = RabbitMQConfig.NOTIFICATION_QUEUE)
    public void handleOrderEvent(OrderCreatedEvent event,
                                  Channel channel,
                                  @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        try {
            notificationService.send(event.customerId(), event.orderId());
            channel.basicAck(tag, false);   // acknowledge success
        } catch (Exception e) {
            // false = don't requeue, send to DLQ
            channel.basicNack(tag, false, false);
            log.error("Failed to process {}, sent to DLQ", event.orderId(), e);
        }
    }
}

Kafka vs RabbitMQ — Decision Guide

Choose Kafka when…

You need event sourcing / audit log. Multiple independent consumers need the same events. High throughput (>100k msg/s). Message replay after bugs. Long retention (days/weeks).

🐰
Choose RabbitMQ when…

Task queues where each task is done once. Complex routing (per-user queues, priority queues). Request-reply pattern. You need per-message TTL or per-message priority. Simpler operational model.

Spring @Async — Threading Made Simple

@Async is the simplest way to run a method in a background thread. The caller returns immediately; the method executes on a thread pool. Use for fire-and-forget operations that don't need a message broker.

@Async with custom thread pool (Java)
// Enable async processing
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "emailTaskExecutor")
    public Executor emailTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("email-async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    // Separate pool for heavy processing tasks
    @Bean(name = "processingTaskExecutor")
    public Executor processingTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("processing-async-");
        executor.initialize();
        return executor;
    }
}

@Service
public class NotificationService {

    // Runs in emailTaskExecutor pool — caller doesn't wait
    @Async("emailTaskExecutor")
    public CompletableFuture<Void> sendWelcomeEmail(String email) {
        try {
            emailClient.send(email, "Welcome!", buildWelcomeTemplate(email));
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            log.error("Failed to send welcome email to {}", email, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    // Caller can wait for the result if needed
    @Async("processingTaskExecutor")
    public CompletableFuture<ReportDto> generateReport(ReportRequest req) {
        ReportDto report = heavyReportService.generate(req);
        return CompletableFuture.completedFuture(report);
    }
}

// Calling code
@PostMapping("/api/users/register")
public ResponseEntity<UserDto> register(@RequestBody RegisterRequest req) {
    User user = userService.create(req);
    // Fire and forget — email sent in background, response is immediate
    notificationService.sendWelcomeEmail(user.getEmail());
    return ResponseEntity.status(201).body(toDto(user));
}
@Async pitfalls:
(1) Calling @Async from the same class bypasses the proxy — the method runs synchronously. Always call from a different bean.
(2) Exceptions thrown from @Async void methods are silently swallowed unless you configure an AsyncUncaughtExceptionHandler. Return CompletableFuture to propagate errors.
(3) @Transactional context is NOT inherited by the async thread — each async execution needs its own transaction.

Scheduled Jobs with @Scheduled

Scheduled jobs are essential for recurring work: cleanup tasks, report generation, cache warming, data synchronization. Spring's @Scheduled is simple for single-instance apps but requires careful handling in clustered environments.

@Scheduled — all trigger types (Java)
@Configuration
@EnableScheduling
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(5);  // pool handles all @Scheduled methods
        scheduler.setThreadNamePrefix("scheduled-");
        return scheduler;
    }
}

@Component
@Slf4j
public class MaintenanceJobs {

    // Fixed rate: runs every 30s regardless of how long the previous run took
    @Scheduled(fixedRate = 30000)
    public void pollExternalApi() {
        externalApiSyncService.syncPendingOrders();
    }

    // Fixed delay: 10s AFTER the previous execution finishes
    @Scheduled(fixedDelay = 10000)
    public void cleanExpiredSessions() {
        int deleted = sessionRepo.deleteExpiredBefore(Instant.now());
        log.debug("Cleaned {} expired sessions", deleted);
    }

    // Cron: every day at 2:30 AM
    @Scheduled(cron = "0 30 2 * * *")
    public void generateDailyReport() {
        log.info("Starting daily report generation");
        reportService.generateAndEmail();
    }

    // Cron with timezone
    @Scheduled(cron = "0 0 9 * * MON-FRI", zone = "America/New_York")
    public void sendBusinessSummary() {
        emailService.sendDailySummary();
    }

    // First run delayed by 5 seconds after startup
    @Scheduled(initialDelay = 5000, fixedDelay = 60000)
    public void warmupCache() {
        cacheWarmupService.warmProductCache();
    }
}

Distributed Scheduling with ShedLock

In a multi-instance deployment (Kubernetes with 3 replicas), @Scheduled runs on ALL instances simultaneously. For jobs that must run exactly once (email digests, financial reports), use ShedLock to coordinate via the database.

ShedLock — one-instance-only scheduled jobs (Java)
// pom.xml: net.javacrumbs.shedlock:shedlock-spring
// + net.javacrumbs.shedlock:shedlock-provider-jdbc-template

// DB table (create once):
// CREATE TABLE shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP NOT NULL,
//   locked_at TIMESTAMP NOT NULL, locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));

@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = "PT10M")
public class ShedLockConfig {
    @Bean
    public LockProvider lockProvider(DataSource ds) {
        return new JdbcTemplateLockProvider(
            JdbcTemplateLockProvider.Configuration.builder()
                .withJdbcTemplate(new JdbcTemplate(ds))
                .usingDbTime()  // use DB time to avoid clock skew between instances
                .build());
    }
}

@Component
public class ClusteredJobs {

    // Only ONE instance acquires the lock and runs the job
    // Other instances skip. Lock released after completion or after 10 min.
    @Scheduled(cron = "0 0 8 * * *")
    @SchedulerLock(
        name = "sendMorningDigest",
        lockAtLeastFor = "PT5M",    // keep lock for at least 5 min (prevent double-run on fast execution)
        lockAtMostFor = "PT30M"     // force release after 30 min (prevent stuck locks)
    )
    public void sendMorningDigest() {
        emailService.sendDailyDigest();
    }
}
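Stripped of the JDBC plumbing, the locking logic ShedLock performs against the shedlock table can be sketched with an in-memory map standing in for the database (the real library races instances safely with a single atomic conditional UPDATE/INSERT):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SchedulerLockSketch {

    // Stand-in for the shedlock table: job name -> lock_until.
    private final Map<String, Instant> locks = new ConcurrentHashMap<>();

    // Try to take the named lock until now + lockAtMostFor.
    // Exactly one caller succeeds while the lock is held; a stale lock
    // (lock_until in the past) is treated as free — this is what prevents
    // a crashed instance from blocking the job forever.
    public boolean tryLock(String name, Duration lockAtMostFor, Instant now) {
        boolean[] acquired = {false};
        locks.compute(name, (k, lockUntil) -> {
            if (lockUntil == null || !lockUntil.isAfter(now)) {
                acquired[0] = true;
                return now.plus(lockAtMostFor);
            }
            return lockUntil; // still held by another instance
        });
        return acquired[0];
    }

    public static void main(String[] args) {
        SchedulerLockSketch db = new SchedulerLockSketch();
        Instant t0 = Instant.parse("2024-01-01T08:00:00Z");
        Duration atMost = Duration.ofMinutes(10);

        System.out.println(db.tryLock("sendMorningDigest", atMost, t0));                      // true  — instance A runs
        System.out.println(db.tryLock("sendMorningDigest", atMost, t0.plusSeconds(60)));      // false — instance B skips
        System.out.println(db.tryLock("sendMorningDigest", atMost, t0.plusSeconds(11 * 60))); // true  — stale lock expired
    }
}
```

`.usingDbTime()` in the real config matters for exactly the `now` parameter above: all instances must agree on the clock, so ShedLock uses the database's time rather than each JVM's.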

WebSockets — Real-Time Bidirectional Communication

HTTP is request-response — the client must ask for updates. WebSocket establishes a persistent connection allowing the server to push data to the client at any time. Use for: live dashboards, collaborative editing, chat, notifications, live prices.

WebSocket with STOMP (Java)
// pom.xml: spring-boot-starter-websocket

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // In-memory broker for pub/sub
        config.enableSimpleBroker("/topic", "/queue");
        // Prefix for messages from client to server (@MessageMapping)
        config.setApplicationDestinationPrefixes("/app");
        // Prefix for user-specific queues (/user/{userId}/queue/...)
        config.setUserDestinationPrefix("/user");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("https://*.yourdomain.com")
            .withSockJS();  // fallback for browsers without WebSocket support
    }
}

// Controller — handles messages from clients
@Controller
@RequiredArgsConstructor
public class ChatController {
    private final SimpMessagingTemplate messaging;

    // Client sends to /app/chat.send
    @MessageMapping("/chat.send")
    @SendTo("/topic/chat.global")   // broadcast to all subscribers
    public ChatMessage handleMessage(@Payload ChatMessage message,
                                      Principal principal) {
        message.setSender(principal.getName());
        return message;
    }

    // Push to a specific user — client subscribes to /user/queue/notifications
    public void pushNotification(String userId, NotificationDto notification) {
        messaging.convertAndSendToUser(userId, "/queue/notifications", notification);
    }
}

// From a service — push updates to all subscribers of an order
@Service
@RequiredArgsConstructor
public class OrderStatusPushService {
    private final SimpMessagingTemplate messaging;

    public void notifyOrderStatusChange(Order order) {
        messaging.convertAndSend(
            "/topic/orders." + order.getId(),
            new OrderStatusUpdate(order.getId(), order.getStatus())
        );
    }
}
WebSocket scaling challenge: Each WebSocket connection is pinned to a specific server instance. If you have 3 instances and a user connects to instance A, a message published on instance B won't reach them. Fix: use an external message broker (RabbitMQ STOMP plugin or Redis pub/sub) instead of the in-memory broker. The external broker acts as a shared backplane — all instances pub/sub through it.

Server-Sent Events (SSE)

SSE is simpler than WebSockets — it's a one-way channel from server to client over standard HTTP. The client opens a connection and the server streams events. Perfect for: notifications, live feeds, progress updates, monitoring dashboards. No special protocol, proxies handle it naturally, and browsers reconnect automatically.
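On the wire, SSE is just text. A sketch of the text/event-stream framing that SseEmitter produces under the hood — optional `event:` and `id:` fields, one or more `data:` lines, terminated by a blank line:

```java
public class SseWireFormat {

    // Build one event in text/event-stream framing. Multi-line payloads
    // become repeated "data:" lines; a blank line ends the event.
    static String formatEvent(String eventName, String id, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) sb.append("event: ").append(eventName).append('\n');
        if (id != null)        sb.append("id: ").append(id).append('\n');
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // blank line terminates the event
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(formatEvent("notification", "42", "{\"msg\":\"hello\"}"));
        // event: notification
        // id: 42
        // data: {"msg":"hello"}
    }
}
```

The `id:` field is what enables automatic resume: the browser sends the last seen id in the Last-Event-ID header when it reconnects.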

SSE endpoint with SseEmitter (Java)
@RestController
@RequiredArgsConstructor
public class NotificationController {

    // Store emitters per user (for multi-instance, fan out via a shared channel
    // such as Redis pub/sub — emitters themselves can't be serialized)
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    // Client connects: GET /api/notifications/stream
    @GetMapping(value = "/api/notifications/stream",
                produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamNotifications(Principal principal) {
        String userId = principal.getName();

        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // no timeout
        emitters.put(userId, emitter);

        emitter.onCompletion(() -> emitters.remove(userId));
        emitter.onTimeout(() -> emitters.remove(userId));
        emitter.onError(e -> emitters.remove(userId));

        // Send initial connection event
        try {
            emitter.send(SseEmitter.event()
                .name("connected")
                .data("Notifications stream connected for " + userId));
        } catch (IOException e) {
            emitters.remove(userId);
        }

        return emitter;
    }

    // Push notification from anywhere in the application
    public void pushToUser(String userId, NotificationDto notification) {
        SseEmitter emitter = emitters.get(userId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event()
                    .name("notification")
                    .id(notification.getId())
                    .data(notification)
                    .reconnectTime(3000));  // client reconnects after 3s on disconnect
            } catch (IOException e) {
                emitters.remove(userId);
            }
        }
    }
}

// Reactive alternative — Spring WebFlux with Flux
@GetMapping(value = "/api/orders/{id}/progress",
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<OrderProgress>> streamOrderProgress(
        @PathVariable String id) {
    return Flux.interval(Duration.ofSeconds(1))
        .map(seq -> ServerSentEvent.<OrderProgress>builder()
            .id(String.valueOf(seq))
            .event("progress")
            .data(orderService.getProgress(id))
            .build())
        .takeUntil(event -> event.data().isComplete());
}

Idempotency & Exactly-Once Processing

In distributed systems, messages can be delivered more than once. Network retries, consumer restarts, and broker rebalances all cause duplicate delivery. Your consumers must handle duplicates safely — idempotent consumers produce the same result regardless of how many times they process the same message.

Idempotent consumer with deduplication table (Java)
@Entity
@Table(name = "processed_messages")
@NoArgsConstructor
@AllArgsConstructor
public class ProcessedMessage {
    @Id private String messageId;
    private String topic;
    private Instant processedAt;
}

@Component
@RequiredArgsConstructor
@Slf4j
@Transactional
public class IdempotentOrderConsumer {

    private final ProcessedMessageRepo processedRepo;
    private final OrderService orderService;

    @KafkaListener(topics = "order-commands")
    public void handleOrderCommand(
            @Payload CreateOrderCommand cmd,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {

        // Build a stable message ID from topic + partition + offset
        // (a business key such as the command's orderId also catches
        // producer-side duplicates, not just broker redelivery)
        String messageId = topic + "-" + partition + "-" + offset;

        // Check if already processed
        if (processedRepo.existsById(messageId)) {
            log.info("Duplicate message {} — skipping", messageId);
            ack.acknowledge();
            return;
        }

        // Process
        orderService.createOrder(cmd);

        // Mark as processed (in same transaction as the domain write)
        processedRepo.save(new ProcessedMessage(messageId, topic, Instant.now()));

        ack.acknowledge();
    }
}
Knowledge Check
Your Kafka consumer sends an email for each OrderPlacedEvent. Due to a consumer group rebalance during deployment, some messages are redelivered. Users report receiving duplicate order confirmation emails. What is the correct fix?
A) Switch to enable.auto.commit=true — Kafka will track which messages were processed
B) Increase max.poll.interval.ms — this prevents rebalances during processing
C) Make the consumer idempotent: track a sent-email record keyed by orderId in the database; skip sending if the record already exists for that orderId
D) Use exactly-once semantics (EOS) in Kafka — duplicate messages will never be delivered
Answer: C. Kafka provides at-least-once delivery by default — duplicate delivery is normal and expected during rebalances and consumer restarts. Auto-commit doesn't solve this (it makes it worse by potentially losing messages). max.poll.interval.ms controls timeout, not rebalance prevention. Exactly-once Kafka semantics (EOS) only applies to produce-consume loops within Kafka itself — external side effects like emails are outside Kafka's transaction boundary and can still duplicate. The correct solution is an idempotent consumer: before sending the email, check if you've already sent one for this orderId. If yes, skip. This makes the operation safe to run any number of times.
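The idempotent fix can be sketched with an in-memory map standing in for the sent-email table (in production, the dedup check and the domain write share one database transaction):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentEmailSender {

    // Stand-in for a sent_emails table with orderId as the primary key.
    private final Map<String, Boolean> sentEmails = new ConcurrentHashMap<>();
    private int sendCount = 0;

    // Returns true only if an email was actually sent on this call.
    public boolean sendConfirmation(String orderId) {
        // putIfAbsent is atomic: only the first caller for this orderId wins.
        // In the DB version, a unique-key violation plays the same role.
        if (sentEmails.putIfAbsent(orderId, Boolean.TRUE) != null) {
            return false; // duplicate delivery — skip silently
        }
        sendCount++;      // the real email-client call would go here
        return true;
    }

    public int sendCount() { return sendCount; }

    public static void main(String[] args) {
        IdempotentEmailSender sender = new IdempotentEmailSender();
        sender.sendConfirmation("order-42");
        sender.sendConfirmation("order-42"); // redelivered after a rebalance
        sender.sendConfirmation("order-42"); // redelivered again
        System.out.println(sender.sendCount()); // 1 — three deliveries, one email
    }
}
```

However many times the message arrives, the customer gets exactly one email — which is the only "exactly-once" guarantee available for side effects outside Kafka.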

Interview Questions

Q: How does Kafka guarantee message ordering? What are the trade-offs?
Kafka guarantees ordering within a single partition — messages are appended in order and consumed in order. Across partitions, there is no ordering guarantee. A topic with 6 partitions processes messages in parallel — six different consumers reading simultaneously in any interleaved order. The mechanism for preserving logical ordering: use a message key. All messages with the same key (e.g., customerId) are routed to the same partition by the default hash partitioner. So all events for customer 123 are in the same partition, in order. Trade-off: high-volume customers create hot partitions. If one customer generates 80% of events, their partition becomes a bottleneck while others are underutilized. Solutions: key by a more distributed attribute, or implement custom partitioners.
Q: What is the difference between at-most-once, at-least-once, and exactly-once delivery semantics?
At-most-once: messages are delivered zero or one time. No retries. Suitable for metrics and logs where loss is acceptable. Faster. At-least-once: messages are delivered one or more times. Retries on failure guarantee delivery, but duplicates are possible. Suitable for most business events — use idempotent consumers to handle duplicates. Exactly-once: each message is processed exactly one time. Technically achievable within Kafka's ecosystem using idempotent producers and transactional consumers, but only for Kafka-to-Kafka operations. For external effects (DB writes, emails, REST calls), exactly-once requires 2-phase commit or idempotency at the application level. Exactly-once is expensive and complex — most production systems use at-least-once with idempotent consumers.
Q: What is a Dead Letter Queue/Topic and how should you handle messages that land in it?
A Dead Letter Queue (DLQ) or Dead Letter Topic (DLT) receives messages that couldn't be processed after exhausting retries. Messages end up there due to: deserialization errors, business validation failures, external service unavailability, or bugs in consumer logic. Production DLQ strategy: (1) Alert on DLQ depth — any message in DLQ needs investigation; (2) Store enough context to understand why it failed (original topic, error message, timestamp, retry count); (3) Never silently ignore DLQ messages — they represent data that wasn't processed; (4) After fixing the root cause, manually replay from DLQ; (5) Design a replay endpoint in your service for DLQ reprocessing. DLQs are a safety net, not a normal flow. If your DLQ is getting regular traffic, fix the consumer, not the DLQ strategy.
Q: When would you choose RabbitMQ over Kafka for a new project?
Choose RabbitMQ when: you need task queues where each task is done by exactly one worker; you need complex routing (different queues for different message types, priority queues, per-user queues); you need request-reply patterns; messages should be deleted after consumption; operational simplicity matters more than throughput. Choose Kafka when: multiple independent services need to react to the same events; you need message replay (replay the last week of order events); high throughput is required (>100k msg/s); messages serve as an audit log; event sourcing. In practice: RabbitMQ for internal task distribution, Kafka for cross-service event streaming in microservices.
Q: How do you ensure a WebSocket application works with multiple server instances?
WebSocket connections are stateful — each connection is pinned to a specific server instance. If you have 3 instances and user A connects to instance 1, a message broadcast from instance 2 won't reach user A. Solution: use an external message broker as a shared backplane. For Spring's STOMP-based WebSocket, configure a RabbitMQ relay (or Redis pub/sub) instead of the in-memory broker. When instance 2 broadcasts a message, it publishes to RabbitMQ. RabbitMQ delivers it to all instances' subscribed listeners. All instances then forward to their locally connected WebSocket clients. This makes the architecture stateless from the load balancer's perspective — any instance can broadcast to any user.
Q: What happens if two instances of your Spring Boot app both run a @Scheduled job simultaneously?
@Scheduled runs on every JVM that has it configured — in a 3-replica Kubernetes deployment, it runs 3 times simultaneously. For idempotent jobs (cache warming, cleanup of already-expired records), this may be acceptable. For jobs with side effects (sending emails, generating financial reports, charging customers), it's a critical bug. Solutions: (1) ShedLock — a database-backed distributed lock that ensures only one instance runs the job at a time. The first instance to acquire the lock runs; others skip silently; (2) Spring Batch with cluster-safe job runner; (3) Quartz Scheduler with a JDBC job store — stores job state in DB, coordinates across instances. ShedLock is the simplest and most widely used for Spring Boot applications.
Q: Explain the @Async proxy limitation. Why do self-invocation calls not work asynchronously?
Spring's @Async (and @Transactional, @Cacheable, @PreAuthorize) works via AOP proxy. When you inject a bean and call its method, you're calling the proxy, which intercepts the call, starts a new thread (for @Async), then calls the real method. Self-invocation bypasses the proxy: this.asyncMethod() is a direct Java call to the same object — the proxy is not involved, the method runs synchronously on the calling thread. Fix: obtain the proxied bean instead of calling this — self-inject lazily (@Lazy @Autowired MyService self) or fetch it via ctx.getBean(MyService.class) — or extract the async method to a separate bean. The same limitation applies to @Transactional — calling a transactional method from within the same class won't start a new transaction because the proxy is bypassed.
📨

Section 08 Complete

You now understand the full async toolkit — Kafka's partition model and delivery guarantees, RabbitMQ's exchange/queue routing, Spring's @Async threading model, distributed scheduling with ShedLock, WebSocket scaling challenges, and how to build idempotent consumers that handle duplicate delivery safely.