Topics and EventManager

StackSaga Topic

StackSaga Topics are constant values that identify the execution endpoints in StackSaga. A topic is essentially the Kafka topic used to send command messages to the worker services. A StackSaga topic also carries additional metadata: the name of the topic, the type of the topic (primary or compensation), and the span name that identifies the execution point. The EventManager uses these topics to determine which topic should be triggered next based on the execution flow.

In the place-order example, the primary flow has 4 atomic executions (spans): fetching the user’s details, initializing the order, making the payment, and updating the inventory. The compensation flow has 3 atomic executions: cancel order, refund payment, and release inventory. That makes 7 spans in total. Here are the sample topics used in the place-order example:

class PlaceOrderTopic extends AbstractTopic<PlaceOrderTopic> {(1)

    (2)
    protected PlaceOrderTopic(String topicName, float topicKey, SagaEventType sagaEventType, String targetService) {
        super(topicName, topicKey, sagaEventType, targetService);
    }


    (3)
    protected PlaceOrderTopic(String topicName, float topicKey, SagaEventType sagaEventType, String targetService, PlaceOrderTopic parent) {
        super(topicName, topicKey, sagaEventType, targetService, parent);
    }

    (4)
    //primary execution topics.
    public static final PlaceOrderTopic DO_FETCH_USER_DETAILS = new PlaceOrderTopic("user.fetch", 1, SagaEventType.QUERY_DO_ACTION, "user-service");
    public static final PlaceOrderTopic DO_INITIALIZE_ORDER = new PlaceOrderTopic("order.init", 2, SagaEventType.COMMAND_DO_ACTION, "order-service");
    public static final PlaceOrderTopic DO_MAKE_PAYMENT = new PlaceOrderTopic("payment.make", 3, SagaEventType.COMMAND_DO_ACTION, "payment-service");
    public static final PlaceOrderTopic DO_INVENTORY_UPDATE = new PlaceOrderTopic("inventory.update", 4, SagaEventType.COMMAND_DO_ACTION, "inventory-service");

    (5)
    //revert/compensation topics.
    public static final PlaceOrderTopic UNDO_INITIALIZE_ORDER = new PlaceOrderTopic("order.init", -2, SagaEventType.COMMAND_UNDO_ACTION, "order-service", DO_INITIALIZE_ORDER);
    public static final PlaceOrderTopic UNDO_MAKE_PAYMENT = new PlaceOrderTopic("payment.make", -3, SagaEventType.COMMAND_UNDO_ACTION, "payment-service", DO_MAKE_PAYMENT);
    public static final PlaceOrderTopic UNDO_INVENTORY_UPDATE = new PlaceOrderTopic("inventory.update", -4, SagaEventType.COMMAND_UNDO_ACTION, "inventory-service", DO_INVENTORY_UPDATE);
}
1 Create the custom topic class by extending the AbstractTopic class.
2 Define the constructor used to instantiate primary execution topics.
topicName: the name of the topic as used in Kafka. See Topic Name Specification for more details.
topicKey: a constant float value, unique within the domain, that represents the topic. See Topic Key Specification for more details.
sagaEventType: the type of the topic. It can be either SagaEventType.QUERY_DO_ACTION for query execution or SagaEventType.COMMAND_DO_ACTION for command execution.
targetService: the name of the target service that is responsible for executing the command. This is used for logging and debugging purposes.
3 Define the constructor used to instantiate compensation execution topics.
The parameters are the same as those of the primary execution topic constructor, with an additional parameter for the parent topic.
parent: the primary execution topic that the compensation topic reverts. It expresses the relationship between the primary execution and the compensation execution. For instance, in the place-order example, the compensation topic UNDO_INITIALIZE_ORDER is related to the primary execution topic DO_INITIALIZE_ORDER.
4 Create the primary execution topics as static final fields in the custom topic class, following the conventions.
5 Create the compensation execution topics as static final fields in the custom topic class, following the conventions.
The custom topic class is not a Spring bean. Make sure not to annotate it with any Spring annotations.

Topic Name Specification In StackSaga-Kafka

The topic name is the name of the Kafka topic used to send messages to the worker services. Use a clear, descriptive name that reflects the purpose of the execution. For instance, in the place-order example, the topic name for DO_FETCH_USER_DETAILS is do.user.fetch, for DO_INITIALIZE_ORDER it is do.order.init, for DO_MAKE_PAYMENT it is do.payment.make, and for DO_INVENTORY_UPDATE it is do.inventory.update. For compensation topics, it is recommended to reuse the base name of the related primary execution topic to show that they are related. For instance, the topic name for UNDO_INITIALIZE_ORDER is undo.order.init, for UNDO_MAKE_PAYMENT it is undo.payment.make, and for UNDO_INVENTORY_UPDATE it is undo.inventory.update.

The do. prefix for primary execution topics and the undo. prefix for compensation topics are required by the StackSaga guidelines. However, you can specify the topic name without the prefix, because the framework adds the prefix internally according to the topic type. For instance, if you specify the topic name for DO_FETCH_USER_DETAILS as user.fetch, the framework creates the topic as do.user.fetch internally, and you can see that real topic in the Kafka topic list.

Rules for topic naming in StackSaga-Kafka:

  1. Use . as the hierarchical segment separator in topic names (e.g., order.init, user-service.get-user). Hyphens (-) are permitted within a segment (e.g., user-service, make-payment) but must not be used as the segment separator itself. Underscores (_) are not permitted anywhere in the topic name.
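The naming rule above can be checked mechanically. The following is a minimal sketch, not part of StackSaga: the class name TopicNameRule and the assumption that segments consist of ASCII letters and digits (joined by single hyphens) are hypothetical choices made for illustration.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not a StackSaga API): checks a topic name against the rule above. */
final class TopicNameRule {

    // One segment: alphanumeric runs optionally joined by single hyphens (assumed character set).
    private static final String SEGMENT = "[A-Za-z0-9]+(?:-[A-Za-z0-9]+)*";

    // Full name: one or more segments separated by dots. Underscores never match.
    private static final Pattern VALID_NAME =
            Pattern.compile(SEGMENT + "(?:\\." + SEGMENT + ")*");

    static boolean isValid(String topicName) {
        return topicName != null && VALID_NAME.matcher(topicName).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"order.init", "user-service.get-user", "order_init", "order..init"};
        for (String name : samples) {
            System.out.println(name + " -> " + isValid(name));
        }
    }
}
```

A check like this could run in a unit test over all declared topic constants, so that a badly formed name is caught before the application starts.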

Topic Key Specification In StackSaga-Kafka

The topic key is used in the serialization and deserialization process. The raw topic names are not passed in the headers of the Kafka messages; the topic key is used for that purpose instead. Therefore, a key cannot be changed once it is in use in the system, and it is highly recommended to use a constant and unique float value for each topic within the same domain. For instance, in the place-order example, the topic key for DO_FETCH_USER_DETAILS is 1, for DO_INITIALIZE_ORDER it is 2, for DO_MAKE_PAYMENT it is 3, and for DO_INVENTORY_UPDATE it is 4. For compensation topics, it is recommended to use negative float values to differentiate them from primary execution topics. For instance, the topic key for UNDO_INITIALIZE_ORDER is -2, for UNDO_MAKE_PAYMENT it is -3, and for UNDO_INVENTORY_UPDATE it is -4.

Decimal topic key values (e.g., 1.1, 1.2, -1.1) are reserved for the upcoming sub-execution feature, which will allow additional before/after sub-steps to be attached to a primary or compensation execution. Avoid using decimal keys in the current version to prevent conflicts with that future capability.
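The key conventions described in this section (unique within the domain, whole numbers only, negative for compensation) can be verified with a small check. This is a sketch under assumptions: TopicKeyConventionSketch and its check method are hypothetical and not part of StackSaga, and treating primary keys as strictly positive is inferred from the place-order example rather than stated as a rule.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a StackSaga API): reports topic keys that break the conventions. */
final class TopicKeyConventionSketch {

    /** Returns one message per violation; an empty list means the keys follow the conventions. */
    static List<String> check(Map<String, Float> primaryKeys, Map<String, Float> compensationKeys) {
        List<String> violations = new ArrayList<>();
        Set<Float> seenKeys = new HashSet<>();
        primaryKeys.forEach((name, key) -> {
            if (key <= 0) {
                violations.add(name + ": primary keys are assumed to be positive");
            }
            checkShared(name, key, seenKeys, violations);
        });
        compensationKeys.forEach((name, key) -> {
            if (key >= 0) {
                violations.add(name + ": compensation keys should be negative");
            }
            checkShared(name, key, seenKeys, violations);
        });
        return violations;
    }

    private static void checkShared(String name, float key, Set<Float> seenKeys, List<String> violations) {
        if (key != Math.rint(key)) {
            // Decimal keys are reserved for the upcoming sub-execution feature.
            violations.add(name + ": decimal keys are reserved");
        }
        if (!seenKeys.add(key)) {
            violations.add(name + ": duplicate key " + key);
        }
    }
}
```

Passing the keys of the place-order example (1 to 4 for the primary topics, -2 to -4 for the compensation topics) yields an empty list.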

Based on the custom topic class shown above, here is the list of topics used in the place-order example, with their types, the topic names as specified in the class, and the real topic names created in Kafka:

| Execution | Topic Type | Mentioned Topic Name | Real Topic Name |
|---|---|---|---|
| Fetch User Details | QUERY_DO_ACTION | user.fetch | saga.do.user.fetch |
| Initialize Order | COMMAND_DO_ACTION | order.init | saga.do.order.init |
| Make Payment | COMMAND_DO_ACTION | payment.make | saga.do.payment.make |
| Inventory Update | COMMAND_DO_ACTION | inventory.update | saga.do.inventory.update |
| Cancel Order | COMMAND_UNDO_ACTION | order.init | saga.undo.order.init |
| Refund Payment | COMMAND_UNDO_ACTION | payment.make | saga.undo.payment.make |
| Release Inventory | COMMAND_UNDO_ACTION | inventory.update | saga.undo.inventory.update |
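The mapping from the mentioned topic name to the real topic name can be sketched as a simple prefixing step. This is only an illustration that reproduces the values listed for the place-order example, using the saga.do. and saga.undo. prefixes that appear in the real topic names there; the class RealTopicNameSketch and its local EventType enum are hypothetical stand-ins, and the framework's actual resolution logic may differ.

```java
/** Hypothetical illustration (not a StackSaga API) of how a real topic name could be derived. */
final class RealTopicNameSketch {

    /** Local stand-in for the three topic types used in the place-order example. */
    enum EventType { QUERY_DO_ACTION, COMMAND_DO_ACTION, COMMAND_UNDO_ACTION }

    static String realTopicName(String mentionedTopicName, EventType type) {
        // Compensation topics get the "undo" prefix; query and command executions get "do".
        String prefix = (type == EventType.COMMAND_UNDO_ACTION) ? "saga.undo." : "saga.do.";
        return prefix + mentionedTopicName;
    }
}
```

With this sketch, order.init resolves to two different real topics depending on the type, which is why a primary topic and its compensation topic can share the same mentioned name.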

COMMAND_UNDO_BEFORE_ACTION and COMMAND_UNDO_AFTER_ACTION are not supported in the current version of the framework, but support is planned for upcoming versions. Therefore, avoid using those topic types for now.

EventManager

Stacksaga-Kafka supports fully dynamic execution navigation at runtime, based on your conditions and the state of the transaction. The EventManager is the component responsible for that. Let’s create a custom EventManager for the OrderDomainEntity as shown below.

(2)
@SagaEventManager(
        value = "placeOrderEventManager", (3)
        listenerScope = ListenerScope.SHARED, (4)
        domainRootTopicSuffix = "place-order" (5)
)
public class PlaceOrderEventManager extends AbstractEventManager<OrderDomainEntity, PlaceOrderTopic> { (1)

    (6)
    @Override
    public Supplier<List<PlaceOrderTopic>> registerTopics() {
        return () -> List.of(
                PlaceOrderTopic.DO_FETCH_USER_DETAILS,
                PlaceOrderTopic.DO_INITIALIZE_ORDER,
                PlaceOrderTopic.DO_MAKE_PAYMENT,
                PlaceOrderTopic.DO_INVENTORY_UPDATE,
                PlaceOrderTopic.UNDO_INITIALIZE_ORDER,
                PlaceOrderTopic.UNDO_MAKE_PAYMENT,
                PlaceOrderTopic.UNDO_INVENTORY_UPDATE
        );
    }

    (7)
    @Override
    public @NonNull SagaPrimaryEventAction<PlaceOrderTopic> onNext(PlaceOrderTopic recentTopic, OrderDomainEntity currentDomainEntityState) {
        if (recentTopic.equals(PlaceOrderTopic.DO_FETCH_USER_DETAILS)) {
            currentDomainEntityState.getMetadata().put("navigated-do-initialize-order-at", LocalDateTime.now().toString());
            return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_INITIALIZE_ORDER);
        }
        if (recentTopic.equals(PlaceOrderTopic.DO_INITIALIZE_ORDER)) {
            currentDomainEntityState.getMetadata().put("navigated-do-make-payment-at", LocalDateTime.now().toString());
            return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_MAKE_PAYMENT);
        }
        if (recentTopic.equals(PlaceOrderTopic.DO_MAKE_PAYMENT)) {
            currentDomainEntityState.getMetadata().put("navigated-do-inventory-update-at", LocalDateTime.now().toString());
            return SagaPrimaryEventAction.next(PlaceOrderTopic.DO_INVENTORY_UPDATE);
        }
        if (recentTopic.equals(PlaceOrderTopic.DO_INVENTORY_UPDATE)) {
            return SagaPrimaryEventAction.complete();
        }
        return SagaPrimaryEventAction.error(new IllegalStateException("Unexpected topic: " + recentTopic));
    }

    (8)
    @Override
    public void onNextRevert(
            PlaceOrderTopic recentExecutedTopic,
            PlaceOrderTopic nextTopic,
            OrderDomainEntity lastDomainEntityState,
            NonRetryableExecutorException nonRetryableExecutorException,
            RevertHintStore revertHintStore,
            Supplier<NavigableMap<Integer, PlaceOrderTopic>> remainingReverts
    ) {

        (9)
        {//sample usage of revert hint store and next topic in the onNextRevert method.
            if (nextTopic.equals(PlaceOrderTopic.UNDO_MAKE_PAYMENT)) {
                revertHintStore.put("BEFORE_NOTE:UNDO_MAKE_PAYMENT", "Sample value before reverting UNDO_MAKE_PAYMENT");
            }
            if (nextTopic.equals(PlaceOrderTopic.UNDO_INITIALIZE_ORDER)) {
                revertHintStore.put("BEFORE_NOTE:UNDO_INITIALIZE_ORDER", "Sample value before reverting UNDO_INITIALIZE_ORDER");
            }
        }

        (10)
        {//sample usage of remaining reverts and recentExecutedTopic
            if (recentExecutedTopic.equals(PlaceOrderTopic.UNDO_MAKE_PAYMENT)) {
                // remainingReverts is a Supplier; call get() so the map itself is logged.
                log.info("Remaining reverts after reverting UNDO_MAKE_PAYMENT: {}", remainingReverts.get());
            }

            if (recentExecutedTopic.equals(PlaceOrderTopic.UNDO_INITIALIZE_ORDER)) {
                // remainingReverts is a Supplier; call get() so the map itself is logged.
                log.info("Remaining reverts after reverting UNDO_INITIALIZE_ORDER: {}", remainingReverts.get());
            }
        }
    }
}
1 Create a custom EventManager class by extending the AbstractEventManager class, providing your custom DomainEntity and your custom Topic class as the generic parameters.
2 Annotate the custom EventManager class with the @SagaEventManager annotation. It primarily marks the class as a Spring bean.
3 value: the name of the Spring bean for the custom EventManager. It is used to identify the EventManager by name.
4 listenerScope: the listener scope for the EventManager. It can be either ListenerScope.SHARED or ListenerScope.ISOLATED. See Reply Topic and Listener Models for more details.
5 domainRootTopicSuffix: the common suffix for the topics that belong to the same domain. The framework uses it internally to create the domain-root topic (the topic that receives the response messages from the Kafka client endpoints).
6 Override the registerTopics() method to register the topics that are used in the transaction. The framework invokes this method during startup to register the topics for the custom DomainEntity. Return the list of topics via a supplier. In this example, 7 topics are registered for the place-order flow. These are the real topics that send messages to the target services.
7 Override the onNext() method to determine the next topic based on the recently completed topic and the current state of the domain entity. This method is invoked by the framework after each successful execution in the primary flow and its primary responsibility is routing — deciding which topic to trigger next. Lightweight orchestrator-side bookkeeping is also permitted here: for example, recording a navigation timestamp in the domain entity metadata (as shown above) is a safe, low-cost operation that will be included in the persisted domain entity snapshot for traceability. However, business logic, database calls, external HTTP requests, or any other I/O must not be placed here, as this method runs on the reactor event-loop thread. Return SagaPrimaryEventAction.next(topic) to advance to the next step, or SagaPrimaryEventAction.complete() when all steps have been processed successfully.
8 Optionally override the onNextRevert() method to perform an action at each step of the compensation flow. Call timing: this method is called after recentExecutedTopic has completed its compensation successfully and before nextTopic is dispatched to the worker. Both parameters are therefore available at the same time, so you can react to what just finished and prepare context for what is about to run.
The nextTopic parameter was introduced in preparation for an upcoming sub-execution feature that will allow additional before/after sub-steps to be attached to a compensation execution. In the current version it reflects the next top-level compensation topic.
9 Sample usage of the RevertHintStore and the nextTopic parameters in the onNextRevert() method. Use nextTopic to determine which compensation topic is about to be dispatched, and use revertHintStore to store any metadata that the upcoming compensation execution will need. See the Javadoc of the class for full details about all parameters.
10 Sample usage of the remainingReverts and the recentExecutedTopic parameters in the onNextRevert() method. Use recentExecutedTopic to identify which compensation step just completed, and remainingReverts to inspect the full set of compensation steps still pending.
As a best practice, avoid performing any I/O-intensive or CPU-intensive operations inside the event navigation methods (onNext() and onNextRevert()). These methods should be used only to evaluate conditions based on the provided parameters.
The reason is that the Stacksaga-engine internally relies on Project Reactor’s reactive pipelines, which run on non-blocking event-loop threads. Executing expensive operations here could block those threads, leading to performance bottlenecks and impacting the entire application.
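One way to keep the navigation logic free of I/O is to express the primary flow as an in-memory lookup table instead of a chain of if statements. The sketch below is self-contained and uses a hypothetical Step enum as a stand-in for PlaceOrderTopic; it is an alternative illustration, not the framework's implementation.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, framework-free sketch of table-driven routing for the place-order flow. */
final class PlaceOrderRoutingSketch {

    /** Stand-in for the primary PlaceOrderTopic constants. */
    enum Step { FETCH_USER_DETAILS, INITIALIZE_ORDER, MAKE_PAYMENT, INVENTORY_UPDATE }

    // Successor of each primary step; the last step has no entry.
    private static final Map<Step, Step> NEXT = new EnumMap<>(Step.class);

    static {
        NEXT.put(Step.FETCH_USER_DETAILS, Step.INITIALIZE_ORDER);
        NEXT.put(Step.INITIALIZE_ORDER, Step.MAKE_PAYMENT);
        NEXT.put(Step.MAKE_PAYMENT, Step.INVENTORY_UPDATE);
    }

    /** Returns the step to trigger next, or an empty Optional when the primary flow is complete. */
    static Optional<Step> next(Step recentStep) {
        return Optional.ofNullable(NEXT.get(recentStep));
    }
}
```

Inside a real onNext() implementation, a present value would map to SagaPrimaryEventAction.next(topic) and an empty value to SagaPrimaryEventAction.complete(). The lookup is a constant-time, non-blocking operation, which fits the constraint that these methods run on event-loop threads.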
Table 1. Quick Reference: Exception Behavior in onNext() and onNextRevert()

| Thrown From | Immediate Effect | Resulting State Transition |
|---|---|---|
| onNext(): any exception (including SagaPrimaryEventAction.error(…)) | Compensation begins | IN_PROGRESS → FAILED → COMPENSATING |
| onNextRevert(): any exception | Compensation terminates | COMPENSATING → Compensation Failed |

Any exception thrown from onNext() — including via SagaPrimaryEventAction.error(…​) — is treated by the framework as a non-retryable failure and immediately transitions the saga to FAILED, triggering the compensation sequence. This means onNext() can be used intentionally to start compensation based on orchestrator-side conditions: for instance, if a business rule evaluated after receiving a worker reply determines the saga should not proceed further.
Similarly, any exception thrown from onNextRevert() causes the compensation sequence to terminate immediately and the transaction is marked as compensation-failed. See the Exception Handling Reference for the full breakdown across all methods.
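The idea of intentionally starting compensation from the orchestrator side can be sketched without the framework. In the example below, Decision is a hypothetical stand-in for SagaPrimaryEventAction, and the business rule (do not proceed to payment when the order total is not positive) is invented purely for illustration.

```java
/** Hypothetical, framework-free sketch of a routing decision that can trigger compensation. */
final class OnNextGuardSketch {

    /** Stand-in for SagaPrimaryEventAction: either a next step or an error. */
    static final class Decision {
        final String nextStep;        // set when the flow should advance
        final RuntimeException error; // set when compensation should start

        private Decision(String nextStep, RuntimeException error) {
            this.nextStep = nextStep;
            this.error = error;
        }

        static Decision next(String step) {
            return new Decision(step, null);
        }

        static Decision error(RuntimeException cause) {
            return new Decision(null, cause);
        }

        boolean startsCompensation() {
            return error != null;
        }
    }

    /** Invented rule: after the order is initialized, only continue when there is something to pay. */
    static Decision afterInitializeOrder(long orderTotalInCents) {
        if (orderTotalInCents <= 0) {
            return Decision.error(new IllegalStateException("Order total must be positive"));
        }
        return Decision.next("DO_MAKE_PAYMENT");
    }
}
```

The check uses only values that are already in memory, so it stays within the guidance that no I/O or heavy computation happens in the navigation methods.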

Reply Topic and Listener Models

Per-Domain Reply Topic

The stacksaga-kafka-orchestrator creates a dedicated topic for each EventManager of a DomainEntity to receive the response messages from the Kafka worker endpoints. The topic name follows the pattern saga.internal.{serviceName}.{domainRootTopicSuffix}. For instance, if the service name is order-service and the domainRootTopicSuffix is place-order, the topic name will be saga.internal.order-service.place-order.

The domainRootTopicSuffix is configured in the @SagaEventManager annotation of the EventManager, as shown below.

@SagaEventManager(
        domainRootTopicSuffix = "place-order"
)
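The naming conventions in this section can be summarized in a few lines of code. This is a sketch: ReplyTopicNamingSketch is a hypothetical helper, not a StackSaga API, and it assumes that the saga.internal. pattern above applies to the shared scope while the isolated scope uses the saga.internal.isolated. pattern and the {serviceName}-os consumer group described under Listener Models.

```java
/** Hypothetical helper (not a StackSaga API) that reproduces the documented naming patterns. */
final class ReplyTopicNamingSketch {

    /** Local stand-in for ListenerScope. */
    enum Scope { SHARED, ISOLATED }

    static String domainRootTopic(Scope scope, String serviceName, String domainRootTopicSuffix) {
        String prefix = (scope == Scope.ISOLATED) ? "saga.internal.isolated." : "saga.internal.";
        return prefix + serviceName + "." + domainRootTopicSuffix;
    }

    /** The same consumer group is used for both scopes. */
    static String consumerGroup(String serviceName) {
        return serviceName + "-os";
    }
}
```

For the service order-service and the suffix place-order, the sketch produces saga.internal.order-service.place-order for the shared scope and the consumer group order-service-os.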

Listener Models

Even though the framework creates a dedicated topic for each EventManager, two different listener models can consume the messages from those topics. The model is selected by the listenerScope setting in the @SagaEventManager annotation of the EventManager. The two listener models are SHARED and ISOLATED.

Let’s look at each listener model in detail.

SHARED

If the @SagaEventManager is configured with listenerScope = ListenerScope.SHARED, the framework binds the response topic of that EventManager to a shared listener.
If you have multiple EventManagers with listenerScope = ListenerScope.SHARED, all the topics related to those EventManagers are bound to the same shared listener.

Here is the architecture for the shared listener model in stacksaga-kafka-orchestrator.

[Figure: Shared Kafka receiver in the StackSaga Kafka orchestrator]
  • The consumer group is configured using the following naming convention: {serviceName}-os where os stands for Orchestration Service.
    For example, if the service name is order-service, the consumer group for the shared root topic will be: order-service-os

  • The shared listener/receiver is configured with auto.offset.reset=earliest. This ensures that previously published messages can be consumed after a service restart when no committed offset is available.

  • Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.

  • To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.

  • Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Processing mode: The shared listener’s processing mode is configured via the stacksaga.kafka.orchestrator.shared-response-listener.processing-mode property, which determines how messages are processed. It can be set to either SharedListenerProcessingMode.CONCURRENT or SharedListenerProcessingMode.PARTITION_ORDERED.
    In CONCURRENT mode, messages are processed concurrently without preserving the order of messages from the same partition.
    In PARTITION_ORDERED mode, messages are processed sequentially per partition while allowing concurrent execution across multiple partitions.
    The default processing mode for the shared listener is PARTITION_ORDERED, which balances throughput and consistency while preserving the order of messages within each partition. For more details, see the configuration properties for the StackSaga Kafka orchestrator (stacksaga-kafka-implementation/orchestrator/orchestrator-usage.adoc#configuration_properties_for_stacksaga_kafka_orchestrator).
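The redelivery window mentioned in the list above (a message that was processed and acknowledged, but whose offset was not yet committed) can be made concrete with a toy model. The class below is hypothetical and contains no Kafka code; it only tracks two counters to show why a restart can replay an already processed message.

```java
/** Toy model (no Kafka involved) of acknowledge-then-periodic-commit offset handling. */
final class PeriodicCommitSketch {

    private long acknowledged = 0; // offset following the last acknowledged message
    private long committed = 0;    // offset a restarted consumer would resume from

    /** At-least-once: the message is acknowledged only after it has been processed. */
    void processAndAcknowledge(long offset) {
        acknowledged = offset + 1;
    }

    /** Time-based commit: persists whatever has been acknowledged so far. */
    void commitTick() {
        committed = acknowledged;
    }

    /** Simulates a crash and restart: acknowledgements that were not committed are lost. */
    long restart() {
        acknowledged = committed;
        return committed;
    }
}
```

If offsets 0 and 1 are processed, a commit tick fires, and offset 2 is processed just before a crash, restart() returns 2, so the message at offset 2 is delivered and processed a second time.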

ISOLATED

If the @SagaEventManager is configured with listenerScope = ListenerScope.ISOLATED, the framework creates a dedicated listener for the response topic of that EventManager. Therefore, if you have multiple EventManagers with listenerScope = ListenerScope.ISOLATED, each EventManager has its own dedicated listener and its own dedicated topic to receive the response messages from the Kafka worker endpoints. This approach provides better isolation and helps avoid interference between different EventManagers, but it can also increase resource consumption because multiple listeners are running.

Here is the architecture for the isolated-listener model in stacksaga-kafka-orchestrator.

[Figure: Isolated Kafka receiver in the StackSaga Kafka orchestrator]
  • The name of the domain-root topic follows the pattern saga.internal.isolated.{serviceName}.{domainRootTopicSuffix}. For instance, if the service name is order-service and the domainRootTopicSuffix is place-order, the domain-root topic name will be saga.internal.isolated.order-service.place-order.

  • The consumer group for the domain-root topic is set in the same way as for the shared root topic, following the convention {serviceName}-os. Therefore, in the previous example, the consumer group for the domain-root topic will be order-service-os. The other configurations for the domain-root topic are the same as for the shared root topic.

    The consumer group name is intentionally kept identical to the shared listener ({serviceName}-os) so that switching an EventManager from ListenerScope.SHARED to ListenerScope.ISOLATED (or back) at any time does not reset offset tracking. Without this, changing the listener scope would cause the new consumer group to start from earliest, potentially re-processing a large backlog of already-completed reply messages.
  • As with the shared listener, the domain-root topics are configured with auto.offset.reset=earliest so that previously published messages can be consumed after a service restart when no committed offset is available.

  • As with the shared root topic, messages are consumed with At-Least-Once delivery semantics, and each message is acknowledged only after it has been successfully processed.

  • As with the shared root topic, acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Processing mode: The isolated listener is configured per EventManager through the processingMode parameter of the annotation, @SagaEventManager(processingMode=?), which determines how messages are processed in the isolated listener. The following processing modes are available:

    • IsolatedListenerProcessingMode.CONCURRENT: In this mode, messages are processed concurrently without preserving the order of messages from the same partition.
      If the isolated listener is configured with the CONCURRENT processing mode, the concurrency can be customized via @SagaEventManager(isolatedListenerConcurrency=?). Read the class Javadoc for more details.

    • IsolatedListenerProcessingMode.PARTITION_ORDERED: In this mode, messages are processed sequentially per partition while allowing concurrent execution across multiple partitions.
      The default processing mode for the isolated listener is PARTITION_ORDERED, which balances throughput and consistency while preserving the order of messages within each partition.

    • IsolatedListenerProcessingMode.SEQUENTIAL: In this mode, messages are processed sequentially without any concurrency. Use this mode when the order of processing is critical and must be preserved across all partitions.
      The SEQUENTIAL processing mode is not recommended for high-throughput scenarios, because its sequential nature can become a bottleneck. Use it only when strict ordering of message processing is a hard requirement and the expected message volume is low to moderate.
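The difference between the processing modes can be illustrated with plain java.util.concurrent executors. The sketch below models PARTITION_ORDERED as one single-threaded lane per partition; it is a conceptual model with hypothetical names (PartitionOrderedSketch, Reply), not the framework's implementation, which is built on reactive pipelines.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Conceptual model of PARTITION_ORDERED: sequential within a partition, concurrent across partitions. */
final class PartitionOrderedSketch {

    /** A reply message reduced to what matters here: its partition and offset. */
    static final class Reply {
        final int partition;
        final long offset;

        Reply(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Returns, per partition, the offsets in the order in which they were processed. */
    static Map<Integer, List<Long>> process(List<Reply> replies) {
        Map<Integer, ExecutorService> lanes = new HashMap<>();
        Map<Integer, List<Long>> processed = new ConcurrentHashMap<>();
        for (Reply reply : replies) {
            // One single-threaded lane per partition keeps that partition's order intact.
            ExecutorService lane = lanes.computeIfAbsent(
                    reply.partition, p -> Executors.newSingleThreadExecutor());
            lane.execute(() -> processed
                    .computeIfAbsent(reply.partition, p -> new CopyOnWriteArrayList<>())
                    .add(reply.offset));
        }
        try {
            for (ExecutorService lane : lanes.values()) {
                lane.shutdown();
                lane.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for lanes to finish", e);
        }
        return processed;
    }
}
```

In the same model, CONCURRENT would correspond to submitting every reply to one shared multi-threaded pool (no ordering guarantee), and SEQUENTIAL to a single lane shared by all partitions (total order, no parallelism).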