Quick Example: StackSaga-Kafka:MySQL
This quick example walks you through building a complete order-placement saga on the StackSaga-Kafka (asynchronous) orchestration engine, using MySQL as the event store.
Unlike the synchronous quick example — where every step runs inside a single orchestrator process — here the orchestrator talks to independent worker services entirely through Kafka topics, so the work is distributed across several Spring Boot applications. By the end you will have a working multiservice saga that places an order across three services and automatically compensates (rolls back) the completed steps when a later step fails.
Overview
The scenario we orchestrate is a simple place-order operation, modelled as a single long-running transaction (LRT) made up of three atomic steps (spans). Because this is the Kafka (asynchronous) engine, each span is executed by a worker service that the orchestrator reaches over Kafka — never through a direct in-process or blocking HTTP call.
| # | Span (Step) | Action Type | Worker Service | Compensated By |
|---|---|---|---|---|
| 1 | Validate User | Query (read-only) | user-service | None (queries are never compensated) |
| 2 | Reserve Order | Command (state-changing) | order-service | UNDO_ORDER_RESERVED |
| 3 | Make Payment | Command (state-changing) | payment-service | UNDO_MAKE_PAYMENT |
If any step fails, StackSaga automatically compensates the previously completed command steps in reverse order — for example, releasing the reserved order when the payment fails — so the system is always left in a consistent state.
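The reverse-order rule can be illustrated with a small plain-Java model. This is only a sketch of the idea, not StackSaga code: the Step record and the run method are hypothetical names invented for the illustration, and the real engine performs this bookkeeping for you.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of reverse-order compensation; none of these types are StackSaga classes. */
class CompensationOrderSketch {

    /** A hypothetical step: its name, whether it changes state, and whether it succeeds. */
    record Step(String name, boolean command, boolean succeeds) {}

    /** Runs the steps in order; on the first failure, returns the undo calls in reverse order. */
    static List<String> run(List<Step> steps) {
        Deque<Step> completedCommands = new ArrayDeque<>();
        for (Step step : steps) {
            if (!step.succeeds()) {
                List<String> undone = new ArrayList<>();
                // Pop the most recently completed command first (last in, first out).
                while (!completedCommands.isEmpty()) {
                    undone.add("UNDO " + completedCommands.pop().name());
                }
                return undone;
            }
            // Queries are read-only, so only commands are remembered for compensation.
            if (step.command()) {
                completedCommands.push(step);
            }
        }
        return List.of(); // every step succeeded, nothing to compensate
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
                new Step("Validate User", false, true),
                new Step("Reserve Order", true, true),
                new Step("Make Payment", true, false));
        System.out.println(run(steps)); // prints [UNDO Reserve Order]
    }
}
```

Note that the failed step itself is not undone in this model, and the query step never appears in the undo list.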
Services and Their Roles
This example spans three Spring Boot applications. The orchestrator/worker labels describe a service’s involvement in this saga, not separate deployments — see Service Classification for the full explanation.
| Service | StackSaga Role | Key Dependency |
|---|---|---|
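| order-service | Orchestrator (and also a worker for the Reserve Order span) | stacksaga-kafka-orchestrator-spring-boot-starter |
| user-service | Worker | stacksaga-kafka-worker-starter |
| payment-service | Worker | stacksaga-kafka-worker-starter |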
| order-service plays two roles at once: it drives the saga (orchestrator) and also executes the Reserve Order span (worker).
You do not add the worker starter to it separately: the orchestrator starter already brings the worker capability in transitively.
Only the orchestrator owns the MySQL event store; the workers are stateless with respect to the saga. |
How the Saga Executes
- A client calls POST /api/v1/order on the orchestrator (order-service). The controller initialises the OrderDomainEntity and hands it to the engine through StackSagaKafkaTemplate.
- The engine asks the PlaceOrderEventManager for the first topic (DO_USER_VALIDATED) and publishes a command message to that worker’s Kafka topic.
- The target worker (user-service) consumes the command through its @SagaEndpoint, runs the business logic, and publishes a reply to the framework-managed per-domain reply topic.
- The orchestrator consumes the reply and calls EventManager.onNext(…) to decide what to do next: route to DO_ORDER_RESERVED, then DO_MAKE_PAYMENT, and finally complete().
- Every state transition is persisted to the MySQL event store, so an in-flight transaction can always be recovered.
- If a worker reports a non-retryable failure (or onNext() returns an error), the engine switches to compensation and dispatches the UNDO_* commands to the relevant workers in reverse order.
For the full picture, see StackSaga-Kafka Architecture.
What This Guide Covers
This guide implements the minimal, working topology: the orchestrator, the three worker spans, compensation, and MySQL persistence. Once it runs, two production-hardening layers can be added on top without changing any of the code below:
- Retry: make stalled transactions self-healing with the Ring Coordinator. See Architecture · Stage 2.
- Observability: visualise every transaction in real time with the StackSaga Trace Window. See Architecture · Stage 3.
Stage-1 [Minimal Implementation]
This stage demonstrates a minimal, end-to-end implementation of StackSaga’s Kafka orchestration using MySQL as the event store. We will build it in the following order:
- Creating Order Service (Orchestrator service): the orchestrator's domain entity, topics, EventManager, handler, controller, and configuration.
- Configuring User-Service As Worker: the first worker service.
- Configuring Order-Service As a Worker: the orchestrator acting as a worker for the Reserve Order span.
- Configuring Payment-Service As a Worker: the second external worker service.
- Running the Application and Testing the Transaction Flow: run everything and watch the happy path and compensation in action.
Prerequisites
Make sure you have the following installed and running before getting started:
- Java 17+
- Apache Kafka 3.x: a running broker (the orchestrator and workers communicate exclusively over Kafka).
- MySQL 8+: used by the orchestrator as the event store.
- Maven
| The quickest way to get a local broker and database up is Docker, e.g. a bitnami/kafka (or Redpanda) container for Kafka and a mysql:8 container for MySQL. |
Creating Order Service (Orchestrator service)
As the first step, we will create the Order Service (Orchestrator service) that will handle the order placement and orchestrate the other services.
Getting Started - Initial project setup
In this demo we use Spring MVC as the web framework and MySQL as the primary database for the event-store.
To get started, create a new Spring Boot application and include the following dependencies in your pom.xml:
| It is recommended to use the StackSaga Initializer to generate the StackSaga dependency snippets, as it ensures that your project gets the correct versions and configuration. |
<dependencyManagement> (1)
<dependencies>
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-bom</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency> (2)
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-kafka-orchestrator-spring-boot-starter</artifactId>
</dependency>
<dependency> (3)
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-mysql-reactive-support</artifactId>
</dependency>
</dependencies>
| 1 | This section imports the StackSaga BOM (Bill of Materials) which manages the versions of all StackSaga related dependencies, ensuring compatibility and simplifying dependency management. |
| 2 | This is the main StackSaga Starter dependency that includes the core functionalities of StackSaga, such as the orchestration engine, event handling, and transaction management. It provides the necessary components to implement the StackSaga pattern in your application. |
| 3 | This dependency provides support for using MySQL as the event-store in a reactive manner. It includes the components and configuration needed to integrate MySQL with StackSaga, so that events are stored and managed in a MySQL database through a reactive driver. |
Creating the Domain-Entity
The Domain-Entity is the main entity used in the transaction flow. It represents the main data structure that is manipulated and processed throughout the transaction. In StackSaga-Kafka it is also the payload that travels (serialized) between the orchestrator and the workers over Kafka.
@Getter
@Setter
@SagaDomainEntity(
version = @SagaDomainEntityVersion(major = 1, minor = 0, patch = 0),
name = "OrderDomainEntity"
)(1)
public class OrderDomainEntity extends DomainEntity { (2)
// Domain-specific fields
@JsonProperty("username")
private String username;
@JsonProperty("user_validated")
private boolean userValidated;
@JsonProperty("total_amount")
private double totalAmount;
@JsonProperty("payment_reference_id")
private String paymentReferenceId;
@JsonProperty("product_items")
private List<String> productItems;
(3)
public OrderDomainEntity() {
super(OrderDomainEntity.class);
}
}
| 1 | The @SagaDomainEntity annotation marks this class as a domain entity for StackSaga, which means it will be used to represent the state and data of the transaction as it flows through the different steps of the saga.
The version attribute allows you to manage changes to the domain entity over time. |
| 2 | The OrderDomainEntity class extends the DomainEntity base class provided by StackSaga, which includes common functionality and properties needed for domain entities. |
| 3 | The constructor calls the superclass constructor with the class type, which is necessary for StackSaga to properly manage and serialize the domain entity during the transaction flow. |
Creating Topics
Defining topics is one of the most important steps in a StackSaga-Kafka implementation, because the topics are the saga: each one represents an atomic execution point (span) routed to a specific worker service.
In StackSaga-Kafka, topics are not just plain string names — they carry metadata such as the topic name, a unique topic key, the action type (primary DO or compensation UNDO), and the target service.
You declare them by extending AbstractTopic<T> and exposing each topic as a constant, as shown below.
public class PlaceOrderTopic extends AbstractTopic<PlaceOrderTopic> { (1)
(2)
protected PlaceOrderTopic(String topicName, float topicKey, SagaEventType sagaEventType, String targetService) {
super(topicName, topicKey, sagaEventType, targetService);
}
(2)
protected PlaceOrderTopic(String topicName, float topicKey, SagaEventType sagaEventType, String targetService, PlaceOrderTopic parent) {
super(topicName, topicKey, sagaEventType, targetService, parent);
}
(3)
//do topics
public static final PlaceOrderTopic DO_USER_VALIDATED = new PlaceOrderTopic("user-service.validate-user", 1.0f, SagaEventType.QUERY_DO_ACTION, "user-service");
public static final PlaceOrderTopic DO_ORDER_RESERVED = new PlaceOrderTopic("order-service.reserve-order", 2.0f, SagaEventType.COMMAND_DO_ACTION, "order-service");
public static final PlaceOrderTopic DO_MAKE_PAYMENT = new PlaceOrderTopic("payment-service.make-payment", 3.0f, SagaEventType.COMMAND_DO_ACTION, "payment-service");
//undo topics
public static final PlaceOrderTopic UNDO_ORDER_RESERVED = new PlaceOrderTopic("order-service.reserve-order", -2.0f, SagaEventType.COMMAND_UNDO_ACTION, "order-service", DO_ORDER_RESERVED);
public static final PlaceOrderTopic UNDO_MAKE_PAYMENT = new PlaceOrderTopic("payment-service.make-payment", -3.0f, SagaEventType.COMMAND_UNDO_ACTION, "payment-service", DO_MAKE_PAYMENT);
}
| 1 | Create a custom topic class for your domain and extend AbstractTopic<T>, where <T> is the topic class itself (the curiously-recurring generic pattern). |
| 2 | Override the two constructors required by AbstractTopic<T>.
The 4-parameter constructor is used to declare a primary (DO) topic; the 5-parameter constructor adds a parent argument and is used to declare a compensation (UNDO) topic, linking it back to the primary topic it reverses.
You never call these constructors yourself other than from the constants below — just override them and leave the bodies as the super(…) delegation. |
| 3 | Declare every topic of the LRT (long-running transaction) as a public static final constant.
This example has 3 primary (DO) topics and 2 compensation (UNDO) topics — note that DO_USER_VALIDATED is a read-only query and therefore has no compensation counterpart.
Each topic specifies its name, a unique float topic key (negative for compensation), the SagaEventType, and the target service.
See the topic specification for the naming and topic-key rules. |
| The custom topic class only declares topics according to the StackSaga-Kafka specification — it is plain Java, not a Spring bean, so do not annotate it with any Spring stereotype. |
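The key conventions used by the constants above (every key unique, negative keys for UNDO topics) are easy to get wrong when a saga grows. The following plain-Java sketch checks them over a list of topic descriptions. TopicDef and validate are hypothetical names made up for this illustration, not part of AbstractTopic, and the "DO keys are positive" check is an assumption read off the example rather than a documented rule; the authoritative rules are in the topic specification.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java check of the topic-key conventions; TopicDef is hypothetical, not AbstractTopic. */
class TopicKeyCheckSketch {

    /** Stand-in for a topic constant: constant name, key, and parent constant (null for DO topics). */
    record TopicDef(String constant, float key, String parent) {}

    /** Returns one message per violated convention; an empty list means the set looks consistent. */
    static List<String> validate(List<TopicDef> topics) {
        List<String> problems = new ArrayList<>();
        Set<Float> seenKeys = new HashSet<>();
        for (TopicDef topic : topics) {
            if (!seenKeys.add(topic.key())) {
                problems.add("duplicate key " + topic.key() + " on " + topic.constant());
            }
            boolean isUndo = topic.parent() != null;
            if (isUndo && topic.key() >= 0) {
                problems.add(topic.constant() + " is an UNDO topic but its key is not negative");
            }
            // Assumption: DO topics use positive keys, as in the example constants.
            if (!isUndo && topic.key() <= 0) {
                problems.add(topic.constant() + " is a DO topic but its key is not positive");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<TopicDef> topics = List.of(
                new TopicDef("DO_USER_VALIDATED", 1.0f, null),
                new TopicDef("DO_ORDER_RESERVED", 2.0f, null),
                new TopicDef("DO_MAKE_PAYMENT", 3.0f, null),
                new TopicDef("UNDO_ORDER_RESERVED", -2.0f, "DO_ORDER_RESERVED"),
                new TopicDef("UNDO_MAKE_PAYMENT", -3.0f, "DO_MAKE_PAYMENT"));
        System.out.println(validate(topics)); // prints []
    }
}
```

A check like this could live in a unit test next to the topic class, so that a copy-pasted key is caught at build time rather than at runtime.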
Creating the Topic EventManager
To use the defined topics, create a custom event manager that follows the StackSaga specification, as shown below. This event manager also creates a domain topic for receiving the responses from the target services.
(2)
@SagaEventManager(
value = "placeOrderEventManager",
listenerScope = ListenerScope.SHARED,
domainRootTopicSuffix = "place-order"
)
public class PlaceOrderEventManager extends AbstractEventManager<OrderDomainEntity, PlaceOrderTopic> { (1)
(3)
@Override
public Supplier<List<PlaceOrderTopic>> registerTopics() {
return () -> List.of(
PlaceOrderTopic.DO_USER_VALIDATED,
PlaceOrderTopic.DO_ORDER_RESERVED,
PlaceOrderTopic.UNDO_ORDER_RESERVED,
PlaceOrderTopic.DO_MAKE_PAYMENT,
PlaceOrderTopic.UNDO_MAKE_PAYMENT
);
}
@NonNull
@Override
(4)
public SagaPrimaryEventAction<PlaceOrderTopic> onNext(PlaceOrderTopic recentTopic, OrderDomainEntity currentDomainEntityState) {
(5)
if (recentTopic.equals(PlaceOrderTopic.DO_USER_VALIDATED)) {
return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_ORDER_RESERVED);
}
(6)
if (recentTopic.equals(PlaceOrderTopic.DO_ORDER_RESERVED)) {
return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_MAKE_PAYMENT);
}
(7)
if (recentTopic.equals(PlaceOrderTopic.DO_MAKE_PAYMENT)) {
return SagaPrimaryEventAction.complete();
}
(8)
return SagaPrimaryEventAction.error(new IllegalStateException("Unexpected topic: " + recentTopic));
}
}
| 1 | Create your custom event manager by extending the AbstractEventManager<DE, T> class. DE is the domain entity and T is the topic class that we created above. |
| 2 | Annotate the class with @SagaEventManager and configure the required properties. |
| 3 | Override the registerTopics method to return the full list of topics this event manager uses (both DO and UNDO).
This is where we register our topics with StackSaga, and it is called only once, when the application starts. |
| 4 | Override the onNext method to define the navigation logic of the event manager. The framework invokes this method after each primary-execution topic (span/atomic execution) completes successfully, until the end of the transaction (unless it has failed with a NonRetryableExecutorException). This is where you direct the engine to the next topic after each one, as shown above. You can use the given parameters to make the navigation decision. |
| 5 | If the most recently completed topic is DO_USER_VALIDATED, route the engine to DO_ORDER_RESERVED next. |
| 6 | If the most recently completed topic is DO_ORDER_RESERVED, route the engine to DO_MAKE_PAYMENT next. |
| 7 | DO_MAKE_PAYMENT is the last span, so once it completes there is nothing more to route: return SagaPrimaryEventAction.complete() to finish the saga successfully and move it to the terminal COMPLETED state.
IMPORTANT: Don’t forget this branch. Without it, a successful payment would fall through to the error(…) below and the engine would wrongly start compensation.
A saga always ends either with complete() here or with compensation triggered by a failure. |
| 8 | A defensive fall-through for an unexpected topic — with the routing above it should never be reached.
Returning SagaPrimaryEventAction.error(…) (or throwing any exception) from onNext() transitions the saga to FAILED and triggers compensation.
See the EventManager reference for the full onNext()/onNextRevert() contract. |
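For a strictly linear saga like this one, the if-chain in onNext() is equivalent to a lookup table. The sketch below models that idea in plain Java so it runs without the framework. Topic, Kind, and Decision are hypothetical stand-ins for PlaceOrderTopic and SagaPrimaryEventAction, not StackSaga types. It also makes the role of the terminal entry visible: remove the DO_MAKE_PAYMENT row and a successful payment resolves to ERROR.

```java
import java.util.Map;

/** Plain-Java model of table-driven routing; these types only imitate the StackSaga ones. */
class RoutingTableSketch {

    enum Topic { DO_USER_VALIDATED, DO_ORDER_RESERVED, DO_MAKE_PAYMENT, UNDO_ORDER_RESERVED }

    enum Kind { NEXT, COMPLETE, ERROR }

    /** Hypothetical routing result: what to do, and which topic to dispatch when kind is NEXT. */
    record Decision(Kind kind, Topic target) {}

    // One row per primary (DO) topic; the last span maps to COMPLETE.
    private static final Map<Topic, Decision> ROUTES = Map.of(
            Topic.DO_USER_VALIDATED, new Decision(Kind.NEXT, Topic.DO_ORDER_RESERVED),
            Topic.DO_ORDER_RESERVED, new Decision(Kind.NEXT, Topic.DO_MAKE_PAYMENT),
            Topic.DO_MAKE_PAYMENT, new Decision(Kind.COMPLETE, null));

    /** Looks up the next action; any topic without a row is treated as an error. */
    static Decision onNext(Topic recentTopic) {
        return ROUTES.getOrDefault(recentTopic, new Decision(Kind.ERROR, null));
    }

    public static void main(String[] args) {
        System.out.println(onNext(Topic.DO_ORDER_RESERVED).target()); // prints DO_MAKE_PAYMENT
        System.out.println(onNext(Topic.DO_MAKE_PAYMENT).kind());     // prints COMPLETE
    }
}
```

The if-chain remains the better fit once routing depends on the domain-entity state, since a static table cannot branch on data.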
Creating the Handler
Following the recommended handler pattern from the StackSaga team, a Handler is the single class responsible for starting the saga, checking its state, and receiving the transaction events the engine emits during the flow.
In this quick example we keep the handler minimal and use it only to receive transaction state-change events; in a real application it is also the natural place to host the StackSagaKafkaTemplate call that starts the saga.
@Slf4j
@Component(1)
public class PlaceOrderHandler implements TransactionEventListener<OrderDomainEntity> {(2)
@Override(3)
public void onStateChanged(TransactionState<OrderDomainEntity> transactionState) {
log.info("onStateChanged : {}", transactionState.getCurrentStatus());
}
}
| 1 | Annotate the class with @Component to make it a Spring bean, so that it can be automatically detected and registered by the Spring container. |
| 2 | The PlaceOrderHandler class implements the TransactionEventListener interface, which lets it react to the transaction events the StackSaga engine emits during the flow.
NOTE: The listener comes in a blocking and a non-blocking flavour: use TransactionEventListener in a servlet (MVC) application and ReactiveTransactionEventListener in a reactive (WebFlux) application.
See Transaction State Change Listeners for both. |
| 3 | The onStateChanged method is overridden to handle transaction state changes. It receives the current state of the transaction as a parameter, which includes the current status, the execution history, the current domain-entity state, and the timestamps. In this example, we simply log the current status whenever it changes. You can implement whatever your use case needs here, such as notifying users about status changes or updating other systems. |
Creating the Controller to trigger the transaction accessing StackSagaKafkaTemplate
Here we create an ordinary Spring REST controller with a POST endpoint that places an order.
Inside the endpoint we use the StackSagaKafkaTemplate, the main API for interacting with the StackSaga-Kafka engine, to start the flow by providing the initial domain-entity state and specifying the first topic together with its EventManager.
@Slf4j
@RestController
@RequestMapping("/api/v1/order")
@RequiredArgsConstructor
public class PlaceOrderController {
private final StackSagaKafkaTemplate<OrderDomainEntity, PlaceOrderTopic> stackSagaKafkaTemplate; (1)
@PostMapping
public String placeOrder(@RequestBody PlaceOrderRequest placeOrderRequest) {
final String transactionId = this
.stackSagaKafkaTemplate
(2)
.init(() -> {
OrderDomainEntity orderDomainEntity = new OrderDomainEntity();
orderDomainEntity.setUsername(placeOrderRequest.getUsername());
orderDomainEntity.setTotalAmount(placeOrderRequest.getTotalAmount());
orderDomainEntity.setProductItems(Arrays.asList(placeOrderRequest.getItems()));
return orderDomainEntity;
})
(3)
.peek(orderDomainEntity -> {
//you can do some local operation for the orderDomainEntity before the saga process.
//and also here you can access the unique id for the transaction that provide by the engine.
log.info("Transaction Id : {}", orderDomainEntity.getTransactionId());
})
(4)
.startWith(PlaceOrderTopic.DO_USER_VALIDATED, PlaceOrderEventManager.class)
(5)
.execute();
(6)
return "Order placed successfully with transaction id: " + transactionId;
}
(7)
@Data
public static class PlaceOrderRequest {
private String username;
private double totalAmount;
private String[] items;
}
}
| 1 | Autowire the StackSagaKafkaTemplate<DE, T>, the main API for interacting with the StackSaga-Kafka engine.
It exposes a fluent API to define and start a transaction.
The two type parameters are the domain entity DE (OrderDomainEntity) and the topic class T (PlaceOrderTopic) — the same pair that parameterises the EventManager. |
| 2 | The init method initializes the transaction flow with the initial state of the domain entity. It takes a supplier that returns the initial domain-entity state, which in this example is built from the request data. |
| 3 | The peek method is an optional step that lets you perform local operations on the domain entity before the saga process starts. It also gives access to the unique transaction id generated by the engine, which is useful for logging or tracking. |
| 4 | The startWith method specifies two things: the first topic to dispatch (PlaceOrderTopic.DO_USER_VALIDATED) and the EventManager class that owns the routing for this saga domain (PlaceOrderEventManager).
From this point on, the EventManager.onNext(…) logic drives the rest of the flow topic by topic. |
| 5 | The execute method starts the transaction flow with the defined configuration. It returns the unique transaction id generated by the engine for this transaction, which can be used for tracking and logging. |
| 6 | The endpoint returns a response indicating that the order was placed successfully along with the transaction id. |
| 7 | The PlaceOrderRequest is a simple DTO class that represents the request body for placing an order. It contains the fields needed to create the initial domain-entity state for the transaction flow: username, totalAmount, and items.
Using the domain-entity class directly as the request body is not recommended, because it couples the API layer to the domain layer and gives you less control over the API contract. |
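The reason for keeping the request DTO separate from the domain entity can be shown with a small mapping sketch. It is plain Java so that it runs on its own: OrderState is a hypothetical stand-in for OrderDomainEntity, and the validation rules are assumptions chosen for the illustration. The point is that saga-internal fields such as userValidated are set by the mapper and never read from the client.

```java
import java.util.List;

/** Plain-Java sketch of mapping an API request to the initial saga state. */
class RequestMappingSketch {

    /** API-facing DTO; mirrors the PlaceOrderRequest fields used by the controller. */
    record PlaceOrderRequest(String username, double totalAmount, String[] items) {}

    /** Hypothetical stand-in for the saga state; the real class is OrderDomainEntity. */
    record OrderState(String username, double totalAmount, List<String> productItems, boolean userValidated) {}

    /** Validates the request and builds the initial state; internal flags start at safe defaults. */
    static OrderState toInitialState(PlaceOrderRequest request) {
        if (request.username() == null || request.username().isBlank()) {
            throw new IllegalArgumentException("username is required");
        }
        if (request.items() == null || request.items().length == 0) {
            throw new IllegalArgumentException("at least one item is required");
        }
        if (request.totalAmount() <= 0) {
            throw new IllegalArgumentException("totalAmount must be positive");
        }
        // List.of copies the array, so later changes to the request cannot leak into the saga state.
        return new OrderState(request.username().trim(), request.totalAmount(),
                List.of(request.items()), false);
    }

    public static void main(String[] args) {
        OrderState state = toInitialState(
                new PlaceOrderRequest("alice", 42.5, new String[] {"p-1", "p-2"}));
        System.out.println(state.productItems()); // prints [p-1, p-2]
    }
}
```

In the controller above, a mapper like this would sit inside the supplier passed to init.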
Tune Configuration
To run the application, you need to configure the following properties in your application.properties or application.yml file.
#spring properties
spring.application.name=order-service
#stacksaga-instance properties (1)
stacksaga.instance.cluster=local-cluster
stacksaga.instance.region=local-region
stacksaga.instance.zone=local-zone
#stacksaga-starter properties
stacksaga.kafka.orchestrator.domain-entity-scan=org.example.orderservice.domain (2)
#stacksaga-mysql-database-support properties (3)
stacksaga.mysql.r2dbc.url=r2dbc:mysql://localhost:3306
stacksaga.mysql.r2dbc.database=order_service_event_store
stacksaga.mysql.r2dbc.username=${MYSQL_USER:root}
stacksaga.mysql.r2dbc.password=${MYSQL_PASSWORD:password}
(4)
stacksaga.mysql.jdbc.url=jdbc:mysql://localhost:3306/order_service_event_store
stacksaga.mysql.jdbc.username=${MYSQL_USER:root}
stacksaga.mysql.jdbc.password=${MYSQL_PASSWORD:password}
(5)
spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
| 1 | The stacksaga.instance properties configure the instance information for the StackSaga engine, such as the cluster, region, and zone. These properties are used to identify transactions for retry and restore purposes, mainly in distributed environments. In this example, we use a local cluster, region, and zone for demonstration purposes. |
| 2 | The stacksaga.kafka.orchestrator.domain-entity-scan property specifies the package where the domain-entity classes are located. The StackSaga engine scans this package to find the domain-entity classes and register them for use in the transaction flow. It can be a comma-separated list if the classes live in multiple packages. |
| 3 | The stacksaga.mysql.r2dbc properties configure the R2DBC connection that the StackSaga engine uses to reach the MySQL event-store. You need to provide the URL, database name, username, and password. The event-store should be a separate database schema dedicated to storing the transaction events and data. It should not be the schema your application uses for its regular data, which avoids conflicts and keeps the concerns separated. |
| 4 | The stacksaga.mysql.jdbc properties configure the JDBC connection.
The JDBC connection is used internally by Liquibase for database migration and schema creation. |
| 5 | Configure the Kafka connection details. |
Configuring User-Service As Worker
The next part is adding the StackSaga Kafka features to the target utility services. Let’s start with the user-service. Add the following dependencies to it.
Dependency Management
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-bom</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-kafka-worker-starter</artifactId>
</dependency>
Next, create the custom UserValidationEndpoint class that handles the Validate User span on the worker side.
Custom UserValidationEndpoint
@Slf4j
@SagaEndpoint(topicNameSuffix = "user-service.validate-user", listenerScope = ListenerScope.SHARED) (2)
public class UserValidationEndpoint extends QueryEndpoint { (1)
@Override (3)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
log.info("Transaction Id: {},Idempotency Key: {}", consumerRecord.value().getTransactionId(), consumerRecord.value().getIdempotencyKey());
consumerRecord.value().getCurrentDomainEntityStateForUpdate().ifPresent(domainEntity -> {
String username = domainEntity.get("username").asText();
log.info("username: {}", username);
//Validate username with your logic here and if the user is not valid you can throw NonRetryableExecutorException
//If there is a transient error, you can throw RetryableExecutorException or JustRetryableExecutorException
try {
//simulate some delay for make the request
Thread.sleep(new Random().nextInt(1000, 3000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); //restore the interrupt flag before propagating
throw new RuntimeException(e);
}
});
}
}
| 1 | Create the custom UserValidationEndpoint by extending the QueryEndpoint abstract class. |
| 2 | Annotate the class with @SagaEndpoint and configure the required properties. |
| 3 | Override doProcess to implement your user-validation logic.
The consumerRecord carries everything about the transaction — the current domain-entity state, transaction id, idempotency key, and much more. |
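Kafka consumers generally have to tolerate a message being delivered more than once, which is one reason the payload carries an idempotency key. The sketch below shows one way a worker could use such a key to skip duplicate side effects. It is plain Java and purely illustrative: IdempotencyGuardSketch and runOnce are hypothetical names, the key set is held in memory only, and whether StackSaga already deduplicates on your behalf is not covered here, so check the endpoint reference before relying on either behaviour.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory illustration of idempotent processing keyed by an idempotency key. */
class IdempotencyGuardSketch {

    // Keys whose action has already run. A real service would need a durable store instead.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Runs the action only the first time a key is seen; returns whether it ran. */
    boolean runOnce(String idempotencyKey, Runnable action) {
        if (!processedKeys.add(idempotencyKey)) {
            return false; // duplicate delivery: skip the side effect
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a later retry is allowed to run the action again.
            processedKeys.remove(idempotencyKey);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotencyGuardSketch guard = new IdempotencyGuardSketch();
        System.out.println(guard.runOnce("key-1", () -> {})); // prints true
        System.out.println(guard.runOnce("key-1", () -> {})); // prints false
    }
}
```

Inside doProcess, the key would come from consumerRecord.value().getIdempotencyKey(), as in the logging statement above.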
Configuration Properties
spring.application.name=user-service
#kafka connection (1)
spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
#stacksaga-instance properties (2)
stacksaga.instance.cluster=local-cluster
stacksaga.instance.region=local-region
stacksaga.instance.zone=local-zone
| 1 | The worker connects to the same Kafka cluster as the orchestrator — that broker is how it receives commands and returns replies. A worker needs no database or event-store configuration; it is stateless with respect to the saga. |
| 2 | Declare the StackSaga instance coordinates. Every saga participant — orchestrator or worker — must share the same cluster / region / zone so the engine can correlate them. |
Configuring Order-Service As a Worker
The Reserve Order span runs inside order-service itself — the very application that also acts as the orchestrator.
So order-service is both the orchestrator and a worker for this saga.
There is nothing to add to its pom.xml: the stacksaga-kafka-orchestrator-spring-boot-starter added earlier already pulls in the worker capability transitively.
We only need to create the endpoint that reserves the ordered items.
Custom ReserverOrderEndpoint
@Slf4j
@SagaEndpoint(topicNameSuffix = "order-service.reserve-order", listenerScope = ListenerScope.SHARED) (2)
public class ReserverOrderEndpoint extends CommandEndpoint { (1)
(3)
@Override
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
log.info("Transaction Id: {},Idempotency Key: {}", consumerRecord.value().getTransactionId(), consumerRecord.value().getIdempotencyKey());
consumerRecord.value().getCurrentDomainEntityStateForUpdate().ifPresent(domainEntity -> {
String username = domainEntity.get("username").asText();
log.info("username: {}", username);
JsonNode productItems = domainEntity.get("product_items");
for (JsonNode productItem : productItems) {
String productId = productItem.asText();
log.info("productId: {} ", productId);
}
try {
//simulate some delay for make the request
Thread.sleep(new Random().nextInt(1000, 3000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); //restore the interrupt flag before propagating
throw new RuntimeException(e);
}
// TODO: Reserve order with your logic here
});
}
(4)
@Override
public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
throws JustRetryableExecutorException, RetryableExecutorException {
SagaPayload payload = consumerRecord.value();
log.info("Transaction Id: {},Idempotency Key: {}", payload.getTransactionId(), payload.getIdempotencyKey());
JsonNode lastDomainEntityState = payload.getDomainEntityState();
String username = lastDomainEntityState.get("username").asText();
log.info("username: {}", username);
payload.getPrimaryExecutionException().ifPresent(primaryExecutionExceptionMetaData -> {
log.info("primary error message : {}", primaryExecutionExceptionMetaData.getRealExceptionMessage());
});
// TODO: revert the reserve order with your logic here
try {
//simulate some delay for make the request
Thread.sleep(new Random().nextInt(1000, 3000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); //restore the interrupt flag before propagating
throw new RuntimeException(e);
}
(5)
payload.getHintStore().ifPresent(hintStore -> {
hintStore.put("undone_reserve_order", "true");
});
}
}
| 1 | Reserving an order is a state-changing action, so the endpoint extends the CommandEndpoint abstract class — which requires both a doProcess (forward) and an undoProcess (compensation) implementation. |
| 2 | Mark the class with @SagaEndpoint and configure its properties. |
| 3 | Override doProcess to implement the forward reserve-order logic.
The consumerRecord carries everything about the transaction — the current domain-entity state, transaction id, idempotency key, and much more. |
| 4 | Override undoProcess to implement the compensation logic that releases the reservation.
It exposes the last domain-entity state and the primary-execution exception that triggered the rollback, plus much more. |
| 5 | The undoProcess method also exposes the HintStore — a key-value scratchpad for the compensation process.
Use it to carry flags or metadata between compensation steps (for example, recording that the reservation was successfully undone). |
Configuration Properties
No additional configuration is required here.
Because the worker role runs inside the same order-service application that drives the saga, it already has everything it needs — the instance properties, the event store, and the Kafka connection were all set earlier in Tune Configuration.
Configuring Payment-Service As a Worker
Just as we did for the user-service, we configure the payment-service as a worker. Because it is a standalone service (not the orchestrator), it needs the worker starter added explicitly. Add the following dependencies to your payment-service.
Dependency Management
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-bom</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-kafka-worker-starter</artifactId>
</dependency>
The next step is creating the custom MakePaymentEndpoint class to handle the Make Payment span.
Because making a payment is a state-changing (command) action, it extends the CommandEndpoint abstract class so it can provide both a forward action (doProcess) and a compensation action (undoProcess).
To make the compensation flow observable, this endpoint deliberately fails the payment at random by throwing a NonRetryableExecutorException — exactly the kind of permanent business failure (for example, "insufficient balance") that should trigger a rollback rather than a retry.
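Deciding between "retry" and "roll back" usually comes down to classifying the underlying failure. The plain-Java sketch below shows one possible classification: I/O problems and timeouts are treated as transient, everything else as permanent. The Outcome enum, the classify method, and InsufficientBalanceException are hypothetical names for this illustration. In a worker, RETRY would correspond to throwing one of the retryable executor exceptions and COMPENSATE to throwing NonRetryableExecutorException.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeoutException;

/** Plain-Java sketch of splitting failures into transient (retry) and permanent (compensate). */
class FailureClassificationSketch {

    enum Outcome { RETRY, COMPENSATE }

    /** Hypothetical business failure that no amount of retrying can fix. */
    static class InsufficientBalanceException extends RuntimeException {
        InsufficientBalanceException(String message) {
            super(message);
        }
    }

    /** Treats I/O problems and timeouts as transient; anything else is considered permanent. */
    static Outcome classify(Throwable failure) {
        if (failure instanceof IOException || failure instanceof TimeoutException) {
            return Outcome.RETRY;
        }
        return Outcome.COMPENSATE;
    }

    public static void main(String[] args) {
        System.out.println(classify(new SocketTimeoutException("gateway timed out")));        // prints RETRY
        System.out.println(classify(new InsufficientBalanceException("insufficient balance"))); // prints COMPENSATE
    }
}
```

Defaulting unknown failures to COMPENSATE is a design choice of this sketch: it favours a clean rollback over retrying an error that may never succeed.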
Custom MakePaymentEndpoint
@Slf4j
@SagaEndpoint(topicNameSuffix = "payment-service.make-payment", listenerScope = ListenerScope.SHARED)
public class MakePaymentEndpoint extends CommandEndpoint {
@Override
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
SagaPayload payload = consumerRecord.value();
log.info("Transaction Id: {}, Idempotency Key: {}", payload.getTransactionId(), payload.getIdempotencyKey());
payload.getCurrentDomainEntityStateForUpdate().ifPresent(domainEntity -> {
String username = domainEntity.get("username").asText();
log.info("username: {}", username);
double totalAmount = domainEntity.get("total_amount").asDouble();
log.info("total amount: {}", totalAmount);
// TODO: Make payment with your logic here
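try {
// simulate the latency of a call to a payment provider (1 to 3 seconds)
Thread.sleep(new Random().nextInt(1000, 3000));
} catch (InterruptedException e) {
// restore the interrupt flag so callers can still see the interruption
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}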
//simulate a payment failure to demonstrate compensation.
//run the request a few times until this branch is hit: the orchestrator will then
//compensate the previously completed command span (UNDO_ORDER_RESERVED).
if (new Random().nextBoolean()) {
throw NonRetryableExecutorException
.buildWith(new RuntimeException("insufficient balance"))
.put("error_code", "MAKE_PAYMENT_FAILED")
.build();
}
});
}
@Override
public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
throws JustRetryableExecutorException, RetryableExecutorException {
SagaPayload payload = consumerRecord.value();
log.info("Transaction Id: {}, Idempotency Key: {}", payload.getTransactionId(), payload.getIdempotencyKey());
JsonNode lastDomainEntityState = payload.getDomainEntityState();
String username = lastDomainEntityState.get("username").asText();
log.info("username: {}", username);
double totalAmount = lastDomainEntityState.get("total_amount").asDouble();
log.info("total amount: {}", totalAmount);
payload.getPrimaryExecutionException().ifPresent(primaryExecutionExceptionMetaData -> {
log.info("primary error message : {}", primaryExecutionExceptionMetaData.getRealExceptionMessage());
});
// TODO: revert the payment with your logic here
payload.getHintStore().ifPresent(hintStore -> {
hintStore.put("undone_make_payment", "true");
});
}
}
The framework decides what to do based on the type of exception you throw.
A NonRetryableExecutorException (used here) signals a permanent business failure and starts compensation immediately.
For transient failures (timeouts, a broker blip), throw JustRetryableExecutorException / RetryableExecutorException instead, so the framework retries the span rather than rolling the saga back.
See the Exception Handling Reference.
|
Read more about more details
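The retry-versus-compensate decision described above usually starts in your own code, where low-level errors have to be sorted into "transient" and "permanent" before the matching StackSaga exception is thrown. The following is a minimal sketch of that sorting step only. `PaymentFailureClassifier`, `Outcome`, and `InsufficientBalanceException` are hypothetical names invented for this example, and no StackSaga classes are used. Treating every unrecognised error as permanent is a choice made by this sketch, not documented framework behaviour.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: sorts failures into the two categories the text describes.
final class PaymentFailureClassifier {

    enum Outcome { RETRY_SPAN, START_COMPENSATION }

    // Hypothetical business exception used only by this sketch.
    static final class InsufficientBalanceException extends RuntimeException {
        InsufficientBalanceException(String message) {
            super(message);
        }
    }

    static Outcome classify(Throwable cause) {
        // Timeouts and I/O problems are treated as transient: retrying may succeed.
        if (cause instanceof TimeoutException || cause instanceof IOException) {
            return Outcome.RETRY_SPAN;
        }
        // Everything else, including business rule violations, is treated as
        // permanent here (an assumption of this sketch).
        return Outcome.START_COMPENSATION;
    }

    public static void main(String[] args) {
        System.out.println(classify(new TimeoutException("gateway timed out")));
        System.out.println(classify(new InsufficientBalanceException("insufficient balance")));
    }
}
```

In an endpoint, `RETRY_SPAN` would correspond to throwing one of the retryable exceptions and `START_COMPENSATION` to throwing a `NonRetryableExecutorException`, as `MakePaymentEndpoint` does.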
Configuration Properties
spring.application.name=payment-service
#kafka connection (1)
spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
#stacksaga-instance properties (2)
stacksaga.instance.cluster=local-cluster
stacksaga.instance.region=local-region
stacksaga.instance.zone=local-zone
| 1 | The worker connects to the same Kafka cluster as the orchestrator — that broker is how it receives commands and returns replies. A worker needs no database or event-store configuration; it is stateless with respect to the saga. |
| 2 | Declare the StackSaga instance coordinates. Every saga participant — orchestrator or worker — must share the same cluster / region / zone so the engine can correlate them. |
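Because mismatched instance coordinates are easy to introduce when copying configuration between services, a small pre-flight comparison can help. The sketch below is one possible way to do that with the standard `java.util.Properties` class; the `InstanceCoordinatesCheck` class and its methods are hypothetical and are not part of StackSaga. It compares only the three `stacksaga.instance.*` keys shown above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check: do two services declare the same instance coordinates?
final class InstanceCoordinatesCheck {

    private static final List<String> KEYS = List.of(
            "stacksaga.instance.cluster",
            "stacksaga.instance.region",
            "stacksaga.instance.zone");

    // Parses application.properties-style text.
    static Properties parse(String text) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(text));
        return properties;
    }

    // True only when all three keys are present in both and have equal values.
    static boolean sameCoordinates(Properties a, Properties b) {
        for (String key : KEYS) {
            String value = a.getProperty(key);
            if (value == null || !value.equals(b.getProperty(key))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        String coordinates = "stacksaga.instance.cluster=local-cluster\n"
                + "stacksaga.instance.region=local-region\n"
                + "stacksaga.instance.zone=local-zone\n";
        Properties orchestrator = parse("spring.application.name=order-service\n" + coordinates);
        Properties worker = parse("spring.application.name=payment-service\n" + coordinates);
        System.out.println("coordinates match: " + sameCoordinates(orchestrator, worker));
    }
}
```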
Running the Application and Testing the Transaction Flow
Start all three services (order-service, user-service, and payment-service) once your Kafka broker and MySQL are running. On first start-up the framework auto-creates the required Kafka topics and, via Liquibase, the event-store schema.
Trigger the saga with a POST request to the order-service’s /api/v1/order endpoint:
curl -X POST http://localhost:8080/api/v1/order \
-H "Content-Type: application/json" \
-d '{
"username": "john.doe",
"totalAmount": 149.99,
"items": ["item-1", "item-2"]
}'
Because MakePaymentEndpoint fails at random, repeat the request a few times to observe both outcomes.
The PlaceOrderHandler.onStateChanged(…) callback logs every transition, so you can follow the saga in the order-service console.
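If you would rather repeat the request from Java than re-run curl by hand, the following sketch sends the same JSON body several times using the JDK's built-in `java.net.http.HttpClient`. The `PlaceOrderClient` class name and the choice of five attempts are assumptions made for this example; the URL and payload are copied from the curl command above. Only the HTTP status code is printed, because the shape of the response body is not specified here.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that repeats the place-order request a few times.
final class PlaceOrderClient {

    // Same JSON body as the curl example.
    static final String BODY =
            "{\"username\":\"john.doe\",\"totalAmount\":149.99,\"items\":[\"item-1\",\"item-2\"]}";

    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v1/order"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = buildRequest("http://localhost:8080");
        // Five attempts is an arbitrary number, chosen to make both outcomes likely.
        for (int attempt = 1; attempt <= 5; attempt++) {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.printf("attempt %d -> HTTP %d%n", attempt, response.statusCode());
        }
    }
}
```

Run it while all three services are up, then watch the order-service console for the state transitions of each transaction.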
- Happy path (payment succeeds), where the saga runs to completion:
2026-06-21T23:57:36.911+05:30 INFO 19868 --- [order-service] [nio-8080-exec-3] o.e.o.controller.PlaceOrderController : Transaction Id : e0b34f8a-9702-4e23-87e8-8ae5b9654a61
2026-06-21T23:57:38.357+05:30 INFO 19868 --- [order-service] [oundedElastic-3] o.e.o.handler.PlaceOrderHandler : onStateChanged : PROCESSING
2026-06-21T23:57:41.177+05:30 INFO 19868 --- [order-service] [oundedElastic-3] o.e.o.handler.PlaceOrderHandler : onStateChanged : PROCESSING
2026-06-21T23:57:42.527+05:30 INFO 19868 --- [order-service] [oundedElastic-3] o.e.o.handler.PlaceOrderHandler : onStateChanged : PROCESS_COMPLETED
- Compensation path (payment fails), where the engine reverts the previously reserved order and ends in REVERT_COMPLETED:
2026-06-21T23:54:19.094+05:30 INFO 17792 --- [order-service] [nio-8080-exec-1] o.e.o.controller.PlaceOrderController : Transaction Id : 0864a84c-79a1-45a1-a688-662f431ca218
2026-06-21T23:54:26.248+05:30 INFO 17792 --- [order-service] [oundedElastic-3] o.e.o.handler.PlaceOrderHandler : onStateChanged : PROCESSING
2026-06-21T23:54:27.956+05:30 INFO 17792 --- [order-service] [oundedElastic-3] o.e.o.handler.PlaceOrderHandler : onStateChanged : PROCESSING
2026-06-21T23:54:29.980+05:30 INFO 17792 --- [order-service] [oundedElastic-3] o.e.o.handler.PlaceOrderHandler : onStateChanged : REVERTING
2026-06-21T23:54:32.546+05:30 INFO 17792 --- [order-service] [oundedElastic-3] o.e.o.handler.PlaceOrderHandler : onStateChanged : REVERT_COMPLETED
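When you repeat the request many times, reading the console by eye gets tedious. The sketch below shows one way to classify a transaction's log lines by their final state. `SagaOutcomeSketch` and its `Outcome` enum are hypothetical names, and the parser assumes the exact `onStateChanged : <STATE>` format shown in the logs above. It recognises only the four states that appear in those logs; any other final state is reported as `IN_PROGRESS`.

```java
import java.util.List;

// Hypothetical log reader: derives a saga outcome from onStateChanged lines.
final class SagaOutcomeSketch {

    enum Outcome { COMPLETED, COMPENSATED, IN_PROGRESS }

    private static final String MARKER = "onStateChanged : ";

    // Returns the state name at the end of a log line, or null if the marker is absent.
    static String stateOf(String logLine) {
        int at = logLine.lastIndexOf(MARKER);
        return at < 0 ? null : logLine.substring(at + MARKER.length()).trim();
    }

    // The last state seen decides the outcome.
    static Outcome outcomeOf(List<String> logLines) {
        String last = null;
        for (String line : logLines) {
            String state = stateOf(line);
            if (state != null) {
                last = state;
            }
        }
        if ("PROCESS_COMPLETED".equals(last)) {
            return Outcome.COMPLETED;
        }
        if ("REVERT_COMPLETED".equals(last)) {
            return Outcome.COMPENSATED;
        }
        return Outcome.IN_PROGRESS;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "o.e.o.handler.PlaceOrderHandler : onStateChanged : PROCESSING",
                "o.e.o.handler.PlaceOrderHandler : onStateChanged : REVERTING",
                "o.e.o.handler.PlaceOrderHandler : onStateChanged : REVERT_COMPLETED");
        System.out.println(outcomeOf(lines));
    }
}
```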