StackSaga-Kafka Framework (Asynchronous)


Overview

StackSaga-Kafka Framework (stacksaga-kafka-spring-boot-starter) is the asynchronous transport implementation of the StackSaga ecosystem. It replaces synchronous inter-service calls with Kafka as the messaging backbone, enabling fully event-driven saga execution for long-running transactions (LRT) in a microservices system.

Unlike a simple choreography approach where each service publishes and reacts to domain events independently, StackSaga-Kafka follows a centralized orchestration model: a single orchestrator service owns the full saga lifecycle, emits commands to target worker services via dedicated Kafka topics, and collects responses through a framework-managed per-domain reply topic. This separation gives you the scalability and decoupling of Kafka while retaining the deterministic control flow and auditability of the saga orchestration pattern.

The framework is built on top of Spring Boot and integrates with StackSaga’s core engine for event sourcing, state management, retry scheduling, and cluster coordination. Each saga execution is fully persisted — every state transition of the SagaDomainEntity (the saga’s aggregate root and payload carrier) is written to the event store through stacksaga-database-support — so that any in-progress transaction can be recovered, retried, or inspected at any point in time.

Key capabilities:

  • Asynchronous saga orchestration — the orchestrator issues commands to worker services via Kafka and advances the saga state only upon receiving the worker’s reply, with no blocking thread held during the wait.

  • Forward and backward recovery — built-in support for compensating transactions: when a saga step fails, the framework invokes onNextRevert() on the SagaEventNavigator in reverse step order, triggering rollback actions on each previously completed service.

  • Per-domain reply topology — outbound command topics are created per worker intent; inbound reply topics are created per saga domain by the framework. A system with three saga domains maintains exactly three reply topics, keeping Kafka partition management predictable.

  • Durable state via event sourcing — the full SagaDomainEntity payload and status (STARTED → IN_PROGRESS → COMPLETED | FAILED → COMPENSATING → COMPENSATED) is persisted on every state transition, enabling point-in-time recovery and complete audit trails.

  • Distributed retry with token ring partitioning — failed or stalled transactions are retried by the owning orchestrator instance as determined by Murmur3-based token ring partitioning, coordinated through the stacksaga-ring-coordinator.

  • Idempotency guarantees — duplicate Kafka message delivery is handled safely through the StackSaga Trace Window, preventing unintended re-execution of already-completed saga steps.

  • Reactive and imperative support — the framework’s worker-side KafkaWorkerEndpoints abstraction supports both reactive (Mono/Flux) and blocking handler implementations, giving application developers freedom to choose their programming model without affecting saga coordination.
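Kafka's at-least-once delivery means the same reply can arrive more than once. This document does not show how StackSaga detects duplicates. The following is a minimal sketch of one common technique, assumed here for illustration: remember each (transaction ID, step name) pair once it has been processed, and ignore any later delivery of the same pair. IdempotentStepGuard and its methods are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: not StackSaga's duplicate-detection API.
// Remembers which (transactionId, stepName) pairs were already applied so that a
// redelivered Kafka message for the same step is ignored instead of re-executed.
class IdempotentStepGuard {
    // Value-based key: records implement equals/hashCode over their components.
    private record StepKey(String transactionId, String stepName) {}

    // Thread-safe set, since consumer threads may process replies concurrently.
    private final Set<StepKey> processed = ConcurrentHashMap.newKeySet();

    /** Returns true the first time a step result is seen, false for any duplicate. */
    boolean markIfFirst(String transactionId, String stepName) {
        return processed.add(new StepKey(transactionId, stepName));
    }

    int processedCount() {
        return processed.size();
    }
}
```

A production guard would keep this record in durable storage shared by the consumers rather than in an in-memory set, so that duplicates are still detected after a restart.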

Why StackSaga-Kafka Over Raw Kafka?

Using Kafka alone for service-to-service communication in a saga does not automatically give you saga orchestration. A raw Kafka topology requires every participating service to understand the broader transaction context: which step it is in, what has already succeeded, what to roll back if something goes wrong, and how to track overall saga progress. This logic inevitably leaks into every service, making the system fragile and difficult to evolve.

The following comparison explains the specific gaps StackSaga-Kafka closes.

Centralized Transaction State Management

  • Raw Kafka: Kafka delivers messages durably but has no concept of a business transaction. There is no built-in mechanism to know that "step 2 of 5 has completed" or that a transaction is in a compensating state.

  • StackSaga-Kafka: The orchestrator tracks the full saga lifecycle through well-defined states: STARTED → IN_PROGRESS → COMPLETED | FAILED → COMPENSATING → COMPENSATED. The SagaDomainEntity is the single source of truth: it carries the transaction payload (all accumulated data from previous steps) and the current execution cursor. State transitions are persisted atomically to the event store on every step completion or failure.

  • Advantage: No saga coordination logic needs to be implemented in individual worker services. The orchestrator is the sole authority on saga state.
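The status lifecycle listed above can be expressed as an explicit transition table. The following is a hypothetical sketch, not StackSaga's actual types. It assumes that COMPLETED and COMPENSATED are the only terminal states, and that moving from one step to the next while IN_PROGRESS is tracked by the execution cursor rather than by a status change.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the saga status lifecycle; not StackSaga's actual types.
enum SagaStatus { STARTED, IN_PROGRESS, COMPLETED, FAILED, COMPENSATING, COMPENSATED }

class SagaLifecycle {
    // Allowed next states for each status (assumed from the lifecycle listed above).
    private static final Map<SagaStatus, Set<SagaStatus>> ALLOWED = new EnumMap<>(SagaStatus.class);

    static {
        ALLOWED.put(SagaStatus.STARTED, EnumSet.of(SagaStatus.IN_PROGRESS));
        ALLOWED.put(SagaStatus.IN_PROGRESS, EnumSet.of(SagaStatus.COMPLETED, SagaStatus.FAILED));
        ALLOWED.put(SagaStatus.FAILED, EnumSet.of(SagaStatus.COMPENSATING));
        ALLOWED.put(SagaStatus.COMPENSATING, EnumSet.of(SagaStatus.COMPENSATED));
        // Terminal states: no outgoing transitions.
        ALLOWED.put(SagaStatus.COMPLETED, EnumSet.noneOf(SagaStatus.class));
        ALLOWED.put(SagaStatus.COMPENSATED, EnumSet.noneOf(SagaStatus.class));
    }

    static boolean canTransition(SagaStatus from, SagaStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    static boolean isTerminal(SagaStatus status) {
        return ALLOWED.get(status).isEmpty();
    }

    /** Returns the new status, or throws if the move is not part of the lifecycle. */
    static SagaStatus transition(SagaStatus from, SagaStatus to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal saga transition " + from + " -> " + to);
        }
        return to;
    }
}
```

Rejecting illegal transitions up front means a replayed or duplicated event cannot move a saga backwards, for example from COMPENSATED back to STARTED.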

Resilience and Fault Tolerance

  • Raw Kafka: Kafka provides delivery durability and at-least-once guarantees, but it does not know what to do when the message is consumed and the downstream operation fails. Retry logic, timeouts, and compensation decisions must be built separately.

  • StackSaga-Kafka: The framework implements configurable retry windows, boundary guard thresholds, and automatic compensation invocation. If a worker service returns a failure response, the saga engine immediately begins the reverse traversal of the SagaEventNavigator, calling onNextRevert() for each completed forward step in reverse order. For transient failures (timeouts, infrastructure errors), the retry subsystem re-invokes the saga from the last unacknowledged step using the Murmur3 token ring scheduler.

  • Advantage: Data consistency is maintained across services even under partial failures, without embedding retry or rollback logic in each service.

Simplified Development

  • Raw Kafka: Developers must implement state machine logic, idempotency checks, saga step routing, and compensation orchestration in every service that participates in a business transaction.

  • StackSaga-Kafka: The orchestrator-side abstraction (SagaKafkaTemplate, SagaEventNavigator, SagaDomainEntity) and the worker-side abstraction (KafkaWorkerEndpoints) provide clear contracts for each role. Developers define what the steps do and in what order; the framework handles routing, persistence, retrying, and compensation sequencing.

  • Advantage: Reduces boilerplate significantly and confines distributed transaction concerns to a small, testable layer.

Operational Observability

  • Raw Kafka: Consumer lag metrics and broker-level monitoring are available, but there is no concept of "transaction X is stuck at step 3" or "compensation for order Y failed".

  • StackSaga-Kafka: The stacksaga-trace-window-connector exposes APIs consumed by the StackSaga Trace Window UI, providing per-transaction step-level traces, execution timelines, failure points, retry counts, and compensation status.

  • Advantage: Operations teams can diagnose stalled or failed sagas at the business-transaction level, not just at the Kafka consumer level.

Components

StackSaga-Kafka Framework Component Overview

The StackSaga-Kafka Framework is split into two artifacts with distinct responsibilities. They communicate with each other exclusively through Kafka: the orchestrator sends outbound command messages to worker-designated topics, and workers send reply messages back to the framework-managed per-domain reply topic.

stacksaga-kafka-orchestrator

stacksaga-kafka-orchestrator is the core runtime dependency added to the orchestrator service — the single service responsible for initiating and driving the saga lifecycle.

It exposes:

  • SagaKafkaTemplate — the entry point for starting a new saga execution or resuming a recovered one. Accepts an initialized SagaDomainEntity and hands it off to the saga engine.

  • SagaEventNavigator — defines the ordered steps and compensation steps of a saga. The engine calls onNext() sequentially for forward progress and onNextRevert() in reverse order during compensation.

  • SagaDomainEntity — the aggregate root for a saga instance. Carries the accumulated business payload and the current execution state. It is serialized and persisted to the event store on every state transition.

Additionally, the orchestrator service must include:

  • stacksaga-database-support — provides the event store integration (MySQL, Cassandra, or ScyllaDB depending on configuration) for persisting SagaDomainEntity snapshots and state transitions.

  • stacksaga-ring-coordinator-connector (optional, required for retry) — connects the orchestrator instance to the Ring Coordinator Service, registers it as a retry node, and receives its assigned Murmur3 token sub-range. Enables the retry subsystem to re-invoke failed transactions owned by this instance.

  • stacksaga-environment-support (optional) — provides environment metadata to the framework for service discovery and instance identity resolution in environments such as Eureka or Kubernetes.

stacksaga-kafka-worker

stacksaga-kafka-worker is the dependency added to every worker service (also called a target service or utility service) that participates in sagas initiated by an orchestrator.

The worker dependency provides:

  • A Kafka listener infrastructure that subscribes to the topic(s) designated for that service by the framework’s topic configuration.

  • KafkaWorkerEndpoints — the abstraction through which application developers implement the forward processing logic and the compensation logic for each saga step handled by this service.

  • Automatic response dispatch — after the endpoint handler completes (successfully or with a failure result), the worker sends the outcome back to the framework via the orchestrator’s per-domain reply topic. The application developer does not manage this reply flow directly.

Worker services do not interact with the event store or the ring coordinator. They are stateless with respect to the saga: they receive a command, execute the relevant business operation, and return a result.

A single worker service can handle steps for multiple orchestrator domains (i.e., multiple saga types), each routed to the correct KafkaWorkerEndpoints implementation by the framework.
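The reply flow described above can be sketched as a small router. Each command carries its saga domain, the worker dispatches it to the handler registered for that domain, and the reply is addressed to that domain's reply topic. Everything in this sketch is hypothetical: the WorkerCommand and WorkerReply records, the WorkerRouter class, and the "<domain>.reply" topic naming are illustrative assumptions rather than the stacksaga-kafka-worker API. No Kafka client is involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical message shapes; not the stacksaga-kafka-worker API.
record WorkerCommand(String domain, String transactionId, String step, String payload) {}

record WorkerReply(String topic, String transactionId, String step, boolean success) {}

class WorkerRouter {
    // One handler per saga domain; a single worker may serve several domains.
    private final Map<String, Predicate<WorkerCommand>> handlersByDomain = new HashMap<>();

    void register(String domain, Predicate<WorkerCommand> handler) {
        handlersByDomain.put(domain, handler);
    }

    /** Runs the domain's handler and addresses the outcome to that domain's reply topic. */
    WorkerReply handle(WorkerCommand cmd) {
        Predicate<WorkerCommand> handler = handlersByDomain.get(cmd.domain());
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for domain " + cmd.domain());
        }
        boolean success;
        try {
            success = handler.test(cmd);
        } catch (RuntimeException e) {
            success = false; // a thrown business error becomes a failure reply
        }
        return new WorkerReply(replyTopicFor(cmd.domain()), cmd.transactionId(), cmd.step(), success);
    }

    // Hypothetical naming convention; the real topic names are managed by the framework.
    static String replyTopicFor(String domain) {
        return domain + ".reply";
    }
}
```

Because the reply topic depends only on the domain, three domains yield three reply topics no matter how many workers send replies. This matches the per-domain reply topology described in the overview.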

Architecture

The following sections describe the StackSaga-Kafka architecture progressively across four deployment stages. Each stage builds on the previous one, adding production-readiness capabilities incrementally. This staged approach makes it possible to start with a minimal setup and harden the system as requirements grow.

  1. Stage 1: Basic Setup — orchestrator + worker communication over Kafka.

  2. Stage 2: Retry-Ready Setup — distributed retry via ring coordinator.

  3. Stage 3: Monitoring Setup — saga-level observability via trace window.

  4. Stage 4: Environment Support — service discovery and multi-environment integration.

Stage 1 — Basic Setup

This stage establishes the minimal viable topology: an orchestrator service that can start and drive sagas, and one or more worker services that can receive commands and return results over Kafka. No retry coordination or external monitoring is configured at this stage.

StackSaga-Kafka Architecture: Stage 1 — Basic Setup

Dependencies

  • stacksaga-kafka-orchestrator (Orchestrator): Provides the saga engine, SagaKafkaTemplate, SagaEventNavigator, and the Kafka producer for outbound command topics.

  • stacksaga-database-support (Orchestrator): Provides the event store adapter for persisting SagaDomainEntity state and execution history.

  • stacksaga-kafka-worker (Worker): Provides the Kafka consumer infrastructure and KafkaWorkerEndpoints abstraction.

Request and Execution Flow

  1. An inbound HTTP request (e.g., POST /order) reaches the OrderController on the orchestrator service.

  2. The controller instantiates a PlaceOrderDomainEntity (the SagaDomainEntity subclass for the order placement saga), populates its initial payload, and calls SagaKafkaTemplate.process(…). From this point the saga engine takes full control.

  3. The engine evaluates the PlaceOrderEventNavigator to determine the first step and calls onNext() with the current domain entity.

  4. The onNext() implementation produces a command message to the designated outbound Kafka topic for the target service (e.g., the payment service’s command topic). The message carries the saga step identifier and the serialized domain entity payload.

  5. The worker service, which has the stacksaga-kafka-worker dependency, consumes the message from its designated topic via the registered KafkaWorkerEndpoints handler. It executes the business operation (e.g., charge the payment) and produces a reply message to the per-domain reply topic managed by the framework. For the PlaceOrderDomainEntity domain, this is a single, dedicated reply topic created automatically by the framework.

  6. The orchestrator’s internal Kafka consumer reads the reply from the domain reply topic. If the step succeeded, the engine updates the domain entity state, persists the new snapshot to the event store, and advances to the next step by calling onNext() again.

  7. If the worker replies with a failure, the engine transitions the saga to FAILED, persists the failure state, and begins compensation by calling onNextRevert() on each previously completed step in reverse order. Each onNextRevert() produces a compensation command to the relevant worker topic. Workers handle compensation commands through their dedicated KafkaWorkerEndpoints compensation handler and reply via the same domain reply topic.

  8. Each state transition — step completion, failure, compensation start, compensation completion — is written to the event store via stacksaga-database-support.
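The control flow in steps 3 to 8 can be summarized with a synchronous, in-memory sketch. It is not how the engine runs: the real engine is asynchronous and advances only when a reply arrives over Kafka. SagaFlowSketch, SagaStep, and the event strings are hypothetical. Each recorded entry stands in for an event-store write, and only steps that completed before the failure are reverted, most recent first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, synchronous sketch of the orchestration flow; the real engine is
// asynchronous and reacts to worker replies arriving on the domain reply topic.
class SagaFlowSketch {
    /** A step with a forward action (true = success) and a compensating action. */
    record SagaStep(String name, BooleanSupplier forward, Runnable revert) {}

    /** Returns the sequence of recorded events; each entry stands in for an event-store write. */
    static List<String> run(List<SagaStep> steps) {
        List<String> events = new ArrayList<>();
        Deque<SagaStep> completed = new ArrayDeque<>(); // stack of successfully completed steps
        events.add("STARTED");
        for (SagaStep step : steps) {
            if (step.forward().getAsBoolean()) {
                completed.push(step);
                events.add("DONE:" + step.name());
                continue;
            }
            // Failure: compensate previously completed steps, most recent first.
            events.add("FAILED:" + step.name());
            events.add("COMPENSATING");
            while (!completed.isEmpty()) {
                SagaStep done = completed.pop();
                done.revert().run();
                events.add("REVERTED:" + done.name());
            }
            events.add("COMPENSATED");
            return events;
        }
        events.add("COMPLETED");
        return events;
    }
}
```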

The diagram above shows a single span (MakePaymentSpan) for clarity. In practice, a saga can traverse any number of spans across different worker services, each with its own command topic. All replies — regardless of which worker sent them — flow back to the single per-domain reply topic.

Stage 2 — Retry-Ready Setup

Stage 2 introduces the distributed retry capability. Without this stage, a saga that stalls due to a transient infrastructure failure (e.g., a worker service being temporarily unavailable, a Kafka broker partition becoming unavailable, or a timeout) will remain in an incomplete state indefinitely. Stage 2 makes the system self-healing for such failures.

StackSaga-Kafka Architecture: Stage 2 — Retry-Ready Setup

New Components at This Stage

  • stacksaga-ring-coordinator (Ring Coordinator Service): A standalone service that manages the token ring. It tracks available orchestrator instances, distributes Murmur3 token sub-ranges among them, and handles range rebalancing when instances join or leave the cluster.

  • stacksaga-ring-coordinator-connector (Orchestrator): Connects the orchestrator instance to the ring coordinator (via RSocket request-stream), receives and holds its assigned token sub-range, and enables the local retry scheduler to scan the event store for transactions whose Murmur3 hash falls within the owned range.

Retry Mechanism

By adding stacksaga-ring-coordinator-connector to the orchestrator service, the instance is promoted to a retry node. The ring coordinator assigns each registered orchestrator instance a contiguous sub-range of the Murmur3 token ring. The retry scheduler on each instance periodically scans the event store for transactions that are in a non-terminal state (e.g., IN_PROGRESS, COMPENSATING) and whose transaction ID hashes into the locally owned token range. When such a transaction is found, the scheduler re-submits it to the saga engine for re-execution from the last incomplete step.

This partitioning ensures that in a multi-instance orchestrator deployment, each stuck transaction is retried by exactly one instance — there is no duplication of retry work and no need for distributed locking. When an instance restarts or a new instance joins, the ring coordinator rebalances token ranges and the new assignment is delivered over the existing RSocket stream.

The ring coordinator is a separate service (stacksaga-ring-coordinator-spring-boot-starter) that must be deployed independently. It is a lightweight coordination service and does not participate in the business logic or the Kafka message flow.
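The ownership rule behind the retry scan can be sketched as follows: hash the transaction ID onto a 32-bit token ring, then split the ring into contiguous, equal sub-ranges, one per registered orchestrator instance. This is a hypothetical illustration. It uses the 32-bit x86 variant of MurmurHash3 with seed 0 for brevity; the variant, seed, and range layout actually used by stacksaga-ring-coordinator are not specified in this document.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of token-ring ownership; not the stacksaga-ring-coordinator implementation.
// Assumption: MurmurHash3 x86 32-bit with seed 0, and equal contiguous sub-ranges per instance.
class TokenRingSketch {

    /** MurmurHash3, x86 32-bit variant (little-endian block reads). */
    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h = seed;
        int nblocks = data.length / 4;

        // Body: mix each 4-byte block into the hash state.
        for (int i = 0; i < nblocks; i++) {
            int j = i * 4;
            int k = (data[j] & 0xff) | (data[j + 1] & 0xff) << 8
                    | (data[j + 2] & 0xff) << 16 | (data[j + 3] & 0xff) << 24;
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }

        // Tail: mix the remaining 1-3 bytes (intentional switch fall-through).
        int tail = nblocks * 4;
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                k1 ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                k1 ^= data[tail] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h ^= k1;
                break;
            default:
                break;
        }

        // Finalization: fold in the length and avalanche the bits.
        h ^= data.length;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    static int token(String transactionId) {
        return murmur3x86_32(transactionId.getBytes(StandardCharsets.UTF_8), 0);
    }

    /** Maps a token to one of `nodes` equal, contiguous sub-ranges of the 32-bit ring. */
    static int ownerIndex(int token, int nodes) {
        if (nodes <= 0 || nodes > 1 << 16) {
            throw new IllegalArgumentException("nodes must be between 1 and 65536");
        }
        long offset = (long) token - Integer.MIN_VALUE; // shift to 0 .. 2^32 - 1
        return (int) ((offset * nodes) >>> 32);
    }

    /** True if the instance at `nodeIndex` should retry this transaction. */
    static boolean ownsTransaction(String transactionId, int nodeIndex, int nodes) {
        return ownerIndex(token(transactionId), nodes) == nodeIndex;
    }
}
```

Every instance computes the same hash, and the sub-ranges do not overlap, so each transaction has exactly one owner. When the instance count changes, ownership is simply recomputed from the new range assignment.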

Stage 3 — Monitoring and Observability Setup

Stage 3 adds the observability layer, enabling the StackSaga Trace Window to query real-time and historical saga execution data from the orchestrator service.

StackSaga-Kafka Architecture: Stage 3 — Monitoring + Retry-Ready

New Component at This Stage

  • stacksaga-trace-window-connector (Orchestrator): Exposes a set of internal APIs (consumed by the StackSaga Trace Window UI) that surface per-transaction execution traces, step-level timelines, failure details, retry histories, and compensation status from the event store.

What Becomes Visible

With the trace window connector in place, the StackSaga Trace Window provides:

  • Per-saga execution graphs showing each step, its execution timestamp, duration, status, and any error payload.

  • Compensation traces showing which onNextRevert() calls were made, in what order, and whether they succeeded.

  • Retry audit logs showing how many retry attempts were made, which retry node handled each attempt, and the outcome.

  • Live transaction status for in-progress sagas.

This layer does not change the Kafka topology or the retry behavior — it is a passive observability connector that reads from the existing event store and exposes the data through an API consumed by the Trace Window UI.

Stage 4 — Environment Support Setup

Stage 4 adds environment awareness to the framework, enabling it to resolve service instance metadata correctly when running in dynamic environments such as Eureka-based service discovery or Kubernetes deployments.

StackSaga-Kafka Architecture: Stage 4 — Environment Support + Monitoring + Retry-Ready

New Component at This Stage

  • stacksaga-environment-support (Orchestrator + Ring Coordinator Agent): Provides the framework with environment-specific metadata: the current instance’s host, port, and service identity as visible to the service discovery mechanism. This is required for the ring coordinator to correctly register and deregister orchestrator instances as they come up and go down in dynamic environments.

"Ring Coordinator Agent" in this context refers to the orchestrator service instance that has stacksaga-ring-coordinator-connector included — i.e., the same service that acts as a retry node. Adding stacksaga-environment-support to this service enables the ring coordinator to correctly identify the instance in environments where the host or port is dynamically assigned.

Environment Support and Topic Resolution

In cloud-native deployments, the environment support module ensures that:

  • The orchestrator instance registers with the ring coordinator using its actual reachable address, not a loopback or internal Docker address.

  • Instance-level metrics and trace data surfaced in the Trace Window are attributed to the correct service instance.

  • Service discovery metadata (e.g., Eureka instance ID or Kubernetes pod name) is available to the framework for logging and observability correlation.

With Stage 4 in place, the full StackSaga-Kafka deployment is production-ready: it supports asynchronous saga execution, distributed retry with token ring partitioning, comprehensive observability, and correct operation in dynamic cloud environments.

Comprehensive Technical Reference

The comprehensive technical reference is organized into two main sections, one for each of the two main components:

  1. stacksaga-kafka-orchestrator: specifications and functionality.

  2. stacksaga-kafka-worker: specifications and functionality.

stacksaga-kafka-orchestrator: Specifications and Functionality

stacksaga-kafka-orchestrator

stacksaga-kafka-orchestrator is the core dependency of the StackSaga-Kafka Framework for the orchestrator service. It provides the functionality and specifications needed to create and manage sagas using Kafka as the messaging backbone, and it acts as the Saga Execution Coordinator (SEC).

It is built internally on reactive programming principles for high scalability and performance, and it primarily uses Reactor Kafka to interact with Kafka. Even though the framework operates reactively under the hood, it fully supports both reactive and non-reactive (imperative) application models.

Add the dependency to your Spring Boot application (the target orchestrator service) as follows:
<dependency>
    <groupId>org.stacksaga</groupId>
    <artifactId>stacksaga-kafka-orchestrator-starter</artifactId>
    <version>${latest-version}</version>
</dependency>

After you add the stacksaga-kafka-orchestrator-starter dependency to your Spring Boot application, it automatically configures the beans and components required for saga orchestration using Kafka. You then configure your saga workflows according to the StackSaga-Kafka specifications, which the following sections describe.

Domain Entity and Event Sourcing Specifications

Overview

The Domain-Entity object serves as the central data container for the entire transaction lifecycle. It acts as a shared data bucket that allows each span to access and update transaction-related data as the process progresses.

The Domain-Entity’s state is updated at each significant event, and every version of this state is persisted. Initially, the Domain-Entity’s state is saved as the transaction’s starting state in the event-store. After each executor completes, the updated Domain-Entity state’s snapshot is again saved, tagged with the atomic execution’s name (Span).

This approach is known as domain-entity event sourcing.

Domain-Entity event sourcing provides two main benefits:

  1. Transaction Re-Invoke (Retrying and Restoring)

    • If an atomic execution fails due to a transient issue (such as temporary resource unavailability), the system can retry the operation. If the transaction is stopped due to a critical failure, it can be restored and re-invoked after the issue is resolved. In either case, the re-invoked transaction must resume from the point where it previously stopped, which requires restoring the same data that was in use when it stopped.

  2. Transaction Traceability via Dashboard

    • By storing every change to the domain-entity, the system enables detailed debugging and auditing. Administrators can inspect the domain-entity’s state before and after each execution runs, making it easy to trace the transaction’s evolution step by step through the Trace-Window dashboard.
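Domain-entity event sourcing, as described above, amounts to an append-only history of snapshots per transaction. The following in-memory sketch is hypothetical: SnapshotEventStore and its methods are not the stacksaga-database-support API, and a plain map stands in for a real domain-entity. Every append stores an immutable copy tagged with the span that produced it. The latest snapshot is where a re-invoked transaction resumes, and the full history is what a dashboard would display.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory sketch of domain-entity event sourcing; not the
// stacksaga-database-support API. A plain map stands in for the domain-entity state.
class SnapshotEventStore {
    /** One immutable version of the entity, tagged with the span that produced it. */
    record Snapshot(int version, String spanName, Map<String, Object> state) {}

    private final Map<String, List<Snapshot>> historyByTransaction = new HashMap<>();

    /** Appends a defensive copy of the state (Map.copyOf rejects null keys and values). */
    void append(String transactionId, String spanName, Map<String, Object> state) {
        List<Snapshot> history = historyByTransaction.computeIfAbsent(transactionId, id -> new ArrayList<>());
        history.add(new Snapshot(history.size() + 1, spanName, Map.copyOf(state)));
    }

    /** The most recent snapshot: the state a re-invoked transaction is restored from. */
    Optional<Snapshot> latest(String transactionId) {
        List<Snapshot> history = historyByTransaction.getOrDefault(transactionId, List.of());
        return history.isEmpty() ? Optional.empty() : Optional.of(history.get(history.size() - 1));
    }

    /** Every version in order, e.g. for step-by-step inspection in a dashboard. */
    List<Snapshot> history(String transactionId) {
        return Collections.unmodifiableList(historyByTransaction.getOrDefault(transactionId, List.of()));
    }
}
```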

Domain-Entity as the Domain identifier

Although the domain-entity primarily serves as the data bucket for the entire transaction, it has another important purpose in the framework: it is the domain identifier for the transaction. The domain-entity class identifies the transaction and the executors responsible for executing it, so it acts as the generic identifier for that transaction type. For instance, for an order-processing transaction you can create a domain-entity class named OrderDomainEntity and use it as the identifier for that transaction. A system may have any number of business domains (use cases), and you can create a separate domain-entity class for each one. This provides a clear and consistent way to organize and manage transactions by their domain context.

How the Domain-Entity Is Used in Saga Execution

In the place-order example, the overall process consists of a set of sub-executions:

  1. Fetching user’s details. (query execution)

  2. Initialize order. (command execution)

  3. Reserve the items. (command execution)

  4. Make the Payment (command execution)

While executing those atomic processes, you need to store some data from each execution. For instance:

  • Initially, the order request data, such as the username, the total amount, and the items the customer bought, is stored in the domain-entity for use in upcoming executions.

  • Next, the user’s data is fetched from the user-service using the username stored in the domain-entity object, and the user’s details are stored back in the domain-entity object for upcoming executions.

  • The stored user data is then used to initialize the order.

  • Next, the items are reserved using the order ID generated by the Initialize order execution.

  • Finally, the payment is made using the order ID, the total amount, and the username stored in the domain-entity object. The domain-entity is then updated again with the reference ID returned from the payment-service.
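The data dependencies in the list above can be made concrete with a plain Java sketch in which each step reads fields written by earlier steps. PlaceOrderDataFlow and OrderData are hypothetical, and the service calls are stubbed with derived values; a real implementation would use a custom domain-entity class and real service calls.

```java
import java.util.List;

// Hypothetical sketch of data flowing through the place-order spans via a shared entity.
// Service calls are stubbed; a real flow would use a custom domain-entity and real services.
class PlaceOrderDataFlow {
    static final class OrderData {
        // Initial request data, set before the saga starts.
        String username;
        double totalAmount;
        List<String> productIds;
        // Written by later steps.
        boolean userValidated;
        String orderId;
        boolean itemsReserved;
        String paymentReferenceId;
    }

    // 1. Query execution: reads the username and stores the (stubbed) user details.
    static void fetchUser(OrderData d) {
        d.userValidated = d.username != null && !d.username.isBlank();
    }

    // 2. Command execution: uses the stored user details and stores the new order ID.
    static void initializeOrder(OrderData d) {
        if (!d.userValidated) {
            throw new IllegalStateException("user details missing");
        }
        d.orderId = "ORD-" + d.username;
    }

    // 3. Command execution: reserves the items using the order ID from step 2.
    static void reserveItems(OrderData d) {
        d.itemsReserved = d.orderId != null && !d.productIds.isEmpty();
    }

    // 4. Command execution: uses the stored order ID and total amount; stores the payment reference.
    static void makePayment(OrderData d) {
        long cents = Math.round(d.totalAmount * 100);
        d.paymentReferenceId = "PAY-" + d.orderId + "-" + cents;
    }
}
```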

The following diagram shows how the domain-entity changes after each executor in the place-order example:

Diagram: domain-entity state across the place-order executions

Lifecycle of the Domain-Entity

  • You, the developer, create the Saga Domain-Entity object before starting the transaction, populating your custom domain-entity with the initial data the transaction requires. For instance, for an order-processing transaction you can create an OrderDomainEntity object and add initial data such as the username, the total amount, and the items the customer bought. You then pass it as an argument to the saga orchestration engine when you start the transaction through the SagaKafkaTemplate.process(…) method. At that moment, the initial state of the domain-entity is saved in the event-store as the starting point of the transaction; that is, the first version of the domain-entity is created and stored.

  • After control passes to the saga orchestration engine, the executors update the domain-entity as needed: after each execution, the executor adds any data required by upcoming executions. For instance, after fetching the user’s details, those details are stored in the domain-entity object, and after making the payment, the reference ID returned from the payment-service is stored as well. Each time the domain-entity is updated, a new snapshot is saved in the event-store. If there is no pivot execution (failure point) and the LRT completes as expected, the Domain-Entity object ends its lifecycle in the COMPLETED state. If there is a pivot execution (failure point), the Domain-Entity object ends its lifecycle in the FAILED state.

In compensating executions, the domain-entity object cannot be used to update data, because in StackSaga the domain-entity is used only for the primary executions. Instead, the Revert-Hint-Store can be used to store any metadata that the compensating executions require.

Creating Custom Domain-Entity

To create a custom domain-entity, create a class that extends the DomainEntity class provided by the framework and annotate it with @SagaDomainEntity. Add whatever fields your transaction requires. For order processing, for example, an OrderDomainEntity class might hold the username, the total amount, and the items the customer bought. Then pass an instance of this class to the SagaKafkaTemplate.process(…) method when starting the transaction.

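import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.stacksaga.MissingPropertyCollector;

import java.util.List;
import java.util.Map;
// The StackSaga imports for DomainEntity, @SagaDomainEntity and @SagaDomainEntityVersion are
// omitted because their packages are not shown in this document.

@Getter
@Setter
// (1) Identify the domain-entity by name and version.
@SagaDomainEntity(
        version = @SagaDomainEntityVersion(major = 1, minor = 0, patch = 0),
        name = "OrderDomainEntity"
)
public class OrderDomainEntity extends DomainEntity { // (2)
    // (4) Transaction data shared by all spans; @JsonProperty pins the serialized names.
    @JsonProperty("username")
    private String username;
    @JsonProperty("order_id")
    private String orderId;
    @JsonProperty("total_amount")
    private double totalAmount;
    @JsonProperty("payment_reference_id")
    private String paymentReferenceId;
    @JsonProperty("user_validation_data")
    private UserValidationData userValidationData;
    @JsonProperty("product_items")
    private List<ProductItem> productItems;
    @JsonProperty("metadata")
    private Map<String, String> metadata;

    protected OrderDomainEntity() {
        // (3) Pass this class to the DomainEntity base class.
        super(OrderDomainEntity.class);
    }

    // (5) Nested types extend MissingPropertyCollector to capture missing properties.
    @Getter
    @Setter
    @NoArgsConstructor
    public static class ProductItem extends MissingPropertyCollector {
        @JsonProperty("product_id")
        private String productId;
        @JsonProperty("quantity")
        private int quantity;
        @JsonProperty("price")
        private double price;
    }

    @Getter
    @Setter
    @NoArgsConstructor
    public static class UserValidationData extends MissingPropertyCollector {
        @JsonProperty("is_user_validated")
        private boolean userValidated;

        @JsonProperty("validation_note")
        private String validationNote;
    }
}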
1 The custom domain-entity class must be annotated with @SagaDomainEntity.
name: the name of the domain-entity. The framework identifies the DomainEntity by this name.
version: the version of the DomainEntity, used to identify DomainEntity versions. It is helpful for event upcasting and downcasting. Use the @SagaDomainEntityVersion annotation to provide the version.
See the further customizations below.
2 The custom domain-entity class must extend the DomainEntity class, which defines the shape of a DomainEntity in the framework.
3 Create the default constructor of the custom domain-entity class, and inside it call the super constructor, passing the custom domain-entity class itself.
Because the framework instantiates the object, additional constructors with parameters are pointless and not recommended.
4 The custom domain-entity class can have any number of attributes that you want to store. Annotating each attribute with @JsonProperty is recommended to avoid serialization issues.
The attributes shown here are the ones used in the place-order example.
If the custom domain-entity needs complex objects, you can create inner static classes or separate classes for them. Those classes should extend org.stacksaga.MissingPropertyCollector, which plays an important role: it collects any missing properties (which should only occur by mistake) while the DomainEntity object is deserialized from the event-store.
5 Sample inner static classes used in the DomainEntity.
Because the framework identifies the custom domain-entity by the name provided in the @SagaDomainEntity annotation, you can change the class name and package of the custom domain-entity class (for example, during refactoring) without affecting the framework. However, the name in the @SagaDomainEntity annotation must stay the same to avoid disruption, and two different custom domain-entity classes cannot share the same name.
In StackSaga, the custom domain-entity is not a Spring bean. Therefore, it does not need to be inside the Spring component-scan area; it can be anywhere in your project, and you provide its package to the StackSaga framework via the stacksaga.domain-entity-scan property.
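The name-based identification described in these notes can be sketched with a small registry. To stay self-contained, the sketch uses a hypothetical @EntityName annotation as a stand-in for the name attribute of @SagaDomainEntity. DomainEntityRegistry is likewise illustrative, not the framework's implementation. A class is looked up by its declared name, so it can be renamed or moved freely, while a second class that declares an already-registered name is rejected.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the `name` attribute of @SagaDomainEntity.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface EntityName {
    String value();
}

// Hypothetical name-keyed registry; not StackSaga's implementation.
class DomainEntityRegistry {
    private final Map<String, Class<?>> byName = new HashMap<>();

    /** Registers a class under its declared name; rejects a second class with the same name. */
    void register(Class<?> type) {
        EntityName name = type.getAnnotation(EntityName.class);
        if (name == null) {
            throw new IllegalArgumentException(type.getName() + " is not annotated with @EntityName");
        }
        Class<?> existing = byName.putIfAbsent(name.value(), type);
        if (existing != null && existing != type) {
            throw new IllegalStateException("Duplicate domain-entity name '" + name.value() + "': "
                    + existing.getName() + " and " + type.getName());
        }
    }

    /** Looks up by name only, so the class itself can be renamed or moved freely. */
    Class<?> lookup(String name) {
        return byName.get(name);
    }
}

// Example classes: the Java class name differs from the declared entity name.
@EntityName("OrderDomainEntity")
class RenamedOrderEntity {}

@EntityName("OrderDomainEntity")
class AccidentalDuplicate {}
```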

Furthermore, you can provide some additional configurations for the custom domain-entity by using the attributes of the @SagaDomainEntity annotation.

Custom Mapper Provider for Domain-Entity

By default, StackSaga uses the default ObjectMapper provided by Spring Boot, via DefaultAggregatorMapperProvider. If you want to customize the ObjectMapper for a target domain-entity, create a custom implementation of AbstractDomainEntityMapperProvider that provides a custom ObjectMapper object for it. You can create any number of custom mapper providers for different domain-entities, as shown below.

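import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;

@Component (1)
public class OrderDomainEntityMapperProvider extends AbstractDomainEntityMapperProvider { (2)

    @Override (3)
    protected ObjectMapper provide() {
        // Create and customize the ObjectMapper used for OrderDomainEntity here.
        return new ObjectMapper(); (4)
    }
}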
//-------------------------------------------------------------------------------

@Getter
@Setter
@SagaDomainEntity(
        version = @SagaDomainEntityVersion(major = 1, minor = 0, patch = 0),
        name = "OrderDomainEntity",
        mapper = OrderDomainEntityMapperProvider.class (5)
)
public class OrderDomainEntity extends DomainEntity {
    //...
}
1 @Component: Mark your custom object mapper implementation as a Spring bean.
2 Extend the AbstractDomainEntityMapperProvider abstract class.
The provide() method is invoked only once, during the framework's initialization phase, to obtain the custom ObjectMapper object for the target domain-entity.
3 Override the method for providing the custom ObjectMapper object.
4 Return the customized ObjectMapper object.
5 mapper: register your custom mapper provider class on the DomainEntity class.
Custom Key Generator Provider for Domain-Entity

A key generator is responsible for generating the unique identifier of each transaction instance. By default, StackSaga uses DefaultDomainEntityKeyGenerator as the key generator for all domain-entities. If you want to customize the key generator for a target domain-entity, create a custom implementation of AbstractDomainEntityKeyGenerator and provide it for that domain-entity. You can create any number of custom key generators for different domain-entities as needed.

AbstractDomainEntityKeyGenerator provides two methods, generateTransactionKey and generateIdempotencyKey. Each has a default implementation, but you can override either of them to supply your own key-generation logic. Both are described below.

Generating Unique Identifiers for Saga Transactions (Default-Implementation)

Every saga transaction requires a globally unique identifier. To supply a custom transaction key for a domain-entity, override the generateTransactionKey method of AbstractDomainEntityKeyGenerator. The method receives rich context that you can use to build a deterministic or randomized identifier: serviceName, applicationVersion, instanceId, region, zone, and the @SagaDomainEntity metadata of the domain-entity.

Providing a custom key generator is optional. StackSaga ships with a production-ready default (DefaultDomainEntityKeyGenerator). A custom generator can still be valuable to:

  • align identifiers with your sharding/partitioning strategy,

  • improve index locality or read/write patterns,

  • embed minimal routing or observability hints,

  • satisfy organization-specific compliance or traceability rules.

The default generator produces time-ordered, high-entropy identifiers using this shape:

  • <serviceInitials>-<epochMillis>-<nanoId>

For example, with a service named order-service, default IDs might look like:

  • OS-1713809175237-021575259417101

  • OS-1713809468378-117401549843120

  • OS-1713809493499-012220401009440

jnanoid is used to generate the random NanoID segment, providing excellent entropy and collision resistance.
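The default shape can be sketched in plain Java as follows, assuming the initials are the upper-cased first letters of the hyphen-separated service name and that the random segment is 15 digits long, as in the sample IDs. `SketchTransactionKeys` is a hypothetical class, not StackSaga's default generator, and it swaps jnanoid for java.security.SecureRandom to stay dependency-free.

```java
import java.security.SecureRandom;

// Sketch of the <serviceInitials>-<epochMillis>-<randomId> shape.
// Assumptions: initials = upper-cased first letter of each hyphen-separated part,
// random segment = 15 digits. SecureRandom stands in for jnanoid.
final class SketchTransactionKeys {
    private static final SecureRandom RANDOM = new SecureRandom(); // thread-safe
    private static final char[] DIGITS = "0123456789".toCharArray();
    private static final int RANDOM_LENGTH = 15;

    private SketchTransactionKeys() {}

    static String initials(String serviceName) {
        StringBuilder sb = new StringBuilder();
        for (String part : serviceName.split("-")) {
            if (!part.isEmpty()) {
                sb.append(Character.toUpperCase(part.charAt(0)));
            }
        }
        return sb.toString();
    }

    static String randomSegment() {
        char[] out = new char[RANDOM_LENGTH];
        for (int i = 0; i < out.length; i++) {
            out[i] = DIGITS[RANDOM.nextInt(DIGITS.length)];
        }
        return new String(out);
    }

    // The leading epoch-millis segment makes keys roughly time-ordered.
    static String newKey(String serviceName, long epochMillis) {
        return initials(serviceName) + "-" + epochMillis + "-" + randomSegment();
    }
}
```

In practice you would pass System.currentTimeMillis() as epochMillis; the parameter is explicit only so the sketch is easy to test.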
Generating Idempotency Keys for Saga Spans
If you are new to the term idempotency, read Maintaining Idempotency first.

Each span of a saga transaction (i.e., each execution invocation) requires an idempotency key to ensure safe retries. The generateIdempotencyKey method provides all the context needed to create a robust idempotency key that uniquely identifies the span execution and stays the same across its retries. The context is split into two input objects according to how it may be used: SafeIdempotentInput, whose content is stable enough to build the key from, and UnSafeIdempotentInput, whose content is suitable only for logging and debugging.

Do not use UnSafeIdempotentInput to generate the idempotency key: its values might not stay the same throughout the transaction lifecycle, so retries of the same span could produce different keys and break idempotency. Use UnSafeIdempotentInput only for logging and debugging, and build the idempotency key from SafeIdempotentInput, whose values are consistent and reliable.

The default implementation of the generateIdempotencyKey method produces a fixed-length hash by concatenating the transactionId, currentExecutor, and executionMode from the SafeIdempotentInput and then applying the provided hashGenerator to create a compact, consistent, and collision-resistant idempotency key.
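The default recipe (join the stable inputs, then hash the result) can be sketched with the JDK alone. This sketch assumes ':' as the separator and uses SHA-256 via java.security.MessageDigest in place of the framework's hashGenerator, whose default algorithm is not stated here; `SketchIdempotencyKeys` is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.StringJoiner;

// Sketch of the default idempotency-key recipe: join stable inputs, hash to a fixed length.
// Assumptions: ':' separator, SHA-256 digest, lower-case hex output (always 64 characters).
final class SketchIdempotencyKeys {
    private SketchIdempotencyKeys() {}

    static String idempotencyKey(String transactionId, String executorName, String executionMode) {
        String rawKey = new StringJoiner(":")
                .add(transactionId)
                .add(executorName)
                .add(executionMode)
                .toString();
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(rawKey.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

Because every input is stable for a given span, a retry of that span reproduces the same key, while the same span run in a different execution mode (for example, its compensation) gets a different key.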

Custom Idempotency Key Generation Implementation

Here is an example of how to override the key-generation methods (generateTransactionKey and generateIdempotencyKey) and how to configure the custom key generator on the custom DomainEntity class.

Because all methods of AbstractDomainEntityKeyGenerator have default implementations, you do not need to override all of them; override only the ones you need.
import java.util.Locale;
import java.util.StringJoiner;
import java.util.UUID;
import org.springframework.stereotype.Component;

@Component (1)
public class OrderDomainEntityKeyGenerator extends AbstractDomainEntityKeyGenerator { (2)

    @Override (3)
    // This method is called when each transaction is initialized.
    public String generateTransactionKey(String serviceName, String applicationVersion, String instanceId, String region, String zone, SagaDomainEntity sagaDomainEntity) {
        // Encode the region as the concatenated character codes of its letters.
        StringBuilder regionKey = new StringBuilder();
        for (char c : region.toCharArray()) {
            regionKey.append((int) c);
        }
        return String.format("%s-%s-%s", serviceName, regionKey, UUID.randomUUID());
    }

    @Override (4)
    // This method is called before each span execution.
    public String generateIdempotencyKey(SafeIdempotentInput safeIdempotentInput, UnSafeIdempotentInput unSafeIdempotentInput) {
        // Build the raw key only from the stable (safe) inputs.
        final String rawKey = new StringJoiner(":")
                .add(safeIdempotentInput.transactionId())
                .add(safeIdempotentInput.currentExecutionName())
                .add(safeIdempotentInput.executionMode().name().toLowerCase(Locale.ROOT))
                .toString();
        // Hash the raw key into a compact, fixed-length idempotency key.
        return this.hashGenerator.generateHash(rawKey, HashGenerator.ALGType.MD5);
    }
}

// Configure the custom key generator on the custom DomainEntity

@Getter
@Setter
@SagaDomainEntity(
        version = @SagaDomainEntityVersion(major = 1, minor = 0, patch = 0),
        name = "OrderDomainEntity",
        mapper = OrderDomainEntityMapperProvider.class,
        keyGen = OrderDomainEntityKeyGenerator.class (5)
)
public class OrderDomainEntity extends DomainEntity {
    //...
}
1 @Component: Mark your custom key generator implementation as a Spring bean.
2 Make the custom class extend AbstractDomainEntityKeyGenerator.
3 Override the generateTransactionKey method and create your custom key for the transaction.
This method is called when each transaction is initialized.
4 Override the generateIdempotencyKey method and create your custom idempotency key for each span.
This method is called before each span execution is invoked.
5 Provide your custom class as the keyGen attribute of @SagaDomainEntity in your DomainEntity class.
It is highly recommended to use the provided hashGenerator to produce a fixed-length hash of a composite string, rather than returning a raw concatenation of the input values. This keeps the idempotency key compact, consistent in length, and at low risk of collision, even when the input values are long or contain variable content.
Register the implementation as a Spring bean (e.g., @Component) and ensure it is stateless and thread-safe. The framework may invoke it concurrently.
Topic And EventManager
StackSaga Topic

StackSaga topics are constant values that identify the execution endpoints in StackSaga. They are backed by the Kafka topics used to send command messages to the worker services, and they carry additional metadata such as the topic name, the topic type (primary or compensation), and the span name that identifies the execution point. The EventManager uses StackSaga topics to determine which topic should be triggered next, based on the execution flow.

In the place-order example, the primary flow has 4 atomic executions (spans): fetching the user's details, initializing the order, making the payment, and updating the inventory. The compensation flow has 3 atomic executions: cancelling the order, refunding the payment, and releasing the inventory. That makes 7 spans in total. Here are the topics used in the place-order example:

class PlaceOrderTopic extends AbstractTopic<PlaceOrderTopic> { (1)

    (2)
    protected PlaceOrderTopic(String topicName, float topicKey, SagaEventType sagaEventType, String targetService) {
        super(topicName, topicKey, sagaEventType, targetService);
    }


    (3)
    protected PlaceOrderTopic(String topicName, float topicKey, SagaEventType sagaEventType, String targetService, PlaceOrderTopic parent) {
        super(topicName, topicKey, sagaEventType, targetService, parent);
    }

    (4)
    //primary execution topics.
    public static final PlaceOrderTopic DO_FETCH_USER_DETAILS = new PlaceOrderTopic("user.fetch", 1, SagaEventType.QUERY_DO_ACTION, "user-service");
    public static final PlaceOrderTopic DO_INITIALIZE_ORDER = new PlaceOrderTopic("order.init", 2, SagaEventType.COMMAND_DO_ACTION, "order-service");
    public static final PlaceOrderTopic DO_MAKE_PAYMENT = new PlaceOrderTopic("payment.make", 3, SagaEventType.COMMAND_DO_ACTION, "payment-service");
    public static final PlaceOrderTopic DO_INVENTORY_UPDATE = new PlaceOrderTopic("inventory.update", 4, SagaEventType.COMMAND_DO_ACTION, "inventory-service");

    (5)
    //revert/compensation topics.
    public static final PlaceOrderTopic UNDO_INITIALIZE_ORDER = new PlaceOrderTopic("order.init", -2, SagaEventType.COMMAND_UNDO_ACTION, "order-service", DO_INITIALIZE_ORDER);
    public static final PlaceOrderTopic UNDO_MAKE_PAYMENT = new PlaceOrderTopic("payment.make", -3, SagaEventType.COMMAND_UNDO_ACTION, "payment-service", DO_MAKE_PAYMENT);
    public static final PlaceOrderTopic UNDO_INVENTORY_UPDATE = new PlaceOrderTopic("inventory.update", -4, SagaEventType.COMMAND_UNDO_ACTION, "inventory-service", DO_INVENTORY_UPDATE);
}
1 Create the custom topic class by extending the AbstractTopic class.
2 Declare the constructor used to instantiate primary execution topics.
topicName: the name of the Kafka topic. See Topic Name Specification for more details.
topicKey: a constant float value, unique within the domain, that represents the topic. See Topic Key Specification for more details.
sagaEventType: the type of the topic, either SagaEventType.QUERY_DO_ACTION for a query execution or SagaEventType.COMMAND_DO_ACTION for a command execution.
targetService: the name of the target service responsible for executing the command. It is used for logging and debugging purposes.
3 Declare the constructor used to instantiate compensation execution topics.
The parameters are the same as for the primary execution topic constructor, plus an additional parent parameter.
parent: the primary execution topic that the compensation topic belongs to. It records the relationship between the primary execution and its compensation; for instance, in the place-order example, the compensation topic UNDO_INITIALIZE_ORDER is related to the primary execution topic DO_INITIALIZE_ORDER.
4 Create the primary execution topics as static final fields of the custom topic class, following the conventions.
5 Create the compensation execution topics as static final fields of the custom topic class, following the conventions.
The custom topic class is not a Spring bean. Do not annotate it with any Spring annotations.
Topic Name Specification In StackSaga-Kafka

The topic name is the name of the Kafka topic used to send messages to the worker services. Use a clear, descriptive name that reflects the purpose of the execution. In the place-order example, the primary execution topics DO_FETCH_USER_DETAILS, DO_INITIALIZE_ORDER, DO_MAKE_PAYMENT, and DO_INVENTORY_UPDATE use the names user.fetch, order.init, payment.make, and inventory.update, which become do.user.fetch, do.order.init, do.payment.make, and do.inventory.update. For a compensation topic, it is recommended to reuse the name of its primary execution topic to show that they are related: UNDO_INITIALIZE_ORDER, UNDO_MAKE_PAYMENT, and UNDO_INVENTORY_UPDATE also use order.init, payment.make, and inventory.update, which become undo.order.init, undo.payment.make, and undo.inventory.update.

According to StackSaga guidelines, primary execution topics must carry the do. prefix and compensation topics the undo. prefix. You can still write the topic name without the prefix, because the framework adds the prefix internally according to the topic type. For instance, if you set the topic name of DO_FETCH_USER_DETAILS to user.fetch, the framework creates do.user.fetch internally, and that real topic appears in the Kafka topic list (with the saga. namespace, as saga.do.user.fetch, per the table below).

Rules for topic naming in StackSaga-Kafka:

  1. Use . as the separator to build a hierarchical topic name (e.g., do.order.init); other separators such as - or _ are not allowed.
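The naming rule, combined with the prefixing behavior described above, can be modeled as a small validation helper. `SketchTopicNames` is hypothetical and not part of StackSaga; it assumes that name segments contain only letters and digits, and that the real name is saga. + do./undo. + the mentioned name, as in the Real Topic Name column of the table below.

```java
import java.util.regex.Pattern;

// Hypothetical helper mirroring the naming rule and the "Real Topic Name" column.
// Assumption: real name = "saga." + ("do." | "undo.") + mentioned name.
final class SketchTopicNames {
    // Dot-separated segments of letters and digits only; '-' and '_' are rejected.
    private static final Pattern VALID = Pattern.compile("[A-Za-z0-9]+(\\.[A-Za-z0-9]+)*");

    private SketchTopicNames() {}

    static String realTopicName(String mentionedName, boolean compensation) {
        String prefix = compensation ? "undo." : "do.";
        // The prefix may be written explicitly or omitted; strip it so it is never doubled.
        String name = mentionedName.startsWith(prefix)
                ? mentionedName.substring(prefix.length())
                : mentionedName;
        if (!VALID.matcher(name).matches()) {
            throw new IllegalArgumentException("Invalid topic name: " + mentionedName);
        }
        return "saga." + prefix + name;
    }
}
```

For example, realTopicName("user.fetch", false) and realTopicName("do.user.fetch", false) both yield saga.do.user.fetch, while "order_init" is rejected.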

Topic Key Specification In StackSaga-Kafka

The topic key is used in the serialization and deserialization process: the raw topic names are not passed in the headers of the Kafka messages; the topic key is passed instead. Therefore, a key cannot be changed once it has been used in the system, and each topic within the same domain should have a constant and unique float value. In the place-order example, the topic keys of DO_FETCH_USER_DETAILS, DO_INITIALIZE_ORDER, DO_MAKE_PAYMENT, and DO_INVENTORY_UPDATE are 1, 2, 3, and 4. For compensation topics, negative float values are recommended to distinguish them from primary execution topics: the topic keys of UNDO_INITIALIZE_ORDER, UNDO_MAKE_PAYMENT, and UNDO_INVENTORY_UPDATE are -2, -3, and -4.
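Because only the key travels with each message, the receiving side has to map keys back to topics. The hypothetical `SketchTopicKeyRegistry` below (not a StackSaga class) models that lookup in plain Java and shows why duplicate keys within a domain must be rejected; treating negative keys as compensation topics simply mirrors the convention recommended above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of key-based topic resolution within one domain.
final class SketchTopicKeyRegistry {
    // Minimal topic model; a negative key marks a compensation topic by convention.
    record Topic(String name, float key) {
        boolean isCompensation() {
            return key < 0;
        }
    }

    private final Map<Float, Topic> byKey = new HashMap<>();

    void register(Topic topic) {
        Topic existing = byKey.putIfAbsent(topic.key(), topic);
        if (existing != null) {
            throw new IllegalStateException(
                    "Topic key " + topic.key() + " is already used by " + existing.name());
        }
    }

    // Called with the key read from an incoming message.
    Topic resolve(float key) {
        Topic topic = byKey.get(key);
        if (topic == null) {
            throw new IllegalArgumentException("Unknown topic key: " + key);
        }
        return topic;
    }
}
```

Changing a key after messages have been published would make in-flight messages resolve to the wrong topic, or to none, which is why keys must stay fixed once used.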

Based on the custom topic class above, the following table lists the topics used in the place-order example with their keys and names:

Execution          | Topic Key | Topic Type          | Mentioned Topic Name | Real Topic Name
Fetch User Details | 1         | QUERY_DO_ACTION     | user.fetch           | saga.do.user.fetch
Initialize Order   | 2         | COMMAND_DO_ACTION   | order.init           | saga.do.order.init
Make Payment       | 3         | COMMAND_DO_ACTION   | payment.make         | saga.do.payment.make
Inventory Update   | 4         | COMMAND_DO_ACTION   | inventory.update     | saga.do.inventory.update
Cancel Order       | -2        | COMMAND_UNDO_ACTION | order.init           | saga.undo.order.init
Refund Payment     | -3        | COMMAND_UNDO_ACTION | payment.make         | saga.undo.payment.make
Release Inventory  | -4        | COMMAND_UNDO_ACTION | inventory.update     | saga.undo.inventory.update

COMMAND_UNDO_BEFORE_ACTION and COMMAND_UNDO_AFTER_ACTION are not supported in the current version of the framework; they will be supported in upcoming versions. Until then, do not use these topic types.
EventManager

StackSaga-Kafka supports fully dynamic execution navigation at runtime, based on your own conditions and the state of the transaction. The EventManager is the component responsible for this navigation. Let's create a custom EventManager for the OrderDomainEntity:

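import java.util.List;
import java.util.NavigableMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;

@Slf4j // provides the log field used in onNextRevert()
(2)
@SagaEventManager(
        value = "placeOrderEventManager", (3)
        listenerScope = ListenerScope.SHARED, (4)
        domainRootTopicSuffix = "place-order" (5)
)
public class PlaceOrderEventManager extends AbstractEventManager<OrderDomainEntity, PlaceOrderTopic> { (1)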

    (6)
    @Override
    public Supplier<List<PlaceOrderTopic>> registerTopics() {
        return () -> List.of(
                PlaceOrderTopic.DO_FETCH_USER_DETAILS,
                PlaceOrderTopic.DO_INITIALIZE_ORDER,
                PlaceOrderTopic.DO_MAKE_PAYMENT,
                PlaceOrderTopic.DO_INVENTORY_UPDATE,
                PlaceOrderTopic.UNDO_INITIALIZE_ORDER,
                PlaceOrderTopic.UNDO_MAKE_PAYMENT,
                PlaceOrderTopic.UNDO_INVENTORY_UPDATE
        );
    }

    (7)
    @Override
    public @NonNull SagaPrimaryEventAction<PlaceOrderTopic> onNext(PlaceOrderTopic recentTopic, OrderDomainEntity currentDomainEntityState) {
        if (recentTopic.equals(PlaceOrderTopic.DO_FETCH_USER_DETAILS)) {
            currentDomainEntityState.getMetadata().put("userDetailsFetched", "true");
            return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_INITIALIZE_ORDER);
        }
        if (recentTopic.equals(PlaceOrderTopic.DO_INITIALIZE_ORDER)) {
            currentDomainEntityState.getMetadata().put("orderInitialized", "true");
            return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_MAKE_PAYMENT);
        }
        if (recentTopic.equals(PlaceOrderTopic.DO_MAKE_PAYMENT)) {
            currentDomainEntityState.getMetadata().put("paymentMade", "true");
            return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_INVENTORY_UPDATE);
        }
        if (recentTopic.equals(PlaceOrderTopic.DO_INVENTORY_UPDATE)) {
            currentDomainEntityState.getMetadata().put("inventoryUpdated", "true");
            return SagaPrimaryEventAction.complete();
        }
        return SagaPrimaryEventAction.error(new IllegalStateException("Unexpected topic: " + recentTopic));
    }

    (8)
    @Override
    public void onNextRevert(
            PlaceOrderTopic recentExecutedTopic,
            PlaceOrderTopic nextTopic,
            OrderDomainEntity lastDomainEntityState,
            NonRetryableExecutorException nonRetryableExecutorException,
            RevertHintStore revertHintStore,
            Supplier<NavigableMap<Integer, PlaceOrderTopic>> remainingReverts
    ) {

        (9)
        {//sample usage of revert hint store and next topic in the onNextRevert method.
            if (PlaceOrderTopic.UNDO_MAKE_PAYMENT.equals(nextTopic)) {
                revertHintStore.put("BEFORE_NOTE:UNDO_MAKE_PAYMENT", "Sample value before reverting UNDO_MAKE_PAYMENT");
            }
            if (PlaceOrderTopic.UNDO_INITIALIZE_ORDER.equals(nextTopic)) {
                revertHintStore.put("BEFORE_NOTE:UNDO_INITIALIZE_ORDER", "Sample value before reverting UNDO_INITIALIZE_ORDER");
            }
        }

        (10)
        {//sample usage of remaining reverts and recentExecutedTopic
            // remainingReverts is a Supplier, so call get() to log the map itself.
            if (PlaceOrderTopic.UNDO_MAKE_PAYMENT.equals(recentExecutedTopic)) {
                log.info("Remaining reverts after reverting UNDO_MAKE_PAYMENT: {}", remainingReverts.get());
            }

            if (PlaceOrderTopic.UNDO_INITIALIZE_ORDER.equals(recentExecutedTopic)) {
                log.info("Remaining reverts after reverting UNDO_INITIALIZE_ORDER: {}", remainingReverts.get());
            }
        }
    }
}
1 Create a custom EventManager class by extending the AbstractEventManager class, passing your custom DomainEntity and your custom Topic class as the generic parameters.
2 Annotate the custom EventManager class with the @SagaEventManager annotation. Among other things, it marks the class as a Spring bean.
3 value: the name of the Spring bean for the custom EventManager. It is used to identify the EventManager by name.
4 listenerScope: the listener scope of the EventManager, either ListenerScope.SHARED or ListenerScope.ISOLATED. See Topics And Listener Models In stacksaga-kafka-orchestrator for more details.
5 domainRootTopicSuffix: the common suffix for the topics that belong to the same domain. The framework uses it internally to create the domain-root topic, the topic that receives the response messages from the Kafka worker endpoints.
6 Override the registerTopics() method to register the topics used in the transaction. The framework invokes this method at startup to register the topics of the custom DomainEntity. Return the list of topics via a Supplier. This example registers the 7 topics of the place-order example; these are the real topics that send messages to the target services.
7 Override the onNext() method to determine the next topic from the most recently executed topic and the current state of the domain-entity. The framework invokes this method after each successful execution in the primary flow. Use the provided parameters to evaluate your conditions and decide which topic to trigger next. For instance, in the place-order example, if the most recently executed topic is DO_FETCH_USER_DETAILS, the user's details have been fetched successfully, so the method records a metadata entry on the domain-entity and returns SagaPrimaryEventAction.next(PlaceOrderTopic.DO_INITIALIZE_ORDER) to trigger DO_INITIALIZE_ORDER next. Once all spans have been processed as expected, return SagaPrimaryEventAction.complete() to mark the transaction as successfully completed.
8 Optionally override the onNextRevert() method to perform an action before each execution in the compensation flow is reverted. The framework invokes this method before executing each compensation execution.
9 Sample usage of the RevertHintStore and nextTopic parameters in onNextRevert(). You can store any metadata required by the compensation execution in the RevertHintStore, and use nextTopic to find out which compensation topic will be executed next. See the Javadoc of the class for more details about the parameters of this method.
10 Sample usage of the remainingReverts and the recentExecutedTopic parameters in the onNextRevert() method.
As a best practice, avoid performing any I/O-intensive or CPU-intensive operations inside the EventManager's navigation methods. They should only evaluate conditions based on the provided parameters.
The reason is that the StackSaga engine relies internally on Project Reactor's reactive pipelines, which run on non-blocking event-loop threads. Executing expensive operations here could block those threads, causing performance bottlenecks that affect the entire application.
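To make this guidance concrete, here is a framework-independent model of the onNext() decision as a pure, in-memory function. `SketchPlaceOrderNavigation`, its `Step` enum, and the zero-amount rule that skips the payment step are hypothetical and only illustrate a condition-based branch. In a real EventManager, Optional.empty() corresponds to returning SagaPrimaryEventAction.complete() and Optional.of(step) to SagaPrimaryEventAction.next(...).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Framework-independent model of onNext(): a pure function of (recent step, current state).
// It performs no I/O and no blocking calls, so it is cheap enough for event-loop threads.
final class SketchPlaceOrderNavigation {
    enum Step { FETCH_USER_DETAILS, INITIALIZE_ORDER, MAKE_PAYMENT, UPDATE_INVENTORY }

    // Minimal stand-in for OrderDomainEntity.
    static final class OrderState {
        final Map<String, String> metadata = new HashMap<>();
        double totalAmount;
    }

    private SketchPlaceOrderNavigation() {}

    /** Returns the next step, or Optional.empty() when the saga is complete. */
    static Optional<Step> next(Step recent, OrderState state) {
        switch (recent) {
            case FETCH_USER_DETAILS:
                state.metadata.put("userDetailsFetched", "true");
                return Optional.of(Step.INITIALIZE_ORDER);
            case INITIALIZE_ORDER:
                state.metadata.put("orderInitialized", "true");
                // Hypothetical runtime condition: a zero-amount order skips the payment step.
                return Optional.of(state.totalAmount > 0 ? Step.MAKE_PAYMENT : Step.UPDATE_INVENTORY);
            case MAKE_PAYMENT:
                state.metadata.put("paymentMade", "true");
                return Optional.of(Step.UPDATE_INVENTORY);
            case UPDATE_INVENTORY:
                state.metadata.put("inventoryUpdated", "true");
                return Optional.empty();
            default:
                throw new IllegalStateException("Unexpected step: " + recent);
        }
    }
}
```

Keeping the decision this small, and based only on the parameters and the entity state already in memory, is what keeps the navigation safe to run on non-blocking threads.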
Topics And Listener Models In stacksaga-kafka-orchestrator
Topics

For each EventManager of a DomainEntity, stacksaga-kafka-orchestrator creates a dedicated topic to receive the response messages from the Kafka worker endpoints. The topic name follows the pattern saga.internal.{serviceName}.{domainRootTopicSuffix}. For instance, if the service name is order-service and the domainRootTopicSuffix is place-order, the topic name will be saga.internal.order-service.place-order.

The domainRootTopicSuffix is configured in the @SagaEventManager annotation of the EventManager:

@SagaEventManager(
        domainRootTopicSuffix = "place-order"
)
Listener Models

Although a dedicated topic is created for each EventManager, the messages are consumed from those topics using one of two listener models, SHARED or ISOLATED, selected by the listenerScope attribute of the EventManager's @SagaEventManager annotation.

Let's look at each listener model in detail.

SHARED

If @SagaEventManager is configured with listenerScope = ListenerScope.SHARED, the framework binds the response topic of that EventManager to a shared listener.
If you have multiple EventManagers with listenerScope = ListenerScope.SHARED, all of their topics are bound to the same shared listener.

Here is the architecture for the shared listener model in stacksaga-kafka-orchestrator.

Shared kafka receiver in stacksaga kafka orchestrator
  • The consumer group follows the naming convention {serviceName}-os, where os stands for Orchestration Service.
    For example, if the service name is order-service, the consumer group for the shared root topic will be order-service-os.

  • The shared listener/receiver is configured with auto.offset.reset=earliest. This ensures that previously published messages can be consumed after a service restart when no committed offset is available.

  • Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.

  • To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.

  • Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Processing mode: the stacksaga.kafka.orchestrator.shared-response-listener.processing-mode property determines how the shared listener processes messages. It can be set to either SharedListenerProcessingMode.CONCURRENT or SharedListenerProcessingMode.PARTITION_ORDERED.
    In CONCURRENT mode, messages are processed concurrently, without preserving the order of messages from the same partition.
    In PARTITION_ORDERED mode, messages are processed sequentially per partition, while different partitions are processed concurrently.
    The default processing mode for the shared listener is PARTITION_ORDERED, which balances throughput and consistency by preserving the order of messages within each partition. See Configuration Properties Of stacksaga-kafka-orchestrator for the related properties.
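The difference between the two processing modes can be illustrated with a small dispatcher that gives each partition its own single-threaded lane. This is a hypothetical sketch of the PARTITION_ORDERED idea using only java.util.concurrent, not the framework's internal implementation; in CONCURRENT mode, records would instead go to one shared thread pool with no per-partition ordering.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of PARTITION_ORDERED dispatch: one single-threaded lane per partition,
// so records of a partition run in submission order while partitions run in parallel.
final class SketchPartitionOrderedDispatcher implements AutoCloseable {
    private final Map<Integer, ExecutorService> lanes = new ConcurrentHashMap<>();

    void dispatch(int partition, Runnable handler) {
        lanes.computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor())
             .execute(handler);
    }

    // Stops accepting work and waits for queued records to finish.
    @Override
    public void close() {
        lanes.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes.values()) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Records from partition 0 always run in offset order, while partition 1 can progress at the same time on its own lane. Because delivery is At-Least-Once, handlers dispatched this way should still tolerate an occasional redelivered record.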

ISOLATED

If @SagaEventManager is configured with listenerScope = ListenerScope.ISOLATED, the framework creates a dedicated listener for the response topic of that EventManager. Therefore, if you have multiple EventManagers with listenerScope = ListenerScope.ISOLATED, each EventManager has its own dedicated listener and its own dedicated topic for receiving response messages from the Kafka worker endpoints. This approach provides better isolation and helps avoid interference between EventManagers, but it can increase resource consumption because more listeners are running.

Here is the architecture for the isolated-listener model in stacksaga-kafka-orchestrator.

Isolated kafka receiver in stacksaga kafka orchestrator
  • The name of the domain-root topic follows the pattern saga.internal.isolated.{serviceName}.{domainRootTopicSuffix}. For instance, if the service name is order-service and the domainRootTopicSuffix is place-order, the domain-root topic name will be saga.internal.isolated.order-service.place-order.

  • The consumer group for the domain-root topic follows the same {serviceName}-os convention as the shared root topic, so in the previous example it is order-service-os. The other configurations of the domain-root topic are the same as those of the shared root topic.

  • As with the shared root topic, the domain-root topics are configured with auto.offset.reset=earliest, so that previously published messages can be consumed after a service restart when no committed offset is available.

  • As with the shared root topic, the domain-root topics use At-Least-Once delivery semantics, and their offsets are acknowledged and committed in the same way.

  • As with the shared root topic, each message is acknowledged only after it has been successfully processed.

  • As with the shared root topic, acknowledged offsets are committed periodically using the framework's default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before its offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Processing mode: the processing mode of each isolated listener is set on its EventManager through the processingMode attribute, @SagaEventManager(processingMode=?), which determines how that isolated listener processes messages. It can be set to one of the following processing modes:

    • IsolatedListenerProcessingMode.CONCURRENT: In this mode, messages are processed concurrently without preserving the order of messages from the same partition.

If the isolated listener uses the CONCURRENT processing mode, its concurrency can be customized via @SagaEventManager(isolatedListenerConcurrency=?). Read the class Javadoc for more details.
  • IsolatedListenerProcessingMode.PARTITION_ORDERED: In this mode, messages are processed sequentially per partition while allowing concurrent execution across multiple partitions.
    The default processing mode for the isolated listener is PARTITION_ORDERED to ensure a balance between throughput and consistency while preserving the order of messages within each partition.

  • IsolatedListenerProcessingMode.SEQUENTIAL: In this mode, messages are processed one at a time without any concurrency. Use this mode when the processing order is critical and must be preserved across all partitions.
    The SEQUENTIAL processing mode is not recommended for high-throughput scenarios, because its one-at-a-time processing can become a bottleneck. Use it only when strict ordering of message processing is a hard requirement and the expected message volume is low to moderate.
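The naming conventions of both listener models can be summarized in one small helper, which is handy for predicting the topic and consumer-group names you will see in Kafka tooling. `SketchReplyTopology` is hypothetical (the framework derives these names itself); group.id and auto.offset.reset are the standard Kafka consumer configuration keys for the settings described in this section.

```java
import java.util.Properties;

// Hypothetical helper reproducing the documented conventions:
//   SHARED:   saga.internal.{serviceName}.{domainRootTopicSuffix}
//   ISOLATED: saga.internal.isolated.{serviceName}.{domainRootTopicSuffix}
//   consumer group (both scopes): {serviceName}-os
final class SketchReplyTopology {
    enum Scope { SHARED, ISOLATED }

    private SketchReplyTopology() {}

    static String replyTopic(Scope scope, String serviceName, String domainRootTopicSuffix) {
        String base = scope == Scope.ISOLATED ? "saga.internal.isolated." : "saga.internal.";
        return base + serviceName + "." + domainRootTopicSuffix;
    }

    static String consumerGroup(String serviceName) {
        return serviceName + "-os";
    }

    // The consumer settings named in this section, expressed with standard Kafka keys.
    static Properties consumerProperties(String serviceName) {
        Properties props = new Properties();
        props.setProperty("group.id", consumerGroup(serviceName));
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

For order-service with the place-order suffix, this yields saga.internal.order-service.place-order (SHARED), saga.internal.isolated.order-service.place-order (ISOLATED), and the consumer group order-service-os in both cases.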

StackSagaKafkaTemplate

StackSagaKafkaTemplate is the main entry point for accessing the StackSaga-Kafka engine (SEC) functionality in the orchestrator service. It provides the main APIs for starting a new saga transaction and checking the status of existing transactions.

StackSagaKafkaTemplate can be used in both reactive and non-reactive programming models. In a non-reactive environment, it can be used as follows.

StackSagaKafkaTemplate In Non-Reactive environment

This demonstrates how to use the StackSagaKafkaTemplate to start a new saga transaction for placing an order and also how to check the current state of the transaction.

@Slf4j
@Component
@RequiredArgsConstructor
public class PlaceOrderHandler {

    private final StackSagaKafkaTemplate<OrderDomainEntity, PlaceOrderTopic> stackSagaKafkaTemplate; (1)

    public String handle(String username, double amount, List<String> items) {
        String transactionId = this.stackSagaKafkaTemplate
                (2)
                .init(() -> {
                    OrderDomainEntity orderDomainEntity = new OrderDomainEntity();
                    {//initializing the domain entity with the required properties for the saga execution.
                        orderDomainEntity.setUsername(username);
                        orderDomainEntity.setTotalAmount(amount);
                        //...
                    }
                    return orderDomainEntity;
                })
                (3)
                .peek(orderDomainEntity -> {
                    log.info("transactionId {}:", orderDomainEntity.getTransactionId());
                })
                (4)
                .startWith(PlaceOrderTopic.DO_FETCH_USER_DETAILS, OrderEventManager.class)
                (5)
                .execute();
        log.info("Started place order saga with transactionId: {}", transactionId);
        return transactionId;
    }

    public void printCurrentState(String transactionId) {
        TransactionState<OrderDomainEntity> state = this
                .stackSagaKafkaTemplate
                (6)
                .getCurrentState(transactionId)
                .execute();

        TransactionCompleteStatus currentStatus = state.getCurrentStatus(); (7)
        log.info("Current status of transaction {}: {}", transactionId, currentStatus);
        NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory(); (8)
        log.info("Execution history of transaction {}: {}", transactionId, executionHistory);
        OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity(); (9)
        log.info("Current domain entity state: {}", currentDomainEntity); (10)
        ZonedDateTime startedDateTime = state.getStartedDateTime(); (11)
        log.info("Transaction {} started at: {}", transactionId, startedDateTime.toLocalDateTime());
    }
}
1 Autowire the StackSagaKafkaTemplate into your class. Provide your custom DomainEntity and custom Topic class as its generic parameters.
2 Use the init operator to initialize the transaction by providing a supplier of the custom DomainEntity. It creates the initial state of the transaction, so populate the DomainEntity with all the properties that the saga execution requires.
3 Use the optional peek operator to act on the initialized DomainEntity before the execution starts. Its main use is to see the generated transactionId: the framework generates the transactionId and sets it on the DomainEntity when the entity passes through the init operator, so peek lets you read it before the execution starts.
4 Use the startWith operator to specify the starting point of the execution: the first topic that should be triggered and the EventManager class that is responsible for navigating the execution flow of the transaction.
5 Use the execute operator to start the execution of the transaction in a blocking way. It returns the transactionId of the started transaction.
6 Use the getCurrentState operator with a transactionId, followed by execute, to retrieve the current state of the transaction at any point in time. The returned TransactionState object contains all the details about the transaction state, such as the current status of the transaction, the execution history, the current state of the domain entity, the start time, and more.
7 Get the current status of the transaction from the TransactionState object.
8 Get the execution history of the transaction from the TransactionState object. The execution history is a navigable map that contains all the executed endpoints in the transaction flow (primary and compensation) with their metadata, such as the execution status, completion time, event name, and more.
9 Get the current state of the domain entity from the TransactionState object. This is the latest state of the domain entity after all the changes made by the executed endpoints in the transaction flow have been applied.
10 Log the current state of the domain entity.
11 Get the start time of the transaction from the TransactionState object and log it.
You can also get notified about transaction status changes by using an event listener, as described below.
See the Javadoc of the TransactionState class for all the information that the transaction state object provides about a transaction.
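Because the execution history is a standard java.util.NavigableMap, its navigation methods apply directly. The sketch below assumes that the Integer keys reflect execution order and uses a hypothetical SimpleExecutionEvent record in place of SagaExecutionEvent, whose real fields may differ. It shows how to read the latest event and walk the history from newest to oldest.

```java
import java.util.Map;
import java.util.NavigableMap;

public class ExecutionHistorySketch {

    /** Hypothetical stand-in for SagaExecutionEvent; the real class has its own fields. */
    public record SimpleExecutionEvent(String eventName, String status) {}

    /** Returns the most recent event (highest key), or null for an empty history. */
    public static SimpleExecutionEvent latest(NavigableMap<Integer, SimpleExecutionEvent> history) {
        Map.Entry<Integer, SimpleExecutionEvent> last = history.lastEntry();
        return last == null ? null : last.getValue();
    }

    /** Renders the history from newest to oldest, one "key eventName" line per entry. */
    public static String describeNewestFirst(NavigableMap<Integer, SimpleExecutionEvent> history) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Integer, SimpleExecutionEvent> entry : history.descendingMap().entrySet()) {
            out.append(entry.getKey()).append(' ').append(entry.getValue().eventName()).append('\n');
        }
        return out.toString();
    }
}
```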
StackSagaKafkaTemplate In Reactive environment

If you are in a reactive environment, you can use the StackSagaKafkaTemplate in a non-blocking way by using the reactive operators provided by the template as below.

@Slf4j
@Component
@RequiredArgsConstructor
class ReactivePlaceOrderHandler {
    private final StackSagaKafkaTemplate<OrderDomainEntity, PlaceOrderTopic> stackSagaKafkaTemplate;

    public Mono<String> handle(String username, double amount, List<String> items) {
        return this
                .stackSagaKafkaTemplate
                .init(() -> {
                    OrderDomainEntity orderDomainEntity = new OrderDomainEntity();
                    {//initializing the domain entity with the required properties for the saga execution.
                        orderDomainEntity.setUsername(username);
                        orderDomainEntity.setTotalAmount(amount);
                        //...
                    }
                    return orderDomainEntity;
                })
                .peek(orderDomainEntity -> {
                    log.info("transactionId {}:", orderDomainEntity.getTransactionId());
                })
                .startWith(PlaceOrderTopic.DO_FETCH_USER_DETAILS, OrderEventManager.class)
                .executeAsync()
                .doOnNext(transactionId -> {
                    log.info("Started place order saga with transactionId: {}", transactionId);
                });
    }
    public Mono<Void> printCurrentState(String transactionId) {
        return this
                .stackSagaKafkaTemplate
                .getCurrentState(transactionId)
                .executeAsync()
                .doOnNext(state -> {
                    TransactionCompleteStatus currentStatus = state.getCurrentStatus();
                    log.info("Current status of transaction {}: {}", transactionId, currentStatus);
                    NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory();
                    log.info("Execution history of transaction {}: {}", transactionId, executionHistory);
                    OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity();
                    log.info("Current domain entity state: {}", currentDomainEntity);
                    ZonedDateTime startedDateTime = state.getStartedDateTime();
                    log.info("Transaction {} started at: {}", transactionId, startedDateTime.toLocalDateTime());
                })
                .then();

    }
}
Everything is the same as in the non-reactive example, except that executeAsync is used instead of execute, both to start the execution and to fetch the current state without blocking. Refer to the non-reactive example for a detailed explanation of each StackSagaKafkaTemplate operator.
EventListener For State Change Events

After you hand the transaction over to the StackSaga-Kafka engine by calling the execute or executeAsync method of the StackSagaKafkaTemplate, the engine executes it and updates its status after each execution (primary and compensation) in the transaction flow. If you want to keep listening to these status changes and act on them, the framework provides listener interfaces for that purpose.

The interface to implement depends on your environment:

  1. TransactionEventListener for a non-reactive environment.

  2. ReactiveTransactionEventListener for a reactive environment.

To get information about transaction status changes, the framework offers two approaches:

  1. Using StackSagaKafkaTemplate.getCurrentState.

  2. Using TransactionEventListener or ReactiveTransactionEventListener.

Calling StackSagaKafkaTemplate.getCurrentState with a transactionId is not an efficient way to track changes, because each call makes the engine fetch the current state of the transaction from the event store. With the listeners, you are notified after each execution in the transaction flow without calling any method: the current state of the transaction is passed to the listener's callback method in real time, so there is no need to poll for it. For instance, in the place-order example, you can use the listeners to push notifications that keep the user updated about order status changes. In other cases, you might need the state of a transaction even after it has completed; for instance, a user might want to see the order details after placing the order. In that case, use StackSagaKafkaTemplate.getCurrentState with the transactionId. The two approaches serve different use cases, so choose the one that suits yours.
While you are using the onStateChanged method of the listeners, be aware that it is invoked after each execution in the transaction flow. Perform only the required actions in that method and avoid heavy processing there, because it can delay the entire transaction flow. If you need to do heavy processing based on status changes, make a deep copy of the TransactionState object and process it on a separate thread, so that the saga thread can continue the execution without unnecessary delay. Alternatively, fetch the state with StackSagaKafkaTemplate.getCurrentState and perform the heavy processing based on that.
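The offloading advice above can be sketched in plain Java as follows. The StateSnapshot record, the OffloadingListenerSketch class, and its onStateChanged parameters are hypothetical simplifications (the real listener receives a TransactionState). The point is that the callback only copies what it needs and hands it to a separate executor, so the calling (saga) thread returns immediately. Map.copyOf makes a shallow copy, which is enough here only because the values are assumed to be immutable; nested mutable values would need a real deep copy.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class OffloadingListenerSketch implements AutoCloseable {

    /** Hypothetical immutable snapshot of the parts of the transaction state you need. */
    public record StateSnapshot(String transactionId, String status, Map<String, Object> domainFields) {
        public StateSnapshot {
            // Shallow, unmodifiable copy (rejects null values); nested mutable values would need a deep copy.
            domainFields = Map.copyOf(domainFields);
        }
    }

    private final ExecutorService heavyWorkPool = Executors.newFixedThreadPool(4);
    private final Consumer<StateSnapshot> heavyWork;

    public OffloadingListenerSketch(Consumer<StateSnapshot> heavyWork) {
        this.heavyWork = heavyWork;
    }

    /** Runs on the caller's (saga) thread: copy, hand off, return immediately. */
    public void onStateChanged(String transactionId, String status, Map<String, Object> liveDomainFields) {
        StateSnapshot snapshot = new StateSnapshot(transactionId, status, liveDomainFields);
        heavyWorkPool.execute(() -> heavyWork.accept(snapshot));
    }

    /** Stops accepting work and waits for queued heavy work to finish. */
    @Override
    public void close() {
        heavyWorkPool.shutdown();
        try {
            heavyWorkPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```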
TransactionEventListener

You can implement the TransactionEventListener interface to listen to the transaction events in a non-reactive environment as below.

@Slf4j
@Component
@RequiredArgsConstructor
public class PlaceOrderHandler implements TransactionEventListener<OrderDomainEntity> {
    @Override
    public void onStateChanged(TransactionState<OrderDomainEntity> state) {
        TransactionCompleteStatus currentStatus = state.getCurrentStatus();
        log.info("Current status of transaction: {}", currentStatus);
        NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory();
        log.info("Execution history of transaction: {}", executionHistory);
        OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity();
        log.info("Current domain entity state: {}", currentDomainEntity);
        ZonedDateTime startedDateTime = state.getStartedDateTime();
        log.info("Transaction started at: {}", startedDateTime.toLocalDateTime());
    }
}
ReactiveTransactionEventListener

You can implement the ReactiveTransactionEventListener interface to listen to the transaction events in a reactive environment as below.

@Slf4j
@Component
@RequiredArgsConstructor
public class ReactivePlaceOrderHandler implements ReactiveTransactionEventListener<OrderDomainEntity> {
    @Override
    public Mono<Void> onStateChanged(TransactionState<OrderDomainEntity> transactionState) {
        return Mono
                .just(transactionState)
                .doOnNext(state -> {
                    TransactionCompleteStatus currentStatus = state.getCurrentStatus();
                    log.info("Current status of transaction: {}", currentStatus);
                    NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory();
                    log.info("Execution history of transaction: {}", executionHistory);
                    OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity();
                    log.info("Current domain entity state: {}", currentDomainEntity);
                    ZonedDateTime startedDateTime = state.getStartedDateTime();
                    log.info("Transaction started at: {}", startedDateTime.toLocalDateTime());
                })
                .flatMap(orderDomainEntityTransactionState -> {
                    //simulate some async processing here.
                    return Mono.delay(Duration.ofSeconds(1))
                            .then();
                });
    }
}
Handler Architecture

The StackSaga team recommends implementing the saga-related logic in a separate class called a Handler, which is responsible for starting saga transactions, listening to them, and checking their status. The code snippet below shows the recommended architecture for a Handler.

@Slf4j
@Component
@RequiredArgsConstructor
public class PlaceOrderHandler implements TransactionEventListener<OrderDomainEntity> {

    private final StackSagaKafkaTemplate<OrderDomainEntity, PlaceOrderTopic> stackSagaKafkaTemplate;

    public String handle(String username, double amount, List<String> items) { (1)
        return this
                .stackSagaKafkaTemplate
                .init(() -> {
                  //....
                })
                .....
    }

    public CustomOrderStatusView getCurrentState(String orderId) { (2)
        return this
                .stackSagaKafkaTemplate
                .getCurrentState(orderId)
                ......

    }
    @Override
    public void onStateChanged(TransactionState<OrderDomainEntity> transactionState) { (3)
        //....
    }
}
1 The handle method starts the saga transaction by using the StackSagaKafkaTemplate with the parameters required to start the transaction.
2 Fetch the current state of the transaction by using the StackSagaKafkaTemplate and return it as your own view type (CustomOrderStatusView in this example).
3 Implement the TransactionEventListener interface and override its onStateChanged method to listen to transaction status changes.
Configuration Properties Of stacksaga-kafka-orchestrator
stacksaga.kafka.orchestrator.domain-entity-scan

  • Data type: String[]
  • Default value: [] (empty)
  • Description: A comma-separated list of package names to scan for @SagaDomainEntity annotated classes, for instance com.example.domain,com.example.anotherdomain.

stacksaga.kafka.orchestrator.auto-create

  • Data type: boolean
  • Default value: true
  • Description: Enables or disables the automatic creation of Kafka topics when the application starts.

stacksaga.kafka.orchestrator.shared-response-listener.processing-mode

  • Data type: SharedListenerProcessingMode
  • Default value: PARTITION_ORDERED
  • Description: The processing mode of the shared response listener. See the processing modes described for the ISOLATED listener for details about how each mode behaves.

stacksaga.kafka.orchestrator.shared-response-listener.concurrency

  • Data type: int
  • Default value: 128
  • Description: The level of concurrency for processing messages in the shared response listener when shared-response-listener.processing-mode is set to CONCURRENT. It is ignored for the other processing modes.

stacksaga-kafka-worker

stacksaga-kafka-worker is the module that provides the functionality worker services need to execute the commands sent by the orchestrator via the regular topics. It consumes the command messages from the regular topics, executes the business logic, and sends the response messages back to the orchestrator via the response topics (the shared root topic or the domain root topic, depending on the listener scope configuration).

At a high level, the stacksaga-kafka-worker module performs the following duties:

  1. Providing the specifications for creating Kafka saga endpoints in the target client service that conform to the StackSaga Kafka engine.

  2. Sending the response back to the orchestrator service after each invocation.

  3. Providing immediate retry facilities for retryable exceptions.

StackSaga Kafka Endpoints

StackSaga Kafka endpoints are the topic consumers you create in the form that the StackSaga engine expects. As per the architecture, two types of endpoints can be created, as follows.

Query Endpoint

If an (atomic) execution does not change any state, it is executed in a query endpoint. Because such an execution does not change the database state, it has no compensating action to undo.
Query endpoints correspond to the topics defined in the StackSaga Topic with the type SagaEventType.QUERY_DO_ACTION.

For instance, in the place-order example, fetching and validating the user's details is a query execution, because it does not change any state in the database; it just fetches the data and returns it.

Query endpoints can be created in two different ways in StackSaga.

Non-Reactive (Imperative) Query Endpoint

Non-reactive query endpoints are the traditional, blocking way of implementing query endpoints: the method that executes the query is implemented in a blocking way. For instance, in the place-order example, the query endpoint for fetching the user's details can be implemented as follows.

@Slf4j
(2)
@SagaEndpoint(
        //the real topic name will be saga.do.user-service.get-user
        topicNameSuffix = "user-service.get-user", (3)
        listenerScope = ListenerScope.SHARED (4)
)
public class UserValidateEndpoint extends QueryEndpoint { (1)

    (5)
    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {

        log.info("Message Key (Transaction Id): {}", consumerRecord.key()); (6)
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey()); (7)

        consumerRecord.value().getCurrentDomainEntityStateForUpdate().ifPresent(currentDomainEntityState -> { (8)
            log.info("Received payload for before user validation: {}", currentDomainEntityState);
            try {
                {
                    //user validation logic (9)
                    String username = currentDomainEntityState.get("username").asText();
                    log.info("Validating user with username: {}", username);
                }
                {(10)
                    // put the result into the domain entity state for the next topic to use.
                    currentDomainEntityState.put("is_user_validated", true);
                    currentDomainEntityState.put("validation_note", "user validated at " + LocalDateTime.now());
                }
            } catch (SomeRetryableException e) {
                (11)
                throw RetryableExecutorException.of(e);
            } catch (SomeNonRetryableException e) {
                (12)
                throw NonRetryableExecutorException
                        .buildWith(e)
                        .put("error_code", "USER_VALIDATION_FAILED")
                        .put("key-1", "value-1")
                        .put("key-2", "value-2")
                        .build();
            }
        });
    }
}
The example has been simplified to focus on the main points. Refer to the relevant sections of the documentation for each concept to get a better understanding of the framework and its capabilities.
1 Create a custom query endpoint class by extending the QueryEndpoint class.
2 Annotate the custom query endpoint class with the @SagaEndpoint annotation. Primarily, it marks the class as a Spring bean.
3 topicNameSuffix: the suffix of the topic name for this endpoint. The framework creates the real topic name by prepending the saga.do prefix to the suffix, which gives saga.do.user-service.get-user in this example. The saga.do prefix indicates that the topic is used for primary execution. This is the topic to which the EventManager sends command messages to trigger the execution of this endpoint.
4 listenerScope: the listener scope of the endpoint, either ListenerScope.SHARED or ListenerScope.ISOLATED. See Listener Models For Endpoint Topics in stacksaga-kafka-worker for details.
5 Override the doProcess() method to implement the business logic of the query. The framework invokes this method when a message is received on the respective topic. Within the method, you can use the provided ConsumerRecord to access the message key, the payload, and other metadata.
6 You can access the message key from the ConsumerRecord as a string. The key is the transaction id that the orchestrator sends along with the message.
The transaction id is used as the message key to ensure that all messages related to the same transaction are sent to the same Kafka partition, which allows the messages of a transaction to be processed in order.
7 You can access the idempotency key from the payload of the ConsumerRecord. The framework generates the idempotency key on the orchestrator side and sends it along with the message. Even if the same message is processed multiple times because of retries or duplicate deliveries, it carries the same idempotency key, which allows you to implement idempotent processing logic in your endpoint.
8 You can access the current state of the domain entity from the payload of the ConsumerRecord by using the getCurrentDomainEntityStateForUpdate() method. It returns an Optional that contains the current state of the domain entity if it is present. In primary execution, the domain entity state is always present because it is sent from the orchestrator; the Optional simply provides safe access.
On the target service side, the domain entity state cannot be obtained as a Java object, because the original CustomDomainEntity class lives on the orchestrator side. Instead, the worker receives the domain entity state as an ObjectNode, the JSON tree structure provided by the Jackson library, and you can use the methods of the ObjectNode class to access and manipulate it. For instance, use get("fieldName") to access a specific field and put("fieldName", value) to add or update a field.
9 Implement the business logic of the query. You can use the current state of the domain entity to perform the necessary operations and get the required data. For instance, you can call an internal service class to load the user data from the database based on the username provided in the domain entity state.
10 After getting the required data and performing the necessary operations, you can put the result into the domain entity state for the next topic to use. For instance, you can put a field is_user_validated with the value true to indicate that the user has been validated successfully, and a field validation_note with a note about the validation process.
Any field that the worker adds to the domain entity state should be defined in the CustomDomainEntity class on the orchestrator side. The domain entity state is shared between the orchestrator and the workers via Kafka messages, so its structure must be defined in a way that both sides understand. Therefore, it is recommended to define all the possible fields of the domain entity state in the CustomDomainEntity class on the orchestrator side.
If a worker adds a field that is not defined in the CustomDomainEntity class on the orchestrator side, the field is added to the missingProperties map of the domain entity state, and you can access it on the orchestrator side by using getMissingProperties() or getMissingProperty(String key). This avoids the serialization and deserialization issues that undefined fields could otherwise cause, but you should still define all the possible fields in the CustomDomainEntity class to keep a clear contract between the orchestrator and the workers.
Fields that end up in the missingProperties map can still be accessed in the same way by the next worker executed after the current one.
As mentioned in the architecture, none of the modifications made to the domain entity state within the method are applied if an exception (RetryableExecutorException or NonRetryableExecutorException) is thrown from the method.
The reason is that the original domain entity state (the entered state) sent from the orchestrator to the worker must remain unchanged so that it can be reused when the execution is retried. On the other hand, if NonRetryableExecutorException is thrown, the updated values are no longer valid, because the compensation process starts and the state is reverted to the last successful state. If you want to pass error-related data along, use the put() method of NonRetryableExecutorException to add it in key-value format; you can then access it on the orchestrator side during compensation processing.
11 This is the most important part of the endpoint implementation: you must handle exceptions properly by throwing them with the appropriate type. Wrap a retryable exception in RetryableExecutorException and a non-retryable exception in NonRetryableExecutorException.
If the exception is retryable, the full stack trace is not sent to the orchestrator; only the exception message is sent. This reduces redundant information, because retryable exceptions are retried automatically by the framework and there is nothing to monitor or debug.
By throwing RetryableExecutorException, you let StackSaga retry asynchronously based on scheduling. However, it is recommended to implement immediate retries with JustRetryableExecutorException before falling back to RetryableExecutorException, to provide a better user experience and reduce the overall processing time of the transaction. JustRetryableExecutorException triggers an immediate retry without waiting for the scheduled retry; if the immediate retry succeeds, it avoids the scheduling delay as well as redundant messages to the orchestrator for each retry attempt. If the immediate retries fail, throw RetryableExecutorException to trigger the scheduled retry as a fallback. The next section shows how JustRetryableExecutorException can be used for immediate retries in an endpoint implementation.
12 When throwing NonRetryableExecutorException, you can provide additional information by using the put() method to add key-value pairs to the exception. This information is useful for later compensation processing, where you can access the exception and read the additional information to perform the necessary compensation logic. For instance, if you put an error_code with the value USER_VALIDATION_FAILED in the exception, you can check for that error code during compensation processing and perform logic specific to it.
Do not keep exception-related metadata inside your own exception, because the exception object is not serialized and sent to the orchestrator; only the exception trace is sent, as a string. If you want to send additional information about the exception to the orchestrator for later compensation processing, add it in key-value format with the put() method of NonRetryableExecutorException. That information is serialized and sent to the orchestrator along with the message, and you can access it on the orchestrator side during compensation processing.
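Callouts 7, 10, 11, and 12 together describe a contract: a redelivered message (same idempotency key) must not repeat the work, changes to the state count only when the step succeeds, and failure details travel as key-value metadata. The plain-Java sketch below models that contract with hypothetical names (WorkerContractSketch, Reply, FailureWithMetadata, StepLogic) and a Map in place of the Jackson ObjectNode. It illustrates the rules only; it is not the framework's implementation, and it keeps completed replies in memory.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WorkerContractSketch {

    /** Hypothetical reply shape: success flag, resulting state, and failure metadata. */
    public record Reply(boolean success, Map<String, Object> state, Map<String, String> errorMetadata) {}

    /** Hypothetical non-retryable failure that carries key-value metadata. */
    public static class FailureWithMetadata extends RuntimeException {
        private final Map<String, String> metadata = new HashMap<>();

        public FailureWithMetadata(String message) {
            super(message);
        }

        public FailureWithMetadata put(String key, String value) {
            metadata.put(key, value);
            return this;
        }

        public Map<String, String> metadata() {
            return Collections.unmodifiableMap(new HashMap<>(metadata));
        }
    }

    /** The business logic of one step; it mutates the working copy of the state. */
    @FunctionalInterface
    public interface StepLogic {
        void apply(Map<String, Object> workingState);
    }

    // In-memory only: replies of completed steps, keyed by idempotency key.
    private final Map<String, Reply> completedByIdempotencyKey = new ConcurrentHashMap<>();

    public Reply process(String idempotencyKey, Map<String, Object> enteredState, StepLogic logic) {
        // Redelivered message: return the earlier reply instead of running the logic again.
        Reply previous = completedByIdempotencyKey.get(idempotencyKey);
        if (previous != null) {
            return previous;
        }
        // Work on a copy so the entered state stays untouched if the step fails.
        Map<String, Object> workingCopy = new HashMap<>(enteredState);
        try {
            logic.apply(workingCopy);
            Reply reply = new Reply(true, Collections.unmodifiableMap(workingCopy), Map.of());
            completedByIdempotencyKey.put(idempotencyKey, reply);
            return reply;
        } catch (FailureWithMetadata e) {
            // Discard partial changes; only the metadata travels with the failure.
            return new Reply(false, enteredState, e.metadata());
        }
    }
}
```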
Immediate retry with JustRetryableExecutorException

Immediate retry with JustRetryableExecutorException is supported only for non-reactive endpoints, because reactive endpoints already support immediate retries through the reactive programming model without needing a specific exception type. In a non-reactive endpoint, you can throw JustRetryableExecutorException to trigger an immediate retry without waiting for the scheduled retry. If the immediate retry succeeds, it avoids the scheduling delay as well as redundant messages to the orchestrator for each retry attempt. If the immediate retries fail, you can throw RetryableExecutorException to trigger the scheduled retry as a fallback. In non-reactive endpoints, the framework handles these retries behind the scenes with Spring's RetryTemplate, so you can use all the features that RetryTemplate provides. Here is an example of how JustRetryableExecutorException can be used for immediate retries in a non-reactive command endpoint implementation.

Immediate retry with RetryTemplate is supported for the doProcess() method of both non-reactive command and query endpoints, and for the undoProcess() method of non-reactive command endpoints.
@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
public class MakePaymentEndpoint extends CommandEndpoint {

    // internalPaymentService is an injected application service (declaration omitted for brevity).

    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
        // Read the state outside a lambda so the executor exceptions can be thrown directly from doProcess().
        final ObjectNode currentDomainEntityState = consumerRecord
                .value()
                .getCurrentDomainEntityStateForUpdate()
                .orElseThrow();
        final String paymentReferenceId = currentDomainEntityState.get("payment_reference_id").asText();
        final double totalAmount = currentDomainEntityState.get("total_amount").asDouble();
        try {
            this.internalPaymentService.makePayment(paymentReferenceId, totalAmount);
        } catch (SomeRetryableException e) {
            // The retry context is always present for non-reactive endpoints.
            final RetryContext retryContext = consumerRecord.value().getRetryContext().orElseThrow();
            if (retryContext.getRetryCount() >= 3) {
                // Threshold reached: throw a NonRetryableExecutorException to stop further retries.
                throw NonRetryableExecutorException
                        .buildWith(e)
                        .put("error_code", "PAYMENT_FAILED_AFTER_RETRIES")
                        .build();
            }
            // Below the threshold: ask the framework to invoke doProcess() again immediately.
            throw JustRetryableExecutorException.of("Payment failed, retrying... attempt " + (retryContext.getRetryCount() + 1));
        } catch (SomeNonRetryableException e) {
            throw NonRetryableExecutorException
                    .buildWith(e)
                    .put("error_code", "PAYMENT_FAILED")
                    .build();
        }
    }

    @Override
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws
            JustRetryableExecutorException, RetryableExecutorException {
        // Compensation logic omitted in this example.
    }
}

This example shows how to implement immediate retries in the MakePaymentEndpoint command endpoint by using JustRetryableExecutorException. If the makePayment() method throws a retryable exception, the endpoint reads the retry count from the RetryContext carried in the payload of the ConsumerRecord. Although getRetryContext() returns an Optional, it is safe to unwrap it because the retry context is always present for non-reactive executors. If the retry count has reached the threshold (3 in this example), the endpoint throws a NonRetryableExecutorException, which stops further retries and makes the framework start the compensation process. If you would rather fall back to a scheduled retry once the immediate attempts are exhausted, throw a RetryableExecutorException instead, and the framework schedules the retry based on the retry configuration. If the retry count is below the threshold, the endpoint throws a JustRetryableExecutorException, which tells the framework to retry immediately. A successful immediate retry avoids the delay introduced by scheduling, and it avoids sending a redundant message to the orchestrator for every retry attempt.
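To make the control flow above concrete, here is a minimal plain-Java model of the threshold decision. It does not use the StackSaga API: ImmediateRetryModel, Decision, and the drive() loop that stands in for the framework re-invoking doProcess() are hypothetical names. It also assumes that the retry count starts at 0 and increases by one per immediate retry.

```java
import java.util.function.IntPredicate;

// Hypothetical model of the immediate-retry decision; it does not use the StackSaga API.
final class ImmediateRetryModel {

    enum Decision { SUCCEEDED, RETRY_IMMEDIATELY, GIVE_UP }

    // Mirrors the endpoint: retry immediately while retryCount is below the threshold.
    static Decision decide(boolean callSucceeded, int retryCount, int threshold) {
        if (callSucceeded) {
            return Decision.SUCCEEDED;
        }
        return retryCount >= threshold ? Decision.GIVE_UP : Decision.RETRY_IMMEDIATELY;
    }

    // Stand-in for the framework: re-invokes the step while it asks for an immediate retry.
    // 'attempt' receives the current retry count and reports whether the call succeeded.
    static Decision drive(IntPredicate attempt, int threshold) {
        int retryCount = 0;
        while (true) {
            Decision decision = decide(attempt.test(retryCount), retryCount, threshold);
            if (decision != Decision.RETRY_IMMEDIATELY) {
                return decision;
            }
            retryCount++;
        }
    }
}
```

With a threshold of 3 and a call that never succeeds, the model makes four calls (retry counts 0 to 3) before giving up. Check how your StackSaga version numbers the RetryContext counter before relying on the exact count.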

Configure Custom AbstractSagaEndpointSchedulerProvider for Non-Reactive Endpoints

Because StackSaga is non-blocking and reactive under the hood, a non-reactive endpoint needs a Scheduler on which its blocking code can run without blocking the framework's reactive pipeline. The framework provides one default shared Scheduler for the primary execution and another for the revert execution of non-reactive endpoints. These defaults may not suit every use case, for instance when you want a dedicated Scheduler per endpoint or a Scheduler with specific settings. In that case, create a custom AbstractSagaEndpointSchedulerProvider and configure it on the endpoint.

Each default shared Scheduler provided by the framework is a boundedElastic Scheduler with the following configuration:

  1. Primary Execution Scheduler

    • threadCap: Runtime.getRuntime().availableProcessors() * 10

    • queuedTaskCap: 100_000

    • ttlSeconds: 60

    • daemon: false

  2. Revert Execution Scheduler

    • threadCap: Runtime.getRuntime().availableProcessors() * 3

    • queuedTaskCap: 100_000

    • ttlSeconds: 60

    • daemon: false
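The two default Schedulers differ only in threadCap. As a quick worked example, the plain-Java record below computes the documented values for a given core count. BoundedElasticDefaults is a hypothetical name and is not part of StackSaga. Its fields correspond to the parameters of Reactor's Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name, ttlSeconds, daemon).

```java
// Hypothetical summary of the documented defaults; not a StackSaga type.
record BoundedElasticDefaults(String role, int threadCap, int queuedTaskCap, int ttlSeconds, boolean daemon) {

    // Primary execution: 10 threads per available processor.
    static BoundedElasticDefaults primary(int availableProcessors) {
        return new BoundedElasticDefaults("primary", availableProcessors * 10, 100_000, 60, false);
    }

    // Revert execution: 3 threads per available processor.
    static BoundedElasticDefaults revert(int availableProcessors) {
        return new BoundedElasticDefaults("revert", availableProcessors * 3, 100_000, 60, false);
    }

    // Both defaults for the machine the JVM is running on.
    static BoundedElasticDefaults[] forThisMachine() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new BoundedElasticDefaults[] { primary(cpus), revert(cpus) };
    }
}
```

On an 8-core machine, for example, this gives 80 threads for primary execution and 24 for revert execution.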

Override the default Scheduler configurations

To override the default Scheduler configurations at the application level, define custom provider beans in your application context. One bean replaces the shared primary execution Scheduler, and the other replaces the shared revert execution Scheduler.

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
// StackSaga provider imports omitted.

@Slf4j // provides the 'log' field used below
@Configuration
public class EndpointSchedulerConfig {

    @Bean
    public AbstractDefaultPrimaryExecutionSchedulerProvider customSharedPrimaryExecutionSchedulerProvider() {
        return new AbstractDefaultPrimaryExecutionSchedulerProvider() {
            @Override
            protected Scheduler scheduler() {
                log.info("lazy initialization of custom shared primary execution scheduler for non-reactive(blocking) execution");
                return Schedulers.newBoundedElastic(
                        100,                // threadCap
                        Integer.MAX_VALUE,  // queuedTaskCap
                        "custom-do-shared", // thread name prefix
                        60,                 // ttlSeconds
                        false               // daemon
                );
            }
        };
    }

    @Bean
    public AbstractDefaultRevertExecutionSchedulerProvider customSharedRevertExecutionSchedulerProvider() {
        return new AbstractDefaultRevertExecutionSchedulerProvider() {
            @Override
            protected Scheduler scheduler() {
                log.info("lazy initialization of custom shared revert execution scheduler for non-reactive(blocking) execution");
                return Schedulers.newBoundedElastic(
                        20,                   // threadCap
                        Integer.MAX_VALUE,    // queuedTaskCap
                        "custom-undo-shared", // thread name prefix
                        60,                   // ttlSeconds
                        false                 // daemon
                );
            }
        };
    }
}

Create custom AbstractSagaEndpointSchedulerProvider and configure it in the endpoint

Here is an example of how to create a custom AbstractSagaEndpointSchedulerProvider and configure it in the endpoint.

@Component
public class MakePaymentExecutionSchedulerProvider extends AbstractSagaEndpointSchedulerProvider {
    @Override
    protected Scheduler scheduler() {
        return Schedulers.newBoundedElastic(20, Integer.MAX_VALUE, "demo-name");
    }
}


@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED,
        primaryExecutionSchedulerProvider = MakePaymentExecutionSchedulerProvider.class,
        revertExecutionSchedulerProvider = MakePaymentExecutionSchedulerProvider.class
)
public class MakePaymentEndpoint extends CommandEndpoint {
    //...
}
The scheduler() method is invoked only once, when the application starts. The framework caches the returned Scheduler and reuses it afterwards.

In this example, MakePaymentExecutionSchedulerProvider extends AbstractSagaEndpointSchedulerProvider and overrides the scheduler() method to supply a custom Scheduler. The provider is then configured on the non-reactive MakePaymentEndpoint for both primary and revert execution through the primaryExecutionSchedulerProvider and revertExecutionSchedulerProvider attributes of the @SagaEndpoint annotation. You can also configure different providers for primary and revert execution if each needs its own configuration.
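The "create once, then reuse" behavior of scheduler() is a standard memoization pattern. The following is a minimal plain-Java sketch of one thread-safe way to achieve it, using double-checked locking on a volatile field. CachingProvider is a hypothetical class, and this is not a claim about how StackSaga implements its providers internally.

```java
// Hypothetical sketch of "create once, then cache"; not the StackSaga implementation.
abstract class CachingProvider<T> {

    private volatile T cached;

    // Subclasses build the resource, like scheduler() in the provider classes above.
    protected abstract T create();

    // Double-checked locking: create() runs at most once, even under concurrent access.
    public final T get() {
        T result = cached;
        if (result == null) {
            synchronized (this) {
                result = cached;
                if (result == null) {
                    result = create();
                    cached = result;
                }
            }
        }
        return result;
    }
}
```

Because the result is cached, a provider class that is referenced for both primary and revert execution still pays the creation cost only once per provider instance.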

Configure RetryTemplate for Non-Reactive Endpoints

As mentioned above, non-reactive endpoints use Spring Retry's RetryTemplate to handle retries, and you can customize the retry behavior for your endpoints. To do so, define a bean of type AbstractRetryTemplateProvider in your application context and override its retryTemplateBuilder() method to return your RetryTemplateBuilder configuration. The following example configures the RetryTemplate for a non-reactive endpoint.

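import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.retry.support.RetryTemplateBuilder;
import org.springframework.stereotype.Component;

@Component
public class MakePaymentRetryTemplateProvider extends AbstractRetryTemplateProvider {
    @Override
    protected RetryTemplateBuilder retryTemplateBuilder() {
        // maxAttempts counts the initial call: 3 means one call plus up to two retries.
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(3);
        return RetryTemplate.builder().customPolicy(simpleRetryPolicy);
    }
}

The MakePaymentRetryTemplateProvider class is annotated with @Component to make it a Spring bean.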

After creating the custom RetryTemplateProvider, configure it on the endpoint as follows:

@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED,
        primaryExecutionRetryTemplate = MakePaymentRetryTemplateProvider.class,
        revertExecutionRetryTemplate = MakePaymentRetryTemplateProvider.class
)
public class MakePaymentEndpoint extends CommandEndpoint {
    //...
}

In this example, MakePaymentRetryTemplateProvider is configured for both primary and revert execution through the primaryExecutionRetryTemplate and revertExecutionRetryTemplate attributes of the @SagaEndpoint annotation. You can configure a different RetryTemplateProvider for each execution if they need different retry behavior.
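The maxAttempts value in the provider above also counts the initial call. In Spring Retry, SimpleRetryPolicy's maxAttempts is the total number of attempts, so setMaxAttempts(3) allows at most one call plus two retries. The plain-Java loop below only illustrates that counting rule. MaxAttemptsLoop is a hypothetical helper; the real RetryTemplate also applies backoff, classifies retryable exceptions, and supports recovery callbacks.

```java
import java.util.concurrent.Callable;

// Hypothetical helper illustrating "maxAttempts includes the first call"; not Spring Retry itself.
final class MaxAttemptsLoop {

    static <T> T call(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        throw last; // never null here: the loop ran at least once and every attempt failed
    }
}
```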

Reactive Query Endpoint

@Slf4j
@SagaEndpoint(
        //the real topic name will be saga.do.user-service.get-user
        topicNameSuffix = "user-service.get-user",
        listenerScope = ListenerScope.SHARED
)
public class ReactiveUserValidateEndpoint extends ReactiveQueryEndpoint {

    @Override
    public Mono<Void> doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        final ObjectNode currentDomainEntityState = consumerRecord
                .value()
                .getCurrentDomainEntityStateForUpdate()
                .orElseThrow();
        log.info("Received payload before user validation: {}", currentDomainEntityState);
        String username = currentDomainEntityState.get("username").asText();
        return this.internalUserService
                .getUserDetails(username)
                .flatMap(userDetails -> {
                    {
                        //user validation logic
                    }
                    {
                        // put the result into the domain entity state for the next topic to use.
                        currentDomainEntityState.put("is_user_validated", true);
                        currentDomainEntityState.put("validation_note", "user validated at " + LocalDateTime.now());
                    }
                    // Explicit type witness so that the whole pipeline is a Mono<Void>.
                    return Mono.<Void>empty();
                })
                .onErrorResume(SomeRetryableException.class, e ->
                        Mono.error(RetryableExecutorException.of(e)))
                .onErrorResume(SomeNonRetryableException.class, e ->
                        Mono.error(NonRetryableExecutorException
                                .buildWith(e)
                                .put("error_code", "USER_VALIDATION_FAILED")
                                .build()));
    }
}
If you are not familiar with how endpoints work in the StackSaga Kafka worker, read the Non-Reactive (Imperative) Query Endpoint section before this one, because it explains all the concepts and main points in more detail.

Most of the points are the same as for the Non-Reactive (Imperative) Query Endpoint. The main difference is that a reactive query endpoint returns a Mono<Void> from the doProcess() method, so you implement the query logic in a non-blocking way with Project Reactor's Mono and Flux types. In the example above, internalUserService.getUserDetails(username) returns a Mono<UserDetails>, a reactive type that represents a single asynchronous value. The flatMap() operator processes that value and updates the domain entity state, and the onErrorResume() operator maps exceptions to the appropriate executor exception types.

Command Endpoint

If an atomic execution changes state in a database, it runs in a Command Endpoint. Because such an execution changes persisted data, it has a compensation action that undoes it.
StackSaga topics created with the types SagaEventType.COMMAND_DO_ACTION and SagaEventType.COMMAND_UNDO_ACTION are configured in command endpoints.

For instance, in the place-order example, Make Payment is a command execution because it changes state in the payment service's database by creating a new payment record and updating the user's balance. If the saga has to be compensated after Make Payment has completed successfully, its compensation action is triggered to undo the payment.

A command endpoint implements two methods. The doProcess() method runs the primary execution, and the undoProcess() method runs the compensation logic that undoes the changes made by the primary execution when a later step fails. A complete command endpoint therefore implements both.

  • The doProcess() method can throw RetryableExecutorException or NonRetryableExecutorException to report a failed primary execution to the framework (JustRetryableExecutorException requests an immediate retry, as described earlier). If RetryableExecutorException is thrown, the framework schedules the transaction for a retry based on the retry configuration. If NonRetryableExecutorException is thrown, the framework starts the compensation process to undo the transaction.

  • If NonRetryableExecutorException is thrown from the doProcess() method, the values modified within the method are not applied to the domain entity state, because the compensation process starts and the state is reverted to the last successful state.
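The bullets above amount to a mapping from the exception thrown by doProcess() to the framework's next action. The sketch below writes that mapping out in plain Java. The exception classes are hypothetical stand-ins for the StackSaga executor exceptions, which are not used here, and Action is an illustrative enum. The text does not say how other exception types are treated, so the model simply lets them propagate.

```java
// Hypothetical stand-ins for the StackSaga executor exceptions.
class JustRetryableSignal extends RuntimeException {}
class RetryableSignal extends RuntimeException {}
class NonRetryableSignal extends RuntimeException {}

// Hypothetical model of how a doProcess() outcome maps to the framework's next action.
final class DoProcessOutcomeModel {

    enum Action { COMPLETE_STEP, RETRY_IMMEDIATELY, SCHEDULE_RETRY, START_COMPENSATION }

    static Action run(Runnable doProcess) {
        try {
            doProcess.run();
            return Action.COMPLETE_STEP;          // normal return: the step succeeded
        } catch (JustRetryableSignal e) {
            return Action.RETRY_IMMEDIATELY;      // re-invoke right away
        } catch (RetryableSignal e) {
            return Action.SCHEDULE_RETRY;         // retry later, per retry configuration
        } catch (NonRetryableSignal e) {
            return Action.START_COMPENSATION;     // changes discarded, compensation begins
        }
        // Any other exception propagates; the text does not define its handling.
    }
}
```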

Command endpoints can be created in two different ways in StackSaga.

Non-Reactive (Imperative) Command Endpoint

Non-reactive command endpoints are the traditional, blocking way of implementing command endpoints: the methods that execute the command and its compensation are written as blocking code.

@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
class MakePaymentEndpoint extends CommandEndpoint { (1)

    (2)
    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
        //the theory is the same as the query endpoint.
    }

    (3)
    @Override
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws JustRetryableExecutorException, RetryableExecutorException {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        // The last domain entity state is read-only during compensation.
        final JsonNode lastDomainEntityState = consumerRecord.value().getDomainEntityState();
        log.info("Last domain entity state {}", lastDomainEntityState);

        try {
            {//undo payment logic
                final double totalAmount = lastDomainEntityState.get("total_amount").asDouble();
                final String paymentRef = lastDomainEntityState.get("payment_reference_id").asText();
                this.internalPaymentService.refundPayment(paymentRef, totalAmount);
                //update some hints for the next revert execution if needed.
                consumerRecord.value().getHintStore().ifPresent(hintStore -> {(4)
                    hintStore.put("last_refund_time", LocalDateTime.now().toString());
                });
            }
        } catch (SomeRetryableException e) {(5)
            throw RetryableExecutorException.of(e);
        }
    }
}
The example omits the doProcess() implementation because it focuses on the undoProcess() method and compensation processing. doProcess() is implemented the same way as in a query endpoint; read the Non-Reactive (Imperative) Query Endpoint section for details.
1 Create a custom command endpoint class by extending the CommandEndpoint class.
2 Override the doProcess() method to implement the business logic that executes the command. This is exactly the same as in a query endpoint; see the Non-Reactive (Imperative) Query Endpoint example for how to implement doProcess().
3 Override the undoProcess() method to implement the business logic that undoes the command. The framework invokes this method when a compensation action is triggered for the respective command execution. Within the method, the ConsumerRecord gives you access to the message key, the payload, and other metadata.
The payload of the ConsumerRecord in undoProcess() contains the last state of the domain entity before the compensated command was executed, available through the payload's getDomainEntityState() method. This gives you the information you need to perform the compensation based on the last known state of the domain entity. getDomainEntityState() returns a JsonNode, the JSON tree type from the Jackson library, so you can read fields with its methods, for instance get("fieldName"). You cannot modify the last domain entity state, because it is immutable during compensation processing.
4 To pass hints or additional information from the current compensation execution to the compensation execution of the next step, access the hint store with the payload's getHintStore() method and put the information into it. The hint store is a key-value store for sharing information between compensation executions. For instance, the example stores the last refund time under the last_refund_time key with the hint store's put() method.
Added or updated hints are applied only if undoProcess() completes without throwing an exception. If RetryableExecutorException is thrown from undoProcess(), the compensation is retried based on the retry configuration, and the hints added or updated during the failed execution are not carried into the retry, because every retry is expected to run under the same conditions until it succeeds. Manage hints with this in mind so that they are applied correctly in the next compensation execution. If any unhandled exception is thrown from undoProcess(), the compensation process stops, the transaction is terminated, and the hints from the current execution are discarded, because there is no next compensation execution.
5 Identify the retryable exceptions that the compensation logic can throw, and rethrow them wrapped in RetryableExecutorException so that the compensation can be retried. If any unhandled exception is thrown from undoProcess(), the compensation process stops and the transaction is terminated, so handle exceptions carefully to keep compensation retryable where appropriate and to avoid unexpected termination of the transaction. undoProcess() can throw only JustRetryableExecutorException and RetryableExecutorException.
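The hint rule in callout 4 works like staging: changes are made on a working copy and committed only when undoProcess() returns normally. Below is a minimal plain-Java sketch of that idea, assuming a simple string-to-string map. StagedHintStore is a hypothetical name and not the StackSaga hint store API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of "hints are applied only if the undo step completes without error".
final class StagedHintStore {

    private final Map<String, String> committed = new HashMap<>();

    // Runs an undo step against a staged copy. The copy replaces the committed hints
    // only when the step returns normally; any exception leaves the committed hints untouched.
    void runUndoStep(Consumer<Map<String, String>> undoStep) {
        Map<String, String> staged = new HashMap<>(committed);
        undoStep.accept(staged);   // if this throws, the commit below is never reached
        committed.clear();
        committed.putAll(staged);
    }

    Map<String, String> view() {
        return Map.copyOf(committed);
    }
}
```

A failed attempt that is later retried therefore starts from the same hints as the failed attempt did, which matches the "same conditions for every retry" rule above.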
Reactive Command Endpoint

If you work in a reactive programming environment and want non-blocking command endpoints, extend the ReactiveCommandEndpoint class and implement the doProcess() and undoProcess() methods with Project Reactor's Mono and Flux types. The approach is the same as for the non-reactive command endpoint, except that both methods return a Mono<Void> and use reactive operators such as map(), flatMap(), and onErrorResume() for the business logic and exception handling.

@Slf4j
@SagaEndpoint( (2)
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
class ReactiveMakePaymentEndpoint extends ReactiveCommandEndpoint { (1)

    @Override
    public Mono<Void> doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //the theory is the same as the query endpoint.
        return Mono.empty(); // placeholder so the example compiles
    }

    @Override
    public Mono<Void> undoProcess(ReceiverRecord<String, SagaPayload> consumerRecord) {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        final JsonNode lastDomainEntityState = consumerRecord.value().getDomainEntityState();
        log.info("Last domain entity state {}", lastDomainEntityState);
        final double totalAmount = lastDomainEntityState.get("total_amount").asDouble();
        final String paymentRef = lastDomainEntityState.get("payment_reference_id").asText();
        // Return the pipeline: a Mono that is not returned is never subscribed, so the refund would never run.
        return this.internalPaymentService
                .refundPayment(paymentRef, totalAmount)
                .flatMap(response -> {
                    //update some hints for the next revert execution if needed.
                    consumerRecord.value().getHintStore().ifPresent(hintStore -> {
                        hintStore.put("last_refund_time", LocalDateTime.now().toString());
                    });
                    return Mono.<Void>empty();
                })
                .onErrorResume(SomeRetryableException.class, e -> {(3)
                    return Mono.error(RetryableExecutorException.of(e));
                });
    }
}
The example omits the doProcess() implementation because it focuses on the undoProcess() method and compensation processing. doProcess() is implemented the same way as in a query endpoint; read the Non-Reactive (Imperative) Query Endpoint section for details.
1 Create a custom reactive command endpoint class by extending the ReactiveCommandEndpoint class.
2 Annotate the custom reactive command endpoint class with the @SagaEndpoint annotation. Among other things, it marks the class as a Spring bean.
3 Identify the retryable exceptions that the compensation logic can throw, and handle them with the onErrorResume() operator by returning Mono.error() with a RetryableExecutorException so that the compensation can be retried. If any unhandled exception is thrown from undoProcess(), the compensation process stops and the transaction is terminated, so handle exceptions carefully to keep compensation retryable where appropriate and to avoid unexpected termination of the transaction.
With RetryableExecutorException, the StackSaga framework provides only asynchronous retrying (a scheduled retry). For immediate retrying, handle it within the pipeline with Reactor's retryWhen() operator. It is recommended to attempt immediate retries first and throw RetryableExecutorException only once they are exhausted. This gives you finer control over retrying and avoids scheduling a retry when an immediate one would succeed. Read more about Retry to better understand the retrying mechanisms.
@SagaEndpoint Annotation

@SagaEndpoint is the main annotation for marking custom endpoint classes in StackSaga Kafka worker applications. It is a meta-annotation that can be used to create custom annotations for different types of endpoints, such as query endpoints and command endpoints. Its attributes configure the endpoint's behavior, such as the topic name suffix and the listener scope, so you can create and configure custom endpoints consistently across your application. The following list describes each attribute and how to use it to tune an endpoint for your use case.

  1. value

    • Description: A unique name for the endpoint. It is optional and improves readability and debugging. If it is not provided, the framework uses the class name as the endpoint name. The Spring bean is registered under this name.

  2. topicNameSuffix

    • Provides the suffix of the endpoint's topic name. The framework builds the real topic name by prefixing the suffix with saga.do., so the suffix user-service.get-user becomes saga.do.user-service.get-user. The saga.do prefix indicates that the topic is used for the primary execution. The EventManager sends command messages to this topic to trigger the endpoint, so the suffix must match the topic name used in the EventManager. Otherwise, messages are sent to a topic that this endpoint does not consume.

  3. listenerScope

    • Provides the listener scope for the endpoint, which defines how message listeners are created and managed for it. It can be either ListenerScope.SHARED or ListenerScope.ISOLATED. With ListenerScope.SHARED, the framework binds the endpoint's topic to the default shared message listener container. With ListenerScope.ISOLATED, the framework creates a dedicated message listener container for the topic. This gives you finer control over message consumption and processing for that endpoint without affecting the endpoints that share the default listener. Listener scope is one of the most important settings to consider; read the ListenerScope tuning section in the best practices chapter to choose the appropriate scope for your endpoints based on your use case and requirements.

  4. processingMode

  5. primaryExecutionConcurrency

    • Specifies the concurrency level used for primary record processing (for doProcess()) when processingMode is configured as ProcessingMode.CONCURRENT. Multiple records may be processed concurrently using the configured number of worker threads.
      The value may be provided either as a numeric literal or as a Spring property placeholder.
      For example:
      primaryExecutionConcurrency = "10"
      primaryExecutionConcurrency = "${saga.sample1.primary.concurrency}"

      When a property placeholder is used, the value is resolved from the application configuration. saga.sample1.primary.concurrency=10

      Default: Runtime.getRuntime().availableProcessors() * 2

  6. revertExecutionConcurrency

    • Specifies the concurrency level used for compensation record processing (for undoProcess()) when processingMode is configured as ProcessingMode.CONCURRENT. Multiple records may be processed concurrently using the configured number of worker threads.
      The value may be provided either as a numeric literal or as a Spring property placeholder.
      For example:
      revertExecutionConcurrency = "5"
      revertExecutionConcurrency = "${saga.sample1.revert.concurrency}"

      When a property placeholder is used, the value is resolved from the application configuration. saga.sample1.revert.concurrency=5

      Default: Runtime.getRuntime().availableProcessors()

      This attribute does not apply to QueryEndpoint, which has no compensation processing. If it is set in the @SagaEndpoint annotation of a QueryEndpoint, it is ignored.

  7. primaryExecutionRetryTemplate

  8. revertExecutionRetryTemplate

  9. primaryExecutionSchedulerProvider

    • Only applicable to primary executions (CommandEndpoint.doProcess(ConsumerRecord), QueryEndpoint.doProcess(ConsumerRecord)). Provides a custom AbstractPrimaryExecutionSchedulerProvider that controls the scheduler on which primary execution processing runs. This matters for isolating blocking operations from the Kafka consumer thread, which prevents performance degradation and keeps the consumer responsive. By default, the framework uses a bounded elastic scheduler suitable for blocking workloads via DefaultSchedulerConfig, but you can provide your own implementation to customize pool size, thread naming, and other characteristics. Avoid separate schedulers per endpoint unless your use case requires them, because many pools can cause resource contention and inefficient thread usage. In most cases, a shared scheduler for all endpoints is sufficient and more efficient, although a shared default scheduler can lead to task starvation. See the Configure Custom AbstractSagaEndpointSchedulerProvider for Non-Reactive Endpoints section to configure a custom scheduler, or the Override the default Scheduler configurations section to override the default shared scheduler configurations.
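The topic-naming rule of topicNameSuffix and the two accepted forms of the concurrency attributes (a numeric literal or a ${...} placeholder) can be illustrated with a small plain-Java helper. EndpointAttributeModel is hypothetical. In a real Spring application, placeholder resolution is handled by the Environment (for example PropertyResolver#resolvePlaceholders). The assumption that an empty attribute value falls back to the default is ours.

```java
import java.util.Map;

// Hypothetical helpers illustrating the attribute rules above; not StackSaga code.
final class EndpointAttributeModel {

    private static final String DO_PREFIX = "saga.do.";

    // topicNameSuffix "user-service.get-user" -> "saga.do.user-service.get-user"
    static String primaryTopicName(String topicNameSuffix) {
        return DO_PREFIX + topicNameSuffix;
    }

    // Accepts "10" or "${some.property}"; a blank value falls back to defaultValue (assumption).
    static int resolveConcurrency(String raw, Map<String, String> properties, int defaultValue) {
        if (raw == null || raw.isBlank()) {
            return defaultValue;
        }
        String value = raw.trim();
        if (value.startsWith("${") && value.endsWith("}")) {
            String key = value.substring(2, value.length() - 1);
            value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException("Unresolved property: " + key);
            }
        }
        int parsed = Integer.parseInt(value.trim());
        if (parsed < 1) {
            throw new IllegalArgumentException("Concurrency must be >= 1, got " + parsed);
        }
        return parsed;
    }
}
```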

Listener Models For Endpoint Topics in stacksaga-kafka-worker
  • Endpoint Topics
    The topics used to send command messages to worker services are called Endpoint Topics. They are the topics you register in the EventManager by overriding the registerTopics() method in stacksaga-kafka-orchestrator, and the topics registered via the SagaEndpoint annotation in stacksaga-kafka-worker. If these topics do not exist in the Kafka cluster and topic auto-creation is enabled, the framework creates them automatically during application startup; otherwise, you must create them manually in the Kafka cluster before starting the application. Both stacksaga-kafka-orchestrator and stacksaga-kafka-worker try to create missing topics (when auto-creation is enabled) to avoid the metadata-polling delay caused by first-time topic creation. Both sides can do this because stacksaga-kafka-orchestrator knows every configured topic through the EventManager, and each stacksaga-kafka-worker knows the topics relevant to its service through its `SagaEndpoint`s.

Although each endpoint has its own Kafka topic, the framework offers two listener models for consuming messages in the worker application, selected by the listenerScope setting of the @SagaEndpoint annotation. The following are the two listener models.

Let’s dive into the details of each listener model.

SHARED
  • If the @SagaEndpoint is configured with listenerScope=ListenerScope.SHARED, the framework binds the relevant topic(s) to the shared receiver, which consumes messages for the endpoint. All endpoints configured with listenerScope=ListenerScope.SHARED are bound to this single shared KafkaReceiver.

Here is the architecture of the shared listener model for the endpoint topics in stacksaga-kafka-worker application.

Shared kafka receiver in stacksaga kafka worker
  • The consumer group is named using the convention {serviceName}-ws, where ws stands for worker service. For instance, if the service name is payment-service, the consumer group name is payment-service-ws.

  • The shared listener/receiver is configured with auto.offset.reset=earliest. This ensures that previously published messages can be consumed after a service restart when no committed offset is available.

  • Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.

  • To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.

  • Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Processing mode: The processing mode of the shared listener is controlled by the stacksaga.kafka.worker.shared-topic-listener.processing-mode property. It accepts one of the following values:

    • SharedListenerProcessingMode.PARTITION_ORDERED (default): Messages from the same partition of the same topic are processed sequentially to preserve their order, while messages from different partitions can be processed concurrently. This ordering is important for maintaining data consistency and integrity in many use cases.

    • SharedListenerProcessingMode.CONCURRENT: Messages are processed concurrently without preserving the order of messages from the same partition. The degree of parallelism is set by stacksaga.kafka.worker.shared-topic-listener.concurrency. Choose this mode if your use case has no strict ordering requirements; it can improve throughput and performance.

    See Configuration Properties Of stacksaga-kafka-worker for the related properties.
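The at-least-once behaviour described above can be simulated without a broker: offsets are acknowledged after processing, become durable only on a periodic commit tick, and a crash between the two rewinds consumption to the last committed offset. The sketch below is a hypothetical single-partition model (`PartitionSimulator` is not a framework class); its `processedIds` guard illustrates one common way to make redelivery harmless, assuming each message carries a stable ID.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AtLeastOnceDemo {
    public static void main(String[] args) {
        PartitionSimulator p = new PartitionSimulator(List.of("m0", "m1", "m2", "m3"));
        p.consume(2); // process and acknowledge m0, m1
        p.commit();   // periodic commit tick: committed offset = 2
        p.consume(1); // process and acknowledge m2, but no commit tick yet
        p.crash();    // after recovery, consumption resumes from the committed offset (2)
        p.consume(2); // m2 is delivered a second time, then m3
        System.out.println(p.deliveries);  // [m0, m1, m2, m2, m3]
        System.out.println(p.sideEffects); // [m0, m1, m2, m3]
    }
}

// Hypothetical single-partition model of "ack after processing, commit periodically".
class PartitionSimulator {
    private final List<String> log;
    private int position;  // next offset to read
    private int acked;     // offsets below this have been acknowledged
    private int committed; // offset stored for the consumer group
    final List<String> deliveries = new ArrayList<>();
    final List<String> sideEffects = new ArrayList<>();
    // Idempotency guard. Kept in memory for brevity; a real one would need durable storage,
    // because a real crash would also wipe this set.
    private final Set<String> processedIds = new HashSet<>();

    PartitionSimulator(List<String> log) {
        this.log = log;
    }

    void consume(int count) {
        for (int i = 0; i < count && position < log.size(); i++) {
            String messageId = log.get(position);
            deliveries.add(messageId);
            handle(messageId);
            position++;
            acked = position; // acknowledge only after successful processing
        }
    }

    // Applies the side effect at most once per message id, so redelivery is harmless.
    private void handle(String messageId) {
        if (processedIds.add(messageId)) {
            sideEffects.add(messageId);
        }
    }

    void commit() {
        committed = acked; // acknowledged offsets become durable only here
    }

    void crash() {
        position = committed; // acknowledged-but-uncommitted progress is lost
        acked = committed;
    }
}
```

The redelivered m2 reaches the handler twice but produces its side effect once, which is why endpoint logic should tolerate duplicate deliveries.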

ISOLATED

If the @SagaEndpoint is configured with listenerScope=ListenerScope.ISOLATED, the framework creates a dedicated KafkaReceiver for the endpoint's topic(s). This gives better isolation and control over message consumption and processing for that endpoint, because it is not affected by the endpoints that share the common listener. The trade-off is higher resource consumption, since each isolated endpoint runs its own receiver.
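The binding rule for the two listener scopes can be read as a grouping step: the topics of every SHARED endpoint end up on one receiver, while each ISOLATED endpoint gets a receiver of its own. The sketch below models only that grouping; `Endpoint`, `ReceiverPlan`, `ReceiverPlanner`, and the local `ListenerScope` enum are hypothetical stand-ins, not the framework's @SagaEndpoint metadata or KafkaReceiver instances.

```java
import java.util.ArrayList;
import java.util.List;

class ReceiverPlanner {
    // SHARED endpoints are all bound to one receiver; each ISOLATED endpoint gets its own.
    static List<ReceiverPlan> plan(List<Endpoint> endpoints) {
        List<ReceiverPlan> plans = new ArrayList<>();
        List<String> sharedTopics = new ArrayList<>();
        List<String> sharedEndpoints = new ArrayList<>();
        for (Endpoint e : endpoints) {
            if (e.scope() == ListenerScope.SHARED) {
                sharedTopics.addAll(e.topics());
                sharedEndpoints.add(e.name());
            } else {
                plans.add(new ReceiverPlan("isolated-" + e.name(), e.topics(), List.of(e.name())));
            }
        }
        if (!sharedEndpoints.isEmpty()) {
            plans.add(0, new ReceiverPlan("shared", sharedTopics, sharedEndpoints));
        }
        return plans;
    }

    public static void main(String[] args) {
        List<ReceiverPlan> plans = plan(List.of(
                new Endpoint("MakePaymentEndpoint", List.of("make-payment"), ListenerScope.SHARED),
                new Endpoint("RefundPaymentEndpoint", List.of("refund-payment"), ListenerScope.SHARED),
                new Endpoint("FraudCheckEndpoint", List.of("fraud-check"), ListenerScope.ISOLATED)));
        // One shared receiver for make-payment and refund-payment, one dedicated receiver for fraud-check
        plans.forEach(System.out::println);
    }
}

// Hypothetical mirror of the listenerScope values.
enum ListenerScope { SHARED, ISOLATED }

// Hypothetical endpoint metadata: endpoint name, its topics, and its listener scope.
record Endpoint(String name, List<String> topics, ListenerScope scope) {}

// Hypothetical receiver description: the topics it subscribes to and the endpoints it serves.
record ReceiverPlan(String id, List<String> topics, List<String> endpoints) {}
```

The sketch also makes the resource trade-off visible: every ISOLATED endpoint adds one receiver, while all SHARED endpoints together never add more than one.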

Here is the architecture of the isolated listener model for the endpoint topics in a stacksaga-kafka-worker application.

Isolated kafka receiver in stacksaga kafka worker
  • The consumer group is named using the convention {serviceName}-ws, where ws stands for worker service. For instance, if the service name is payment-service, the consumer group name is payment-service-ws.

  • Each isolated listener is configured with auto.offset.reset=earliest. This ensures that previously published messages can be consumed after a service restart when no committed offset is available.

  • Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.

  • To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.

  • Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Scheduler configuration for non-reactive endpoints: For a non-reactive endpoint, you can provide separate Schedulers for primary execution and compensation execution using the primaryExecutionSchedulerProvider and revertExecutionSchedulerProvider attributes of the @SagaEndpoint annotation. By default, all endpoints in the application share two Schedulers, provided via AbstractDefaultPrimaryExecutionSchedulerProvider and AbstractDefaultRevertExecutionSchedulerProvider; both are configured to use bounded elastic schedulers, which are suitable for blocking workloads. You can instead supply your own AbstractSagaEndpointSchedulerProvider implementations for primary and compensation execution separately, for finer isolation and control over thread management for each execution type of each endpoint. See Configure Custom AbstractSagaEndpointSchedulerProvider for Non-Reactive Endpoints for the implementation.

  • Processing mode: The processing mode of the isolated listener is set with the processingMode attribute of the @SagaEndpoint annotation. It supports the following modes:

    • IsolatedListenerProcessingMode.CONCURRENT: Messages are processed concurrently without preserving the order of messages from the same partition.

      In CONCURRENT mode, concurrency can be tuned with the primaryExecutionConcurrency and revertExecutionConcurrency attributes of the @SagaEndpoint annotation, which control the number of concurrent threads used for primary and compensation executions, respectively.

    • IsolatedListenerProcessingMode.PARTITION_ORDERED: In this mode, messages are processed sequentially per partition while allowing concurrent execution across multiple partitions.
      The default processing mode for the isolated listener is PARTITION_ORDERED to ensure a balance between throughput and consistency while preserving the order of messages within each partition.

    • IsolatedListenerProcessingMode.SEQUENTIAL: Messages are processed one at a time, with no concurrency. Use this mode when the processing order is critical and must be preserved across all partitions.
      SEQUENTIAL mode is not recommended for high-throughput scenarios, because processing every message in turn can become a bottleneck. Use it only when strict ordering is a hard requirement and the expected message volume is low to moderate.
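The three modes differ mainly in how records are assigned to execution lanes. One common way to obtain per-partition ordering is to route every record of a partition to the same single-threaded lane. The sketch below shows that idea with plain `ExecutorService`s; `ModeDispatcher`, `Mode`, and `Rec` are hypothetical names, and the framework's own scheduling is not shown in this documentation. The shared listener offers only the CONCURRENT and PARTITION_ORDERED behaviours; the isolated listener offers all three.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ModeDispatcherDemo {
    public static void main(String[] args) {
        ModeDispatcher d = new ModeDispatcher(Mode.PARTITION_ORDERED, 4);
        for (int i = 0; i < 6; i++) {
            d.submit(new Rec(i % 2, i)); // records alternate between partitions 0 and 1
        }
        d.shutdown();
        System.out.println(d.offsetsOf(0)); // [0, 2, 4]
        System.out.println(d.offsetsOf(1)); // [1, 3, 5]
    }
}

// Hypothetical mirror of the processing modes.
enum Mode { CONCURRENT, PARTITION_ORDERED, SEQUENTIAL }

// Hypothetical record: partition number and offset only.
record Rec(int partition, long offset) {}

class ModeDispatcher {
    private static final int SINGLE_LANE = -1; // lane key used by SEQUENTIAL
    private final Mode mode;
    private final ExecutorService pool;         // shared pool for CONCURRENT
    private final Map<Integer, ExecutorService> lanes = new ConcurrentHashMap<>();
    final List<Rec> processed = new CopyOnWriteArrayList<>();

    ModeDispatcher(Mode mode, int concurrency) {
        this.mode = mode;
        this.pool = Executors.newFixedThreadPool(concurrency);
    }

    void submit(Rec rec) {
        ExecutorService target = switch (mode) {
            // Any free thread takes the record: highest parallelism, no ordering guarantee.
            case CONCURRENT -> pool;
            // One single-threaded lane per partition: ordered within, parallel across partitions.
            case PARTITION_ORDERED -> lanes.computeIfAbsent(rec.partition(), k -> Executors.newSingleThreadExecutor());
            // One lane for everything: strict global order, no parallelism.
            case SEQUENTIAL -> lanes.computeIfAbsent(SINGLE_LANE, k -> Executors.newSingleThreadExecutor());
        };
        target.execute(() -> processed.add(rec)); // stand-in for the endpoint's processing
    }

    List<Long> offsetsOf(int partition) {
        return processed.stream().filter(r -> r.partition() == partition).map(Rec::offset).toList();
    }

    // Stops accepting work and waits for in-flight records to finish.
    void shutdown() {
        pool.shutdown();
        lanes.values().forEach(ExecutorService::shutdown);
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
            for (ExecutorService lane : lanes.values()) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Swapping `Mode.PARTITION_ORDERED` for `Mode.SEQUENTIAL` funnels every partition through one lane, which keeps a global order but processes only one record at a time, matching the trade-off described for SEQUENTIAL.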

Configuration Properties Of stacksaga-kafka-worker

| Property | Data Type | Default Value | Description |
|---|---|---|---|
| stacksaga.kafka.worker.auto-create-topics | boolean | true | Whether the worker automatically creates the Kafka topics configured via its @SagaEndpoint endpoints if they do not exist. |
| stacksaga.kafka.worker.shared-topic-listener.processing-mode | SharedListenerProcessingMode | PARTITION_ORDERED | The processing mode for the shared endpoint message listener. The default, PARTITION_ORDERED, ensures that messages from the same partition of the same topic are processed in order. |
| stacksaga.kafka.worker.shared-topic-listener.concurrency | int | Runtime.getRuntime().availableProcessors() * 3 | The concurrency level for the shared endpoint message listener when the processing mode is CONCURRENT, i.e. how many messages can be processed in parallel. |

Retry Configurations for Non-Reactive Endpoints: Primary execution

| Property | Data Type | Default Value | Description |
|---|---|---|---|
| stacksaga.kafka.worker.retry.primary.max-attempts | int | 3 | The maximum number of retry attempts. |
| stacksaga.kafka.worker.retry.primary.initial-interval | Duration | 1s | The initial interval between retry attempts. |
| stacksaga.kafka.worker.retry.primary.max-interval | Duration | 1s | The maximum interval between retry attempts. |
| stacksaga.kafka.worker.retry.primary.multiplier | double | 2.0 | The multiplier for the retry interval. For example, with an initial interval of 1s and a multiplier of 2.0, the retry intervals would be 1s, 2s, 4s, and so on, up to the maximum interval. With the default max-interval of 1s, every retry waits 1s. |

Retry Configurations for Non-Reactive Endpoints: Revert execution

| Property | Data Type | Default Value | Description |
|---|---|---|---|
| stacksaga.kafka.worker.retry.revert.max-attempts | int | 3 | The maximum number of retry attempts. |
| stacksaga.kafka.worker.retry.revert.initial-interval | Duration | 1s | The initial interval between retry attempts. |
| stacksaga.kafka.worker.retry.revert.max-interval | Duration | 1s | The maximum interval between retry attempts. |
| stacksaga.kafka.worker.retry.revert.multiplier | double | 2.0 | The multiplier for the retry interval. For example, with an initial interval of 1s and a multiplier of 2.0, the retry intervals would be 1s, 2s, 4s, and so on, up to the maximum interval. With the default max-interval of 1s, every retry waits 1s. |
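Assuming the retry properties follow the usual exponential-backoff rule (each wait is the previous one multiplied by `multiplier`, capped at `max-interval`), the waits between attempts can be computed as below. `BackoffSchedule` is a hypothetical helper, not framework code, and the sketch produces one wait per retry attempt without assuming whether `max-attempts` also counts the first execution.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {
    // Wait before retry k (k = 1..retries): initial * multiplier^(k-1), capped at maxInterval.
    static List<Duration> waits(int retries, Duration initial, Duration maxInterval, double multiplier) {
        List<Duration> result = new ArrayList<>();
        double nextMillis = initial.toMillis();
        for (int k = 0; k < retries; k++) {
            long capped = (long) Math.min(nextMillis, maxInterval.toMillis());
            result.add(Duration.ofMillis(capped));
            nextMillis *= multiplier; // grow for the next attempt
        }
        return result;
    }

    public static void main(String[] args) {
        // Documented defaults: initial-interval 1s, max-interval 1s, multiplier 2.0
        System.out.println(waits(3, Duration.ofSeconds(1), Duration.ofSeconds(1), 2.0)); // [PT1S, PT1S, PT1S]
        // Raising max-interval (hypothetical 5s) lets the backoff grow until it hits the cap
        System.out.println(waits(5, Duration.ofSeconds(1), Duration.ofSeconds(5), 2.0)); // [PT1S, PT2S, PT4S, PT5S, PT5S]
    }
}
```

With the documented defaults the cap equals the initial interval, so the multiplier has no visible effect until max-interval is raised above initial-interval.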