
3 posts tagged with "Kafka"


· 6 min read
Byju Luckose

Modern backend systems demand reactive, scalable, and maintainable architectures, especially for event-driven workflows like document processing, user provisioning, or IoT orchestration. In this blog post, we’ll explore how to build a non-blocking, production-ready Spring StateMachine using Java 21, Spring Boot 3.3+, and Project Reactor. We'll follow architectural standards and observability best practices, and layer in a moderately complex real-world example.

✨ Why Non-Blocking State Machines Matter

A state machine models the behavior of a system by defining possible states, events, and transitions. Using a non-blocking (reactive) implementation brings major benefits:

  • Improved scalability: Threads aren’t blocked, allowing efficient resource usage

  • Better responsiveness: Especially under high concurrency or I/O load

  • Clean workflow orchestration: Explicitly model and track business state transitions

  • Excellent integration: With Kafka, WebFlux, and Micrometer

🎓 Theory Refresher: What is a Finite State Machine (FSM)?

A Finite State Machine (FSM) consists of:
  • A finite set of states

  • A set of events that trigger transitions

  • Transition functions that define valid state changes

  • Optional entry/exit actions per state

FSMs are ideal for modeling lifecycle workflows like:
  • Order processing

  • User registration

  • Document approval

  • IoT device management

In Spring StateMachine, FSM concepts are implemented with strong typing, clear configuration, and extensible action/guard hooks.

🔄 Architectural Patterns & Principles

Below are the architectural best practices and how they apply to Spring StateMachine:

  • Separation of Concerns: Keep states, transitions, and business logic (actions/guards) clearly separated.
  • Single Responsibility: Each machine or service should handle one specific workflow.
  • Event-driven Design: Transitions are triggered by events from Kafka, WebFlux, or internal logic.
  • Observability: Track metrics, transitions, and errors using Prometheus + Micrometer.
  • Resilience: Use fallback states, retries, and guards to handle failures.
  • Configuration over Convention: Define states, transitions, and actions declaratively using the DSL.
  • External Transitions Emphasis: Prefer external transitions with actions for full control and traceability.
  • Stateless Machines: Machines don’t persist state internally; persist it externally in Redis or a database.
  • Separation of Actions/Guards: Define actions and guards as Spring beans or components.
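
As a sketch of that last point, a guard can live in its own Spring bean and be attached to a transition. The ReviewRequiredGuard below and its "reviewRequired" extended-state flag are hypothetical, purely to illustrate the shape; they are not part of the workflow built later in this post:

@Component
public class ReviewRequiredGuard implements Guard<DocState, DocEvent> {
    @Override
    public boolean evaluate(StateContext<DocState, DocEvent> context) {
        // Allow the transition only when the document has been flagged for review
        Boolean reviewRequired = (Boolean) context.getExtendedState().getVariables().get("reviewRequired");
        return Boolean.TRUE.equals(reviewRequired);
    }
}

// Attached in the transition DSL:
// .withExternal().source(DocState.PROCESSING).target(DocState.REVIEW_PENDING)
//     .event(DocEvent.SEND_FOR_REVIEW).guard(reviewRequiredGuard)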

🧱 Configuration Example

State & Event Enums

public enum DocState {
    NEW, VALIDATING, PROCESSING, REVIEW_PENDING, APPROVED, FAILED
}

public enum DocEvent {
    VALIDATE, PROCESS, SEND_FOR_REVIEW, APPROVE, FAIL
}

State Machine Configuration

First, define separate Action classes for clarity and reuse:

@Component
public class ValidateAction implements Action<DocState, DocEvent> {
    @Override
    public void execute(StateContext<DocState, DocEvent> context) {
        System.out.println("[Action] Validating document " + context.getStateMachine().getId());
    }
}

@Component
public class ProcessAction implements Action<DocState, DocEvent> {
    @Override
    public void execute(StateContext<DocState, DocEvent> context) {
        System.out.println("[Action] Processing document " + context.getStateMachine().getId());
    }
}

@Component
public class ApproveAction implements Action<DocState, DocEvent> {
    @Override
    public void execute(StateContext<DocState, DocEvent> context) {
        System.out.println("[Action] Approving document " + context.getStateMachine().getId());
    }
}

Now, wire them into your state machine configuration:


@Configuration
@EnableStateMachineFactory
@RequiredArgsConstructor
public class DocumentStateMachineConfig extends EnumStateMachineConfigurerAdapter<DocState, DocEvent> {

    private final ValidateAction validateAction;
    private final ProcessAction processAction;
    private final ApproveAction approveAction;

    @Override
    public void configure(StateMachineStateConfigurer<DocState, DocEvent> states) throws Exception {
        states.withStates()
            .initial(DocState.NEW)
            .state(DocState.VALIDATING)
            .state(DocState.PROCESSING)
            .state(DocState.REVIEW_PENDING)
            .end(DocState.APPROVED)
            .end(DocState.FAILED);
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<DocState, DocEvent> transitions) throws Exception {
        transitions
            .withExternal().source(DocState.NEW).target(DocState.VALIDATING).event(DocEvent.VALIDATE).action(validateAction)
            .and()
            .withExternal().source(DocState.VALIDATING).target(DocState.PROCESSING).event(DocEvent.PROCESS).action(processAction)
            .and()
            .withExternal().source(DocState.PROCESSING).target(DocState.REVIEW_PENDING).event(DocEvent.SEND_FOR_REVIEW)
            .and()
            .withExternal().source(DocState.REVIEW_PENDING).target(DocState.APPROVED).event(DocEvent.APPROVE).action(approveAction)
            .and()
            .withExternal().source(DocState.VALIDATING).target(DocState.FAILED).event(DocEvent.FAIL)
            .and()
            .withExternal().source(DocState.PROCESSING).target(DocState.FAILED).event(DocEvent.FAIL);
    }
}


⚙️ Running a Reactive State Machine

@Service
public class DocumentWorkflowService {

    private final StateMachineFactory<DocState, DocEvent> factory;

    public DocumentWorkflowService(StateMachineFactory<DocState, DocEvent> factory) {
        this.factory = factory;
    }

    public void runWorkflow(String docId) {
        var sm = factory.getStateMachine(docId);
        // sendEvent(Mono) returns a Flux of results; collapse each to Mono<Void> so the steps chain sequentially
        sm.startReactively()
            .then(sm.sendEvent(Mono.just(MessageBuilder.withPayload(DocEvent.VALIDATE).build())).then())
            .then(sm.sendEvent(Mono.just(MessageBuilder.withPayload(DocEvent.PROCESS).build())).then())
            .then(sm.sendEvent(Mono.just(MessageBuilder.withPayload(DocEvent.SEND_FOR_REVIEW).build())).then())
            .then(sm.sendEvent(Mono.just(MessageBuilder.withPayload(DocEvent.APPROVE).build())).then())
            .then(sm.stopReactively())
            .subscribe();
    }
}
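
To trigger the workflow from a reactive API, the service can be called from a WebFlux endpoint. This is a minimal sketch; the /documents/{id}/process route and DocumentController name are assumptions for illustration, not part of the original post:

@RestController
@RequestMapping("/documents")
public class DocumentController {

    private final DocumentWorkflowService workflowService;

    public DocumentController(DocumentWorkflowService workflowService) {
        this.workflowService = workflowService;
    }

    @PostMapping("/{id}/process")
    public Mono<ResponseEntity<String>> process(@PathVariable String id) {
        // Kick off the workflow; the state machine runs asynchronously
        workflowService.runWorkflow(id);
        return Mono.just(ResponseEntity.accepted().body("Workflow started for " + id));
    }
}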

❗ Handling Errors in Actions and Ensuring Transitions

In a robust state machine, handling exceptions within Action classes is critical to avoid broken workflows or silent failures.

🔒 Safe Action Execution Pattern

You should catch exceptions within your Action class to prevent the state machine from halting unexpectedly:

@Component
public class ValidateAction implements Action<DocState, DocEvent> {
    @Override
    public void execute(StateContext<DocState, DocEvent> context) {
        try {
            // Perform validation logic
            System.out.println("[Action] Validating " + context.getStateMachine().getId());
        } catch (Exception ex) {
            context.getExtendedState().getVariables().put("error", ex.getMessage());
            // Trigger the failure transition reactively
            context.getStateMachine()
                .sendEvent(Mono.just(MessageBuilder.withPayload(DocEvent.FAIL).build()))
                .subscribe();
        }
    }
}

🚨 Configure Error Transitions

Make sure to define fallback transitions that catch these programmatic failures and move the machine to a safe state:

.withExternal()
.source(DocState.VALIDATING)
.target(DocState.FAILED)
.event(DocEvent.FAIL)
.action(errorHandler)

Define an optional errorHandler to log or notify:

@Component
public class ErrorHandler implements Action<DocState, DocEvent> {
    @Override
    public void execute(StateContext<DocState, DocEvent> context) {
        String reason = (String) context.getExtendedState().getVariables().get("error");
        System.err.println("Transitioned to FAILED due to: " + reason);
    }
}

🛡️ Global Error Listener (Optional)

Catch unhandled exceptions:

@Override
public void configure(StateMachineConfigurationConfigurer<DocState, DocEvent> config) throws Exception {
    config.withConfiguration()
        .listener(new StateMachineListenerAdapter<>() {
            @Override
            public void stateMachineError(StateMachine<DocState, DocEvent> stateMachine, Exception exception) {
                // 'log' is an SLF4J logger, e.g. provided by Lombok's @Slf4j on the config class
                log.error("StateMachine encountered an error: ", exception);
            }
        });
}

📊 Observability and Kafka Integration

Metric Listener with Micrometer

@Bean
public StateMachineListener<DocState, DocEvent> metricListener(MeterRegistry registry) {
    return new StateMachineListenerAdapter<>() {
        @Override
        public void stateChanged(State<DocState, DocEvent> from, State<DocState, DocEvent> to) {
            // 'from' is null on the initial transition into the first state
            if (from == null || to == null) {
                return;
            }
            registry.counter("doc_state_transition",
                    "from", from.getId().name(),
                    "to", to.getId().name()).increment();
        }
    };
}
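
Declaring the listener bean is not enough on its own; it still has to be registered with the machine's configuration. A minimal sketch, assuming the metricListener bean is injected into the configuration class and combined with the error listener override shown earlier rather than duplicated:

@Override
public void configure(StateMachineConfigurationConfigurer<DocState, DocEvent> config) throws Exception {
    config.withConfiguration()
        .autoStartup(false)
        .listener(metricListener); // the Micrometer listener bean from above
}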

Kafka Event Trigger


@KafkaListener(topics = "doc.events")
public void handleEvent(String json) {
DocEvent event = parse(json);
String docId = extractId(json);

var sm = factory.getStateMachine(docId);
sm.startReactively()
.then(sm.sendEvent(Mono.just(MessageBuilder.withPayload(event).build())))
.subscribe();
}
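
The parse and extractId helpers are not defined in the post. One possible sketch using Jackson, assuming a hypothetical payload shaped like {"docId":"doc-42","event":"VALIDATE"}:

private static final ObjectMapper MAPPER = new ObjectMapper();

private DocEvent parse(String json) {
    try {
        return DocEvent.valueOf(MAPPER.readTree(json).get("event").asText());
    } catch (JsonProcessingException e) {
        throw new IllegalArgumentException("Invalid event payload", e);
    }
}

private String extractId(String json) {
    try {
        return MAPPER.readTree(json).get("docId").asText();
    } catch (JsonProcessingException e) {
        throw new IllegalArgumentException("Invalid event payload", e);
    }
}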

🖼️ DOT Export for Visualization

Spring StateMachine does not ship a ready-made DOT exporter, but the running machine exposes its transitions, so a small helper can generate the file. A minimal sketch (the toDot method below is our own utility, not a library class):


public static String toDot(StateMachine<DocState, DocEvent> stateMachine) {
    StringBuilder dot = new StringBuilder("digraph statemachine {\n");
    stateMachine.getTransitions().forEach(t -> dot.append(String.format(
            "  %s -> %s [label=\"%s\"];%n",
            t.getSource().getId(),
            t.getTarget().getId(),
            t.getTrigger() != null ? t.getTrigger().getEvent() : "")));
    dot.append("}\n");
    return dot.toString();
}

Then export the DOT file like so:


String dot = toDot(stateMachine);
Files.writeString(Path.of("statemachine.dot"), dot);

Render via:

dot -Tpng statemachine.dot -o statemachine.png

📏 Additional Standards and Extensions

🧩 State Persistence with Redis or DB
@Bean
public StateMachinePersister<DocState, DocEvent, String> persister(
        StateMachinePersist<DocState, DocEvent, String> persist) {
    // Wrap any StateMachinePersist implementation: in-memory for tests, Redis- or JPA-backed in production
    return new DefaultStateMachinePersister<>(persist);
}
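
The persister delegates to a StateMachinePersist implementation. A minimal in-memory sketch for local testing (swap in a Redis- or JPA-backed implementation for production):

@Component
public class InMemoryDocStateMachinePersist implements StateMachinePersist<DocState, DocEvent, String> {

    private final Map<String, StateMachineContext<DocState, DocEvent>> contexts = new ConcurrentHashMap<>();

    @Override
    public void write(StateMachineContext<DocState, DocEvent> context, String contextObj) {
        contexts.put(contextObj, context);
    }

    @Override
    public StateMachineContext<DocState, DocEvent> read(String contextObj) {
        return contexts.get(contextObj);
    }
}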

Use:

persister.persist(stateMachine, docId);
persister.restore(stateMachine, docId);

📉 Real Time Dashboards (Prometheus + Grafana)

sum by (from, to) (rate(doc_state_transition[5m]))

🧪 Testing with StateMachineTestPlan

A straightforward reactive test can drive the machine directly:

@Test
void shouldTransitionFromNewToValidating() {
    StateMachine<DocState, DocEvent> machine = factory.getStateMachine();
    machine.startReactively().block();
    machine.sendEvent(Mono.just(MessageBuilder.withPayload(DocEvent.VALIDATE).build())).blockLast();
    assertThat(machine.getState().getId()).isEqualTo(DocState.VALIDATING);
}
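
The spring-statemachine-test module also provides StateMachineTestPlanBuilder for declarative, step-by-step assertions. A sketch, assuming that dependency is on the test classpath:

// plan.test() declares 'throws Exception', so the test method should too
StateMachineTestPlan<DocState, DocEvent> plan =
        StateMachineTestPlanBuilder.<DocState, DocEvent>builder()
            .defaultAwaitTime(2)
            .stateMachine(factory.getStateMachine())
            .step()
                .expectStates(DocState.NEW)
            .and()
            .step()
                .sendEvent(DocEvent.VALIDATE)
                .expectStateChanged(1)
                .expectStates(DocState.VALIDATING)
            .and()
            .build();
plan.test();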

📐 FAQ: Best Practices

Why emphasize external transitions? Easier to test and debug.

Why stateless machines? More scalable and testable.

Separate actions/guards? Yes — improves traceability and reuse.

Visualize workflows? Use DOT export + Graphviz.

Manage large flows? Use nested or orthogonal states.
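
As a sketch of that last point, substates can be nested under a parent state directly in the states DSL. The REVIEW_OPEN and REVIEW_DONE substates below are hypothetical additions to DocState, purely to illustrate the shape:

states.withStates()
        .initial(DocState.NEW)
        .state(DocState.REVIEW_PENDING)
        .and()
    .withStates()
        .parent(DocState.REVIEW_PENDING)
        .initial(DocState.REVIEW_OPEN)   // hypothetical substate
        .state(DocState.REVIEW_DONE);    // hypothetical substate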

· 4 min read
Byju Luckose

In the age of microservices and polyglot development, it's common for teams to use different languages for different tasks: Java for orchestration, Python for AI, and C# for enterprise system integration. To tie all this together, Apache Kafka shines as a powerful messaging backbone. In this blog post, we’ll explore how to build a multi-language worker architecture using Spring Boot and Kafka, with workers written in Java, Python, and C#.

Why Use Kafka with Multiple Language Workers?

Kafka is a distributed event streaming platform designed for high-throughput, decoupled communication. Using Kafka with multi-language workers allows you to:

  • Scale task execution independently per language.

  • Use the best language for each task.

  • Decouple orchestration logic from implementation details.

  • Add or remove workers without restarting the system.

Architecture Overview


+-----------------------------+                          +-------------------------+
| Spring Boot App             |    [task-submission]     | Java Worker             |
| (Orchestrator)              | -----------------------> | - Parses DOCX           |
|                             |      Kafka topics        | - Converts to PDF       |
| - Accepts job via REST      |                          +-------------------------+
| - Sends JSON tasks to Kafka |                          +-------------------------+
| - Collects results          | <----------------------- | Python Worker           |
+-----------------------------+      [task-result]       | - Runs ML inference     |
                                                          | - Extracts text         |
                                                          +-------------------------+
                                                          +-------------------------+
                                                          | C# (.NET) Worker        |
                                                          | - Legacy system API     |
                                                          | - Data enrichment       |
                                                          +-------------------------+



Topics

  • task-submission: Receives tasks from orchestrator

  • task-result: Publishes results from workers

Common Message Format

All communication uses a shared JSON message schema:


{
  "jobId": "123e4567-e89b-12d3-a456-426614174000",
  "taskType": "DOC_CONVERT",
  "payload": {
    "source": "http://example.com/sample.docx",
    "outputFormat": "pdf"
  }
}
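
On the JVM side it can help to model this schema as a small value type instead of passing raw maps around. A sketch using a Java record (the TaskMessage name is our own, not from the post):

// Mirrors the shared JSON schema; Jackson can bind it directly
public record TaskMessage(String jobId, String taskType, Map<String, Object> payload) {}

// Example: building and serializing a task before sending it to Kafka
// (writeValueAsString throws JsonProcessingException)
TaskMessage task = new TaskMessage(
        UUID.randomUUID().toString(),
        "DOC_CONVERT",
        Map.of("source", "http://example.com/sample.docx", "outputFormat", "pdf"));
String json = new ObjectMapper().writeValueAsString(task);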


Spring Boot Orchestrator

Dependencies (Maven)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

REST + Kafka Integration

@RestController
@RequestMapping("/jobs")
public class JobController {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JobController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping
    public ResponseEntity<String> submitJob(@RequestBody Map<String, Object> job) throws JsonProcessingException {
        String jobId = UUID.randomUUID().toString();
        job.put("jobId", jobId);
        String json = objectMapper.writeValueAsString(job);
        kafkaTemplate.send("task-submission", jobId, json);
        return ResponseEntity.ok("Job submitted: " + jobId);
    }

    @KafkaListener(topics = "task-result", groupId = "orchestrator")
    public void receiveResult(String message) {
        System.out.println("Received result: " + message);
    }
}
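
The diagram above says the orchestrator "collects results", while receiveResult only logs them. One simple option (a sketch, not from the post) is to keep the latest result per jobId in memory, replacing the logging listener, and expose it over REST:

// In-memory result store keyed by jobId; replace with a database for durability
private final Map<String, String> results = new ConcurrentHashMap<>();

@KafkaListener(topics = "task-result", groupId = "orchestrator")
public void receiveResult(String message) throws JsonProcessingException {
    String jobId = objectMapper.readTree(message).get("jobId").asText();
    results.put(jobId, message);
}

@GetMapping("/{jobId}")
public ResponseEntity<String> getResult(@PathVariable String jobId) {
    String result = results.get(jobId);
    return result != null ? ResponseEntity.ok(result) : ResponseEntity.notFound().build();
}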


Java Worker Example


@KafkaListener(topics = "task-submission", groupId = "java-worker")
public void consume(String message) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> task = mapper.readValue(message, new TypeReference<>() {});
// ... Process ...
Map<String, Object> result = Map.of(
"jobId", task.get("jobId"),
"status", "done",
"worker", "java"
);
kafkaTemplate.send("task-result", mapper.writeValueAsString(result));
}

Python Worker Example


from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer('task-submission', bootstrap_servers='localhost:9092', group_id='py-worker')
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode())

for msg in consumer:
    task = json.loads(msg.value.decode())
    print("Python Worker got task:", task)

    result = {
        "jobId": task["jobId"],
        "status": "completed",
        "worker": "python"
    }
    producer.send("task-result", result)


C# Worker Example (.NET Core)


using Confluent.Kafka;
using System.Text.Json;

var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "csharp-worker" };
var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
var producer = new ProducerBuilder<Null, string>(new ProducerConfig { BootstrapServers = "localhost:9092" }).Build();

consumer.Subscribe("task-submission");

while (true)
{
    var consumeResult = consumer.Consume();
    var task = JsonSerializer.Deserialize<Dictionary<string, object>>(consumeResult.Message.Value);

    var result = new
    {
        jobId = task["jobId"],
        status = "done",
        worker = "csharp"
    };

    producer.Produce("task-result", new Message<Null, string>
    {
        Value = JsonSerializer.Serialize(result)
    });
}

Monitoring & Logging

  • Use Prometheus + Grafana to monitor worker throughput and failures.

  • Add structured logs with jobId for end-to-end traceability.

Local Testing Tips

  • Use Docker to spin up Kafka quickly (e.g., Bitnami Kafka).

  • Use test producers/consumers (kafka-console-producer, kafka-console-consumer) to verify topics.

  • Use Postman or cURL to submit jobs via Spring Boot.

Benefits of This Architecture

  • Kafka decoupling: Workers can scale independently.
  • Multi-language support: Best language per use case.
  • Spring Boot orchestrator: Central control and a REST API.
  • Standard JSON format: Easy integration and testing.

Conclusion

This architecture empowers teams to build distributed, language-agnostic workflows powered by Kafka. By combining the orchestration strength of Spring Boot with the flexibility of multi-language workers, you can build scalable, fault-tolerant systems that grow with your needs.

· 3 min read
Byju Luckose

In this blog post, we'll build a real-world application that combines Spring StateMachine, Apache Kafka, and CSV-based document ingestion to manage complex document lifecycles in a scalable and reactive way.

Use Case Overview

You have a CSV file that contains many documents. Each row defines a document and an event to apply (e.g., START, COMPLETE). The system should:

  1. Read the CSV file

  2. Send a Kafka message for each row

  3. Consume the Kafka message

  4. Trigger a Spring StateMachine transition for the related document

  5. Persist the updated document state

Sample CSV Format


documentId,title,state,event
doc-001,Contract A,NEW,START
doc-002,Contract B,NEW,START
doc-003,Report C,PROCESSING,COMPLETE

Technologies Used

  • Java 17

  • Spring Boot 3.x

  • Spring StateMachine

  • Spring Kafka

  • Apache Commons CSV

  • H2 Database

Enum Definitions


public enum DocumentState {
    NEW, PROCESSING, COMPLETED, ERROR
}

public enum DocumentEvent {
    START, COMPLETE, FAIL
}

StateMachine Configuration

@Configuration
@EnableStateMachineFactory
public class DocumentStateMachineConfig extends StateMachineConfigurerAdapter<DocumentState, DocumentEvent> {

    @Override
    public void configure(StateMachineStateConfigurer<DocumentState, DocumentEvent> states) throws Exception {
        // Without this block the machine has no initial state and cannot run
        states.withStates()
            .initial(DocumentState.NEW)
            .states(EnumSet.allOf(DocumentState.class));
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<DocumentState, DocumentEvent> transitions) throws Exception {
        transitions
            .withExternal().source(DocumentState.NEW).target(DocumentState.PROCESSING).event(DocumentEvent.START)
            .and()
            .withExternal().source(DocumentState.PROCESSING).target(DocumentState.COMPLETED).event(DocumentEvent.COMPLETE)
            .and()
            .withExternal().source(DocumentState.NEW).target(DocumentState.ERROR).event(DocumentEvent.FAIL);
    }
}

Document Entity

@Entity
public class Document {

    @Id
    private String id;
    private String title;

    @Enumerated(EnumType.STRING)
    private DocumentState state;

    // Getters and Setters
}

Kafka Producer and CSV Processing


@Component
public class CsvProcessor {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void processCSV(InputStream inputStream) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
             CSVParser parser = CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(reader)) {

            for (CSVRecord record : parser) {
                String documentId = record.get("documentId");
                String event = record.get("event");
                kafkaTemplate.send("document-events", documentId, event);
            }
        }
    }
}

REST Upload Endpoint

@RestController
@RequestMapping("/api/documents")
public class DocumentUploadController {

    @Autowired
    private CsvProcessor csvProcessor;

    @PostMapping("/upload")
    public ResponseEntity<String> upload(@RequestParam("file") MultipartFile file) throws IOException {
        csvProcessor.processCSV(file.getInputStream());
        return ResponseEntity.ok("CSV processed successfully");
    }
}

Kafka Listener and State Transition


@Component
public class DocumentEventListener {

    @Autowired
    private StateMachineFactory<DocumentState, DocumentEvent> stateMachineFactory;

    @Autowired
    private DocumentRepository documentRepository;

    @KafkaListener(topics = "document-events")
    public void onMessage(ConsumerRecord<String, String> record) {
        String docId = record.key();
        DocumentEvent event = DocumentEvent.valueOf(record.value());

        // A fresh machine always starts in NEW; see below for rehydrating it from the persisted state
        StateMachine<DocumentState, DocumentEvent> sm = stateMachineFactory.getStateMachine(docId);
        sm.start();
        sm.sendEvent(event);

        // Assumes a Document row already exists for this id
        Document doc = documentRepository.findById(docId).orElseThrow();
        doc.setState(sm.getState().getId());
        documentRepository.save(doc);
    }
}
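
Because getStateMachine() always returns a machine in its initial state, an event like COMPLETE for a document already in PROCESSING would be ignored. One way to handle this (a sketch under recent Spring StateMachine versions, not part of the original post) is to reset the machine to the document's persisted state before sending the event:

private void rehydrate(StateMachine<DocumentState, DocumentEvent> sm, DocumentState persistedState) {
    // Reset every region of the machine to the state stored on the Document entity
    sm.getStateMachineAccessor().doWithAllRegions(access ->
        access.resetStateMachineReactively(
            new DefaultStateMachineContext<>(persistedState, null, null, null)).block());
}

In the listener, load the document first, call rehydrate(sm, doc.getState()) after starting the machine, and only then send the event.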

Document Repository


public interface DocumentRepository extends JpaRepository<Document, String> {}

Final Thoughts

This architecture provides:

  • Decoupled, event-driven state management

  • Easily testable document lifecycles

  • A scalable pattern for batch processing from CSVs

You can extend this with:

  • Retry transitions

  • Error handling

  • Audit logging

  • UI feedback via WebSockets or REST polling

Let me know if you'd like the full GitHub repo, Docker setup, or integration with a frontend uploader!