Messaging & Async Systems
Synchronous request/response works until it doesn't — a slow downstream makes you slow, a failed one makes you fail. Asynchronous messaging decouples producers from consumers, enables event-driven workflows, and is the backbone of virtually every high-throughput production system. This section teaches you how to build reliable async systems with Kafka, RabbitMQ, and Spring's async primitives.
Why Asynchronous Messaging?
In a synchronous system, the caller waits for the callee to finish. This creates temporal coupling — both services must be available at the same time, and slow consumers slow producers. Messaging breaks this coupling.
When to Use Messaging
Fire-and-forget side effects: Email sending, push notifications, audit logging. You don't need to wait for completion. If the downstream is slow or temporarily down, queue it.
Heavy background work: Image processing, video encoding, PDF generation. Expensive operations that should happen off the request thread, with worker scaling.
Fan-out (pub/sub): One event, many consumers. OrderPlaced → notifications + inventory + analytics + fraud detection. Each handles it independently.
Event streaming: High-volume, ordered stream of facts. Click tracking, financial transactions, IoT sensor data. Kafka is built for this scale.
Apache Kafka with Spring Boot
Kafka is a distributed event streaming platform. Unlike a traditional message queue, Kafka stores messages durably on disk in ordered logs (topics). Consumers read at their own pace, and the same message can be consumed multiple times by different consumer groups — making it ideal for event-driven architectures where multiple systems react to the same events.
Kafka Core Concepts
Topic: A named log of events. Like a database table, but append-only and immutable. order-events, user-signups. Partitioned for parallelism.
Partition: A topic is split into N partitions. Messages with the same key always go to the same partition — guaranteeing ordering per key. Different partitions process in parallel (see the sketch after this list).
Consumer group: Multiple consumers sharing the work. Each partition is assigned to exactly one consumer in the group. Add more consumers to scale — up to the partition count.
Offset: Position of a message in a partition. Consumers commit their offset after processing. On restart, they resume from their last committed offset. Committing after processing gives at-least-once delivery: a crash between processing and commit means reprocessing, which is why consumers must be idempotent.
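A minimal sketch of the key-to-partition mapping (illustration only: the real Java client hashes the serialized key bytes with murmur2, not hashCode):

// Same key, same partition, every time: that is the per-key ordering guarantee.
int partitionFor(String key, int numPartitions) {
    return (key.hashCode() & 0x7fffffff) % numPartitions; // non-negative modulo
}
// partitionFor("customer-123", 3) returns the same value on every call,
// so all of customer-123's events land in one ordered log.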
Kafka Producer & Consumer
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all # wait for all replicas to acknowledge
retries: 3
properties:
enable.idempotence: true # exactly-once producer semantics
max.in.flight.requests.per.connection: 1
consumer:
group-id: order-processing-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest # start from beginning on new group
enable-auto-commit: false # manual commit — don't lose messages
properties:
spring.json.trusted.packages: "com.company.events"
max.poll.records: 100 # max messages per poll
max.poll.interval.ms: 300000 # 5 min max processing time per batch
// Event DTO — must be serializable
public record OrderPlacedEvent(
String orderId,
String customerId,
BigDecimal total,
List<String> productIds,
Instant occurredAt
) {}
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
public void publishOrderPlaced(Order order) {
OrderPlacedEvent event = new OrderPlacedEvent(
order.getId(), order.getCustomerId(),
order.getTotal(), order.getProductIds(), Instant.now()
);
// Key = customerId ensures all events for the same customer
// go to the same partition — guaranteeing ordering per customer
CompletableFuture<SendResult<String, OrderPlacedEvent>> future =
kafkaTemplate.send("order-events", order.getCustomerId(), event);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to publish OrderPlacedEvent for order {}: {}",
order.getId(), ex.getMessage());
// In production: store to an outbox table and retry (see the outbox sketch below)
} else {
log.info("Published OrderPlacedEvent to partition {} offset {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}
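The comment above mentions an outbox table. A minimal sketch of the transactional outbox pattern, assuming hypothetical OutboxMessage / OutboxRepository types and a toJson helper (none of these names come from the snippets above):

@Service
@RequiredArgsConstructor
public class OrderPlacementService {
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository; // hypothetical JPA repository
    private final KafkaTemplate<String, String> kafkaTemplate;

    // The event row commits or rolls back WITH the order: no lost events,
    // and no events for orders that never committed.
    @Transactional
    public void placeOrder(Order order) {
        orderRepository.save(order);
        outboxRepository.save(new OutboxMessage(
            UUID.randomUUID().toString(), "order-events",
            order.getCustomerId(), toJson(order), Instant.now()));
    }

    // Relay: periodically publish unsent rows; mark them sent only on success.
    @Scheduled(fixedDelay = 1000)
    public void relayOutbox() {
        for (OutboxMessage msg : outboxRepository.findTop100BySentFalseOrderByCreatedAt()) {
            kafkaTemplate.send(msg.getTopic(), msg.getKey(), msg.getPayload())
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        outboxRepository.markSent(msg.getId()); // hypothetical update query
                    }
                });
        }
    }
}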
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderEventConsumer {
    private final NotificationService notificationService;
    private final AnalyticsService analyticsService; // used by the batch consumer below
@KafkaListener(
topics = "order-events",
groupId = "notification-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void onOrderPlaced(
@Payload OrderPlacedEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
log.info("Processing OrderPlacedEvent {} from partition {} offset {}",
event.orderId(), partition, offset);
try {
notificationService.sendOrderConfirmation(event.customerId(), event.orderId());
// Only commit AFTER successful processing
// If we crash before this line, the message will be reprocessed on restart
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to process order event {}", event.orderId(), e);
// Don't acknowledge — message will be reprocessed
// After max retries, goes to Dead Letter Topic (DLT)
throw e;
}
}
    // Batch consumer — better throughput for bulk operations.
    // Requires a batch-enabled container factory (batchKafkaListenerContainerFactory, defined in KafkaConfig below).
    @KafkaListener(topics = "order-events", groupId = "analytics-service",
            containerFactory = "batchKafkaListenerContainerFactory")
public void onOrdersBatch(List<OrderPlacedEvent> events) {
log.info("Processing batch of {} order events", events.size());
analyticsService.recordBatch(events);
}
}
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
kafkaListenerContainerFactory(
ConsumerFactory<String, Object> cf,
KafkaTemplate<String, Object> template) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf);
// Manual acknowledgment mode
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// Retry 3 times with backoff, then send to Dead Letter Topic
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(template,
// DLT topic name: original topic + ".DLT"
(record, ex) -> new TopicPartition(
record.topic() + ".DLT", record.partition())),
new FixedBackOff(1000L, 3) // 1 second wait, 3 retries
));
return factory;
}
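    // Sketch (illustrative bean name): the List<OrderPlacedEvent> batch listener
    // above needs a batch-enabled factory; a record-mode factory won't deliver lists.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
            batchKafkaListenerContainerFactory(ConsumerFactory<String, Object> cf) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        factory.setBatchListener(true); // deliver up to max.poll.records per invocation
        return factory;
    }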
}
// Consume from DLT for investigation / manual reprocessing
// (assumes a String-deserializing consumer factory for the DLT monitor group)
@Component
@RequiredArgsConstructor
@Slf4j
public class DeadLetterMonitor {
    private final AlertingService alertingService;

    @KafkaListener(topics = "order-events.DLT", groupId = "dlt-monitor")
    public void onDeadLetter(
            @Payload String rawMessage,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) byte[] errorBytes) {
        String errorMessage = new String(errorBytes, StandardCharsets.UTF_8);
        log.error("Message landed in DLT: {} | Error: {}", rawMessage, errorMessage);
        alertingService.notifyDeadLetter("order-events", rawMessage, errorMessage);
    }
}
Kafka Production Pitfalls
Auto-commit data loss: Auto-commit periodically commits offsets regardless of whether your processing succeeded. If your consumer processes 50 messages, crashes, and auto-commit fires — those 50 messages are permanently skipped. Always use enable-auto-commit: false with AckMode.MANUAL_IMMEDIATE and commit only after successful processing.
Duplicate delivery: Kafka provides at-least-once delivery. Your consumer can and will receive the same message more than once (after restarts, rebalances). If processing the same message twice charges a customer twice or sends two emails — your consumer is not idempotent. Fix: use a processed-message-id table and check before processing, or design operations that are naturally idempotent (upserts instead of inserts; see the sketch after this list).
More consumers than partitions: If a topic has 3 partitions and you run 5 consumer instances, 2 instances sit idle. Consumers ≤ partitions is the rule. Plan partitions for your expected max parallelism. Adding partitions later changes the key-to-partition mapping for new messages, which breaks per-key ordering across the change.
Slow processing triggers rebalances: If processing a batch takes longer than max.poll.interval.ms (default 5 min), Kafka assumes the consumer is dead and triggers a rebalance. Your consumer is removed from the group and its partitions are reassigned; the work it already did still completes, but its offset commit fails, so the new assignee processes the same messages again. Fix: tune max.poll.interval.ms to be safely above your max processing time, or reduce max.poll.records.
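A minimal sketch of the "naturally idempotent" option from the duplicate-delivery pitfall, assuming PostgreSQL upsert syntax, a hypothetical order_summary table, and an injected JdbcTemplate:

// Reprocessing the same event rewrites the same row, so the outcome is
// identical no matter how many times the message is delivered.
public void recordOrder(OrderPlacedEvent event) {
    jdbcTemplate.update("""
        INSERT INTO order_summary (order_id, customer_id, total)
        VALUES (?, ?, ?)
        ON CONFLICT (order_id) DO UPDATE SET total = EXCLUDED.total
        """, event.orderId(), event.customerId(), event.total());
}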
RabbitMQ with Spring Boot
RabbitMQ is a traditional message broker using the AMQP protocol. Unlike Kafka, messages are removed from the queue once consumed. It excels at task queues, request-reply patterns, and complex routing via exchanges. Choose RabbitMQ for task distribution; choose Kafka for event streaming and audit logs.
RabbitMQ Core Concepts
Exchange: Receives messages from producers and routes them to queues. Exchange types: Direct (exact routing key), Topic (wildcard patterns), Fanout (broadcast to all bound queues), Headers (match on message headers).
Queue: Buffers messages until consumed. Messages are ACKed and removed after processing. Durable queues survive broker restarts.
Binding: A rule connecting an exchange to a queue. A topic exchange with binding order.# routes all order.created, order.cancelled messages to that queue.
Dead letter queue (DLQ): Where rejected or expired messages go. Essential for debugging and manual reprocessing. Configure a per-queue TTL and dead-letter exchange (DLX) so rejected or expired messages are dead-lettered automatically.
// Queue, Exchange, and Binding declarations
@Configuration
public class RabbitMQConfig {
public static final String ORDER_EXCHANGE = "order.exchange";
public static final String NOTIFICATION_QUEUE = "order.notification.queue";
public static final String NOTIFICATION_DLQ = "order.notification.dlq";
@Bean
public TopicExchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE, true, false);
}
@Bean
public Queue notificationQueue() {
return QueueBuilder.durable(NOTIFICATION_QUEUE)
.withArgument("x-dead-letter-exchange", "") // default exchange
.withArgument("x-dead-letter-routing-key", NOTIFICATION_DLQ)
.withArgument("x-message-ttl", 60000) // 60s TTL
.build();
}
@Bean
public Queue notificationDlq() {
return QueueBuilder.durable(NOTIFICATION_DLQ).build();
}
@Bean
public Binding notificationBinding(Queue notificationQueue, TopicExchange orderExchange) {
// order.# matches order.created, order.confirmed, order.cancelled
return BindingBuilder.bind(notificationQueue)
.to(orderExchange)
.with("order.#");
}
@Bean
public Jackson2JsonMessageConverter messageConverter(ObjectMapper mapper) {
return new Jackson2JsonMessageConverter(mapper);
}
}
// Producer
@Service
@RequiredArgsConstructor
public class OrderEventPublisher {
private final RabbitTemplate rabbitTemplate;
public void publishOrderCreated(OrderCreatedEvent event) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,
"order.created", // routing key
event
);
}
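    // Sketch (assumption): publisher confirms report whether the broker accepted
    // a message. Requires spring.rabbitmq.publisher-confirm-type: correlated.
    @PostConstruct
    void registerConfirmCallback() {
        rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
            if (!ack) {
                log.error("Broker rejected message {}: {}",
                    correlation != null ? correlation.getId() : "unknown", cause);
            }
        });
    }

    public void publishWithConfirm(OrderCreatedEvent event) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order.created",
            event, new CorrelationData(event.orderId()));
    }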
}
// Consumer
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderNotificationConsumer {
    private final NotificationService notificationService;

    // Manual ack requires spring.rabbitmq.listener.simple.acknowledge-mode: manual
    @RabbitListener(queues = RabbitMQConfig.NOTIFICATION_QUEUE)
public void handleOrderEvent(OrderCreatedEvent event,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
try {
notificationService.send(event.customerId(), event.orderId());
channel.basicAck(tag, false); // acknowledge success
} catch (Exception e) {
// false = don't requeue, send to DLQ
channel.basicNack(tag, false, false);
log.error("Failed to process {}, sent to DLQ", event.orderId(), e);
}
}
}
Kafka vs RabbitMQ — Decision Guide
Choose Kafka when: You need event sourcing / an audit log. Multiple independent consumers need the same events. High throughput (>100k msg/s). Message replay after bugs. Long retention (days/weeks).
Choose RabbitMQ when: You need task queues where each task is done once. Complex routing (per-user queues, priority queues). Request-reply patterns. Per-message TTL or per-message priority. A simpler operational model.
Spring @Async — Threading Made Simple
@Async is the simplest way to run a method in a background thread. The caller returns immediately; the method executes on a thread pool. Use for fire-and-forget operations that don't need a message broker.
// Enable async processing
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "emailTaskExecutor")
public Executor emailTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("email-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
// Separate pool for heavy processing tasks
@Bean(name = "processingTaskExecutor")
public Executor processingTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("processing-async-");
executor.initialize();
return executor;
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
    private final EmailClient emailClient;
    private final HeavyReportService heavyReportService;
// Runs in emailTaskExecutor pool — caller doesn't wait
@Async("emailTaskExecutor")
public CompletableFuture<Void> sendWelcomeEmail(String email) {
try {
emailClient.send(email, "Welcome!", buildWelcomeTemplate(email));
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("Failed to send welcome email to {}", email, e);
return CompletableFuture.failedFuture(e);
}
}
// Caller can wait for the result if needed
@Async("processingTaskExecutor")
public CompletableFuture<ReportDto> generateReport(ReportRequest req) {
ReportDto report = heavyReportService.generate(req);
return CompletableFuture.completedFuture(report);
}
}
// Calling code
@PostMapping("/api/users/register")
public ResponseEntity<UserDto> register(@RequestBody RegisterRequest req) {
User user = userService.create(req);
// Fire and forget — email sent in background, response is immediate
notificationService.sendWelcomeEmail(user.getEmail());
return ResponseEntity.status(201).body(toDto(user));
}
Three @Async gotchas: (1) Calling an @Async method from the same class bypasses the proxy, so the method runs synchronously; always call it from a different bean (see the sketch below). (2) Exceptions thrown from @Async void methods are silently swallowed unless you configure an AsyncUncaughtExceptionHandler; return CompletableFuture to propagate errors. (3) The @Transactional context is NOT inherited by the async thread; each async execution needs its own transaction.
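A sketch of gotcha (1) and its standard fix (class names are illustrative):

@Service
public class ReportService {
    @Async
    public void generateAsync() { /* heavy work */ }

    public void trigger() {
        this.generateAsync(); // BROKEN: direct call bypasses the proxy, runs synchronously
    }
}

// Fix: call through another bean so the call goes through Spring's async proxy.
@Service
@RequiredArgsConstructor
public class ReportFacade {
    private final ReportService reportService; // this injected reference IS the proxy

    public void trigger() {
        reportService.generateAsync(); // intercepted, runs on the async executor
    }
}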
Scheduled Jobs with @Scheduled
Scheduled jobs are essential for recurring work: cleanup tasks, report generation, cache warming, data synchronization. Spring's @Scheduled is simple for single-instance apps but requires careful handling in clustered environments.
@Configuration
@EnableScheduling
public class SchedulingConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5); // pool handles all @Scheduled methods
scheduler.setThreadNamePrefix("scheduled-");
return scheduler;
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class MaintenanceJobs {
    private final ExternalApiSyncService externalApiSyncService;
    private final SessionRepository sessionRepo;
    private final ReportService reportService;
    private final EmailService emailService;
    private final CacheWarmupService cacheWarmupService;
// Fixed rate: runs every 30s regardless of how long the previous run took
@Scheduled(fixedRate = 30000)
public void pollExternalApi() {
externalApiSyncService.syncPendingOrders();
}
// Fixed delay: 10s AFTER the previous execution finishes
@Scheduled(fixedDelay = 10000)
public void cleanExpiredSessions() {
int deleted = sessionRepo.deleteExpiredBefore(Instant.now());
log.debug("Cleaned {} expired sessions", deleted);
}
// Cron: every day at 2:30 AM
@Scheduled(cron = "0 30 2 * * *")
public void generateDailyReport() {
log.info("Starting daily report generation");
reportService.generateAndEmail();
}
// Cron with timezone
@Scheduled(cron = "0 0 9 * * MON-FRI", zone = "America/New_York")
public void sendBusinessSummary() {
emailService.sendDailySummary();
}
// First run delayed by 5 seconds after startup
@Scheduled(initialDelay = 5000, fixedDelay = 60000)
public void warmupCache() {
cacheWarmupService.warmProductCache();
}
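    // By default, executions of the same @Scheduled method never overlap: if a
    // fixedRate run takes longer than the rate, the next run simply starts late.
    // Sketch (assumption: reuses the "processingTaskExecutor" pool from the
    // @Async section) of deliberately allowing overlap for a long-running poll:
    @Async("processingTaskExecutor")
    @Scheduled(fixedRate = 30000)
    public void pollInParallel() {
        externalApiSyncService.syncPendingOrders(); // scheduler thread returns immediately
    }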
}
Distributed Scheduling with ShedLock
In a multi-instance deployment (Kubernetes with 3 replicas), @Scheduled runs on ALL instances simultaneously. For jobs that must run exactly once (email digests, financial reports), use ShedLock to coordinate via the database.
// pom.xml: net.javacrumbs.shedlock:shedlock-spring
// + net.javacrumbs.shedlock:shedlock-provider-jdbc-template
// DB table (create once):
// CREATE TABLE shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP NOT NULL,
// locked_at TIMESTAMP NOT NULL, locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));
@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = "PT10M")
public class ShedLockConfig {
@Bean
public LockProvider lockProvider(DataSource ds) {
return new JdbcTemplateLockProvider(
JdbcTemplateLockProvider.Configuration.builder()
.withJdbcTemplate(new JdbcTemplate(ds))
.usingDbTime() // use DB time to avoid clock skew between instances
.build());
}
}
@Component
@RequiredArgsConstructor
public class ClusteredJobs {
    private final EmailService emailService;

    // Only ONE instance acquires the lock and runs the job. Other instances skip.
    // Lock released after completion, or forcibly after 30 min (lockAtMostFor below).
@Scheduled(cron = "0 0 8 * * *")
@SchedulerLock(
name = "sendMorningDigest",
lockAtLeastFor = "PT5M", // keep lock for at least 5 min (prevent double-run on fast execution)
lockAtMostFor = "PT30M" // force release after 30 min (prevent stuck locks)
)
public void sendMorningDigest() {
emailService.sendDailyDigest();
}
}
WebSockets — Real-Time Bidirectional Communication
HTTP is request-response — the client must ask for updates. WebSocket establishes a persistent connection allowing the server to push data to the client at any time. Use for: live dashboards, collaborative editing, chat, notifications, live prices.
// pom.xml: spring-boot-starter-websocket
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// In-memory broker for pub/sub
config.enableSimpleBroker("/topic", "/queue");
// Prefix for messages from client to server (@MessageMapping)
config.setApplicationDestinationPrefixes("/app");
// Prefix for user-specific queues (/user/{userId}/queue/...)
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("https://*.yourdomain.com")
.withSockJS(); // fallback for browsers without WebSocket support
}
}
// Controller — handles messages from clients
@Controller
@RequiredArgsConstructor
public class ChatController {
private final SimpMessagingTemplate messaging;
// Client sends to /app/chat.send
@MessageMapping("/chat.send")
@SendTo("/topic/chat.global") // broadcast to all subscribers
public ChatMessage handleMessage(@Payload ChatMessage message,
Principal principal) {
message.setSender(principal.getName());
return message;
}
// Push to a specific user — client subscribes to /user/queue/notifications
public void pushNotification(String userId, NotificationDto notification) {
messaging.convertAndSendToUser(userId, "/queue/notifications", notification);
}
}
// From a service — push updates to all subscribers of an order
@Service
@RequiredArgsConstructor
public class OrderStatusPushService {
private final SimpMessagingTemplate messaging;
public void notifyOrderStatusChange(Order order) {
messaging.convertAndSend(
"/topic/orders." + order.getId(),
new OrderStatusUpdate(order.getId(), order.getStatus())
);
}
}
Server-Sent Events (SSE)
SSE is simpler than WebSockets — it's a one-way channel from server to client over standard HTTP. The client opens a connection and the server streams events. Perfect for notifications, live feeds, progress updates, and monitoring dashboards. There is no special protocol: it's plain HTTP, so most proxies pass it through, and browsers reconnect automatically.
@RestController
@RequiredArgsConstructor
public class NotificationController {
// Store emitters per user. Emitters live in this instance's memory; with
// multiple instances, route pushes to the instance holding the connection
// (e.g. via Redis pub/sub)
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
// Client connects: GET /api/notifications/stream
@GetMapping(value = "/api/notifications/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamNotifications(Principal principal) {
String userId = principal.getName();
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // no timeout
emitters.put(userId, emitter);
emitter.onCompletion(() -> emitters.remove(userId));
emitter.onTimeout(() -> emitters.remove(userId));
emitter.onError(e -> emitters.remove(userId));
// Send initial connection event
try {
emitter.send(SseEmitter.event()
.name("connected")
.data("Notifications stream connected for " + userId));
} catch (IOException e) {
emitters.remove(userId);
}
return emitter;
}
// Push notification from anywhere in the application
public void pushToUser(String userId, NotificationDto notification) {
SseEmitter emitter = emitters.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name("notification")
.id(notification.getId())
.data(notification)
.reconnectTime(3000)); // client reconnects after 3s on disconnect
} catch (IOException e) {
emitters.remove(userId);
}
}
}
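    // Sketch (assumptions: 25s interval, @EnableScheduling active): proxies and
    // load balancers often close idle connections, so send periodic comment events.
    @Scheduled(fixedRate = 25000)
    public void heartbeat() {
        emitters.forEach((userId, emitter) -> {
            try {
                emitter.send(SseEmitter.event().comment("keepalive"));
            } catch (IOException e) {
                emitters.remove(userId); // client is gone, drop the emitter
            }
        });
    }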
}
// Reactive alternative — Spring WebFlux with Flux
@GetMapping(value = "/api/orders/{id}/progress",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<OrderProgress>> streamOrderProgress(
@PathVariable String id) {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<OrderProgress>builder()
.id(String.valueOf(seq))
.event("progress")
.data(orderService.getProgress(id))
.build())
.takeUntil(event -> event.data().isComplete());
}
Idempotency & Exactly-Once Processing
In distributed systems, messages can be delivered more than once. Network retries, consumer restarts, and broker rebalances all cause duplicate delivery. Your consumers must handle duplicates safely — idempotent consumers produce the same result regardless of how many times they process the same message.
@Entity
@Table(name = "processed_messages")
@NoArgsConstructor
@AllArgsConstructor
public class ProcessedMessage {
    @Id private String messageId;
    private String topic;
    private Instant processedAt;
}
@Component
@RequiredArgsConstructor
@Transactional
public class IdempotentOrderConsumer {
private final ProcessedMessageRepo processedRepo;
private final OrderService orderService;
@KafkaListener(topics = "order-commands")
public void handleOrderCommand(
@Payload CreateOrderCommand cmd,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {
        // Build a stable message ID from topic + partition + offset.
        // This catches broker redelivery of the same record; if producers may publish
        // the same logical event twice, key on a business ID (e.g. orderId) instead.
        String messageId = topic + "-" + partition + "-" + offset;
// Check if already processed
if (processedRepo.existsById(messageId)) {
log.info("Duplicate message {} — skipping", messageId);
ack.acknowledge();
return;
}
// Process
orderService.createOrder(cmd);
// Mark as processed (in same transaction as the domain write)
        processedRepo.save(new ProcessedMessage(messageId, topic, Instant.now()));
ack.acknowledge();
}
}
Scenario: your notification service consumes OrderPlacedEvent. Due to a consumer group rebalance during deployment, some messages are redelivered. Users report receiving duplicate order confirmation emails. What is the correct fix?
Tuning max.poll.interval.ms only controls the processing timeout; it does not prevent rebalances. Kafka's exactly-once semantics (EOS) apply only to produce-consume loops within Kafka itself — external side effects like emails are outside Kafka's transaction boundary and can still duplicate. The correct solution is an idempotent consumer: before sending the email, check whether one has already been sent for this orderId. If yes, skip. This makes the operation safe to run any number of times.
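A minimal sketch of that guard, with a hypothetical sentConfirmationRepo keyed by orderId (a unique constraint on orderId closes the small race between check and save):

public void sendOrderConfirmation(OrderPlacedEvent event) {
    if (sentConfirmationRepo.existsById(event.orderId())) {
        return; // already confirmed this order; redelivery is a no-op
    }
    emailClient.sendConfirmation(event.customerId(), event.orderId());
    sentConfirmationRepo.save(new SentConfirmation(event.orderId(), Instant.now()));
}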
Interview Questions
Q: How do you guarantee ordering of events for a given customer in Kafka, and what is the trade-off?
A: Messages with the same key (e.g. customerId) are routed to the same partition by the default hash partitioner. So all events for customer 123 are in the same partition, in order. Trade-off: high-volume customers create hot partitions. If one customer generates 80% of events, their partition becomes a bottleneck while others are underutilized. Solutions: key by a more evenly distributed attribute, or implement a custom partitioner.

Q: Why does calling an @Async method from within the same class run synchronously?
A: this.asyncMethod() is a direct Java call to the same object — the proxy is not involved, so the method runs synchronously on the calling thread. Fix: inject the bean into itself (@Autowired ApplicationContext ctx; ctx.getBean(MyService.class).asyncMethod()), or extract the async method to a separate bean. The same limitation applies to @Transactional — calling a transactional method from within the same class won't start a new transaction because the proxy is bypassed.