stacksaga-kafka-worker — Endpoints

The stacksaga-kafka-worker module provides the functionality that worker services need to execute the commands sent by the orchestrator via the regular topics. It consumes the command messages from the regular topics, executes the business logic, and sends the response messages back to the orchestrator via the response topics (the shared root topic or a domain-root topic, depending on the listener scope configuration).

At a high level, the stacksaga-kafka-worker module has the following duties:

  1. Providing the specifications for creating Kafka saga endpoints in the target client service, according to the StackSaga Kafka engine.

  2. Sending a response back to the orchestrator service after each invocation.

  3. Providing immediate retry facilities for retryable exceptions.

StackSaga Kafka Endpoints

StackSaga Kafka endpoints are the worker-side listeners for topics created in the form the StackSaga engine expects. As per the architecture, two types of endpoints can be created: Query Endpoints and Command Endpoints, each described in its own section below.

Exception Handling Reference

Choosing the correct exception type inside an endpoint method is one of the most critical implementation decisions in StackSaga-Kafka. The wrong choice can leave a saga stalled indefinitely, trigger unnecessary compensation, or discard valuable error context that the compensation process needs.

Exception Types at a Glance

NonRetryableExecutorException
  Available in: doProcess(), undoProcess()
  Framework response: In doProcess(), transitions the saga to FAILED and triggers the compensation sequence. In undoProcess(), terminates the compensation sequence immediately.
  Use when: The failure is permanent: a business rule violation, invalid input, or any condition that retrying will never resolve. Use put() to attach error metadata for the compensation process.

RetryableExecutorException
  Available in: doProcess(), undoProcess()
  Framework response: Schedules the saga for a deferred retry via the ring coordinator. The step is re-attempted from the last persisted event store state.
  Use when: The failure is transient: a network timeout, temporary resource unavailability, or an infrastructure blip expected to resolve on its own.

JustRetryableExecutorException
  Available in: doProcess(), undoProcess() (non-reactive endpoints only)
  Framework response: Triggers an immediate in-process retry via Spring RetryTemplate without notifying the orchestrator. When the immediate retry limit is exhausted, fall back to RetryableExecutorException to schedule a deferred retry.
  Use when: A fast local retry is worth attempting before escalating to a scheduled retry. Always use this as the first line of defence for transient failures in non-reactive endpoints.

Any unhandled exception
  Available in: doProcess(), undoProcess(), onNext(), onNextRevert()
  Framework response: Treated identically to NonRetryableExecutorException by the framework.
  Use when: Never intentionally in endpoints (see the warning below). In onNext(), however, throwing intentionally is a supported pattern to force compensation (see Throwing from onNext() and onNextRevert()).

WARNING: Never let an unhandled exception escape from an endpoint accidentally. Any Throwable that is not wrapped in a framework exception type is treated as non-retryable: the compensation sequence starts (or, in undoProcess(), terminates) with no error context, because an unhandled exception carries no put() key-value pairs for the compensation process to use. Always catch your exceptions and wrap them in the correct type.
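The warning can be made concrete with a small, stdlib-only Java sketch of a catch-all guard around an endpoint body. It does not use the StackSaga API: GuardRetryable and GuardNonRetryable are hypothetical stand-ins for RetryableExecutorException and NonRetryableExecutorException, TransientIoFailure is a hypothetical transient failure, and the metadata map imitates put(). The point it illustrates is that an unexpected exception still leaves the endpoint as a classified, non-retryable failure that carries context, instead of escaping with none.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical stand-in for RetryableExecutorException (not the StackSaga class).
class GuardRetryable extends RuntimeException {
    GuardRetryable(Throwable cause) { super(cause.getMessage(), cause); }
}

// Hypothetical stand-in for NonRetryableExecutorException; the map imitates put().
class GuardNonRetryable extends RuntimeException {
    final Map<String, String> metadata = new LinkedHashMap<>();
    GuardNonRetryable(Throwable cause) { super(cause.getMessage(), cause); }
    GuardNonRetryable put(String key, String value) {
        metadata.put(key, value);
        return this;
    }
}

// Hypothetical transient failure raised by a downstream call.
class TransientIoFailure extends RuntimeException {
    TransientIoFailure(String message) { super(message); }
}

final class EndpointGuard {
    private EndpointGuard() {}

    // Runs the endpoint body so that every Exception leaves as one of the two wrapper types.
    // (Errors such as OutOfMemoryError are deliberately not caught.)
    static <T> T run(String step, Callable<T> body) {
        try {
            return body.call();
        } catch (GuardRetryable | GuardNonRetryable alreadyWrapped) {
            throw alreadyWrapped;                    // already classified by the body
        } catch (TransientIoFailure e) {
            throw new GuardRetryable(e);             // known transient failure
        } catch (Exception e) {
            // Anything unexpected becomes non-retryable, but with context attached.
            throw new GuardNonRetryable(e)
                    .put("error_code", "UNEXPECTED_FAILURE")
                    .put("step", step)
                    .put("exception_type", e.getClass().getSimpleName());
        }
    }
}
```

In a real endpoint the catch blocks would throw the framework's own exception types; classifying in one place is only one possible design, and individual try/catch blocks as in the endpoint examples work just as well.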

Exception Decision Flow

When implementing an endpoint method, use the following logic to decide which exception to throw:

  1. Is the failure transient? (timeout, temporary resource unavailability, infrastructure blip)

    1. YES, and the endpoint is non-reactive → Throw JustRetryableExecutorException first. If the immediate retry limit is exhausted → throw RetryableExecutorException.

    2. YES, and the endpoint is reactive → Use retryWhen() in the reactive pipeline. If retries are exhausted → return Mono.error(RetryableExecutorException.of(…)).

  2. Is it a permanent business failure? (invalid data, violated rule, unrecoverable state) → Throw NonRetryableExecutorException. Use put() to attach error context for the compensation steps.

  3. Neither of the above? → Wrap it in NonRetryableExecutorException with a descriptive message. Never let it propagate as an unhandled exception.
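The decision flow above is small enough to write down as a pure function, which can serve as a checklist or as the basis of a unit test for your own error-mapping code. This is an illustration only: FailureKind, EndpointAction, and ExceptionDecisionFlow are hypothetical names, and the returned values merely name the action the flow prescribes.

```java
// Hypothetical classification of a failure, as judged by the endpoint author.
enum FailureKind { TRANSIENT, PERMANENT_BUSINESS, OTHER }

// Hypothetical names for the actions prescribed by the decision flow.
enum EndpointAction {
    THROW_JUST_RETRYABLE,        // non-reactive, immediate retries still available
    THROW_RETRYABLE,             // non-reactive, immediate retry limit exhausted
    REACTIVE_RETRY_WHEN,         // reactive, retryWhen() attempts still available
    RETURN_MONO_ERROR_RETRYABLE, // reactive, retryWhen() attempts exhausted
    THROW_NON_RETRYABLE          // permanent business failure or anything unclassified
}

final class ExceptionDecisionFlow {
    private ExceptionDecisionFlow() {}

    static EndpointAction decide(FailureKind kind, boolean reactive, boolean retriesExhausted) {
        if (kind == FailureKind.TRANSIENT) {
            if (reactive) {
                return retriesExhausted ? EndpointAction.RETURN_MONO_ERROR_RETRYABLE
                                        : EndpointAction.REACTIVE_RETRY_WHEN;
            }
            return retriesExhausted ? EndpointAction.THROW_RETRYABLE
                                    : EndpointAction.THROW_JUST_RETRYABLE;
        }
        // Steps 2 and 3: permanent and unclassified failures both end up in
        // NonRetryableExecutorException (with put() context where available).
        return EndpointAction.THROW_NON_RETRYABLE;
    }
}
```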

Throwing from onNext() and onNextRevert()

Exception handling is not limited to endpoint methods. The onNext() and onNextRevert() methods in the EventManager also participate in the saga failure model:

  • onNext(): Any exception thrown here (including via SagaPrimaryEventAction.error(…)) is treated as a non-retryable failure. The framework transitions the saga to FAILED and begins the compensation sequence immediately. This means you can use onNext() intentionally to force compensation based on orchestrator-side business conditions, for instance when a rule evaluated after a worker's reply determines that the saga must not continue.

  • onNextRevert(): Any exception thrown here causes the compensation sequence to terminate immediately. The transaction is marked as compensation-failed and will not be retried.

doProcess(): NonRetryableExecutorException or unhandled exception
  Immediate effect: Compensation begins
  Resulting state transition: IN_PROGRESS → FAILED → COMPENSATING

onNext(): any exception
  Immediate effect: Compensation begins
  Resulting state transition: IN_PROGRESS → FAILED → COMPENSATING

undoProcess(): unhandled exception
  Immediate effect: Compensation terminates
  Resulting state transition: COMPENSATING → Compensation Failed

onNextRevert(): any exception
  Immediate effect: Compensation terminates
  Resulting state transition: COMPENSATING → Compensation Failed
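The same failure paths can be captured as a lookup table. In this stdlib-only sketch, ThrownFrom, SagaStatus, and FailureTransitions are hypothetical types, and COMPENSATION_FAILED is an assumed name for the "Compensation Failed" outcome listed above; the framework's own state names may differ.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical enums mirroring the table above; not StackSaga types.
enum ThrownFrom { DO_PROCESS, ON_NEXT, UNDO_PROCESS, ON_NEXT_REVERT }

enum SagaStatus { IN_PROGRESS, FAILED, COMPENSATING, COMPENSATION_FAILED }

final class FailureTransitions {
    private static final Map<ThrownFrom, List<SagaStatus>> PATHS = new EnumMap<>(ThrownFrom.class);

    static {
        // doProcess() (NonRetryableExecutorException or unhandled) and onNext(): compensation begins.
        List<SagaStatus> startCompensation =
                List.of(SagaStatus.IN_PROGRESS, SagaStatus.FAILED, SagaStatus.COMPENSATING);
        PATHS.put(ThrownFrom.DO_PROCESS, startCompensation);
        PATHS.put(ThrownFrom.ON_NEXT, startCompensation);

        // undoProcess() (unhandled) and onNextRevert(): compensation terminates.
        List<SagaStatus> terminate = List.of(SagaStatus.COMPENSATING, SagaStatus.COMPENSATION_FAILED);
        PATHS.put(ThrownFrom.UNDO_PROCESS, terminate);
        PATHS.put(ThrownFrom.ON_NEXT_REVERT, terminate);
    }

    private FailureTransitions() {}

    // Full sequence of states the saga passes through for a failure from the given source.
    static List<SagaStatus> path(ThrownFrom source) {
        return PATHS.get(source);
    }

    // The state the saga ends up in after the failure.
    static SagaStatus resultingStatus(ThrownFrom source) {
        List<SagaStatus> p = path(source);
        return p.get(p.size() - 1);
    }
}
```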

Query Endpoint

If an execution (an atomic execution) does not change any state, it is implemented in a Query Endpoint. Because it makes no changes to the database state, it has no compensation action to undo.
Query endpoints serve the topics defined in stacksaga-kafka-implementation/orchestrator/topics-event-manager.adoc#custom_topic_implementation with the type SagaEventType.QUERY_DO_ACTION.

For instance, in the place-order example, fetching and validating the user's details is a query execution: it does not change any state in the database, it only fetches the data and returns it. The custom endpoint for that execution is shown below.

Query endpoints can be created in two different ways in StackSaga.

Non-Reactive (Imperative) Query Endpoint

Non-reactive query endpoints are the traditional, blocking way of implementing query endpoints. In this approach, the method that executes the query is implemented in a blocking way. For instance, in the place-order example, the query endpoint for fetching the user's details can be implemented as follows.

@Slf4j
(2)
@SagaEndpoint(
        // the real topic name will be saga.do.user-service.get-user
        topicNameSuffix = "user-service.get-user", (3)
        listenerScope = ListenerScope.SHARED (4)
)
public class UserValidateEndpoint extends QueryEndpoint { (1)

    (5)
    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {

        log.info("Message Key (Transaction Id): {}", consumerRecord.key()); (6)
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey()); (7)

        consumerRecord.value().getCurrentDomainEntityStateForUpdate().ifPresent(currentDomainEntityState -> { (8)
            log.info("Received payload for before user validation: {}", currentDomainEntityState);
            try {
                {
                    //user validation logic (9)
                    String username = currentDomainEntityState.get("username").asText();
                    log.info("Validating user with username: {}", username);
                }
                {(10)
                    // put the result into the domain entity state for the next topic to use.
                    currentDomainEntityState.put("is_user_validated", true);
                    currentDomainEntityState.put("validation_note", "user validated at " + LocalDateTime.now());
                }
            } catch (SomeRetryableException e) {
                (11)
                throw RetryableExecutorException.of(e);
            } catch (SomeNonRetryableException e) {
                (12)
                throw NonRetryableExecutorException
                        .buildWith(e)
                        .put("error_code", "USER_VALIDATION_FAILED")
                        .put("key-1", "value-1")
                        .put("key-2", "value-2")
                        .build();
            }
        });
    }
}
The example has been simplified to focus on the main points. Refer to each concept in the rest of the documentation for a better understanding of the framework and its capabilities.
1 Create a custom query endpoint class by extending the QueryEndpoint class.
2 Annotate the custom query endpoint class with the @SagaEndpoint annotation. It primarily marks the class as a Spring bean.
3 topicNameSuffix: the suffix of the topic name for this endpoint. The framework builds the real topic name by adding the saga.do prefix to the suffix, giving saga.do.user-service.get-user. The saga.do prefix indicates that the topic is used for the primary execution. This is the topic to which the EventManager sends the command messages that trigger this endpoint.
4 listenerScope: the listener scope of the endpoint, either ListenerScope.SHARED or ListenerScope.ISOLATED. See stacksaga-kafka-implementation/worker/worker-configuration.adoc#topic_model_stacksaga_kafka_worker.
5 Override the doProcess() method to implement the business logic of the query. The framework invokes this method when a message is received on the respective topic. Use the provided ConsumerRecord to access the message key, payload, and other metadata within the method scope.
6 The message key is available from the ConsumerRecord as a String. It is the transaction ID, sent by the orchestrator along with the message.
For context: the transaction ID is used as the message key so that all messages belonging to the same transaction are routed to the same Kafka partition, which preserves their processing order within the transaction.
7 The idempotency key is available from the payload of the ConsumerRecord. The framework generates it on the orchestrator side and sends it with the message. If the same message is processed more than once (because of retries or duplicate delivery), it carries the same idempotency key, which lets you implement idempotent processing in your endpoint.
8 The current state of the domain entity is available from the payload through the getCurrentDomainEntityStateForUpdate() method, which returns an Optional containing the state if it is present. In the primary execution the state is always present, because the orchestrator sends it; the Optional simply provides safe access.
On the worker (target service) side, the domain entity state cannot be obtained as the Java object, because the original CustomDomainEntity class lives on the orchestrator side. Instead, the worker receives it as an ObjectNode, the JSON tree structure provided by the Jackson library. Use the methods of ObjectNode to read and modify the state: for instance, get("fieldName") reads a specific field, and put("fieldName", value) adds or updates one.
9 Implement the business logic of the query, using the current state of the domain entity to perform the necessary operations and obtain the required data. For instance, you can call an internal service class to load the user data from the database based on the username in the domain entity state.
10 After obtaining the data and performing the necessary operations, put the result into the domain entity state for the next topic to use. For instance, put a field is_user_validated with the value true to indicate that the user has been validated successfully, and a field validation_note with a note about the validation process.
Any field that the worker adds to the domain entity state should be defined in the CustomDomainEntity class on the orchestrator side. The state is shared between the orchestrator and the workers via Kafka messages, so both sides must understand its structure; it is therefore recommended to define every field that can be added to the state in the CustomDomainEntity class.
If the worker adds a field that is not defined in the CustomDomainEntity class, the field is placed in the missingProperties map of the domain entity state, where the orchestrator can read it with getMissingProperties() or getMissingProperty(String key). This avoids the serialization and deserialization issues that undefined fields could otherwise cause, but defining every field in CustomDomainEntity is still recommended to keep a clear contract between the orchestrator and the workers.
Fields that end up in the missingProperties map can still be accessed in the same way by the next worker that runs after the current one.
As mentioned in the architecture, none of the modifications made to the domain entity state within the method are applied if an exception (RetryableExecutorException or NonRetryableExecutorException) is thrown from the method.
The reason is that the original domain entity state sent from the orchestrator (the Entered State) must stay unchanged so that the step can be re-invoked with it when retrying. If NonRetryableExecutorException is thrown, the updated values are not valid anyway, because the compensation process starts and the state is reverted to the last successful state. To carry error-related data, use the put() method of NonRetryableExecutorException to add it as key-value pairs; you can then access it on the orchestrator side during compensation processing.
11 This is the most important part of the endpoint implementation: handle exceptions properly by throwing them with the appropriate type. Wrap a retryable exception in RetryableExecutorException, and a non-retryable exception in NonRetryableExecutorException.
If the exception is retryable, only the exception message, not the full stack trace, is sent to the orchestrator. This avoids sending redundant information, because retryable exceptions are retried automatically by the framework and there is nothing to monitor or debug.
Throwing RetryableExecutorException gives you asynchronous, scheduled retrying. It is recommended, however, to attempt immediate retries with JustRetryableExecutorException before throwing RetryableExecutorException, which improves the user experience and reduces the overall processing time of the transaction. JustRetryableExecutorException triggers an immediate retry without waiting for a scheduled one; if that retry succeeds, it avoids the scheduling delay and avoids sending a message to the orchestrator for each attempt. If the immediate retries fail, throw RetryableExecutorException to fall back to a scheduled retry. See Immediate Retry with JustRetryableExecutorException for how to use it in an endpoint.
12 When throwing NonRetryableExecutorException, you can attach additional information as key-value pairs with the put() method. This information is useful for later compensation processing, where you can read it and act on it. For instance, if you put error_code with the value USER_VALIDATION_FAILED, the compensation processing can check for that error code and run logic specific to it.
Do not keep exception-related metadata inside your own exception object, because the exception object is not serialized and sent to the orchestrator (only the exception trace is sent, as a string). To send additional information for future compensation processing, add it with the put() method of NonRetryableExecutorException; it is then serialized and sent to the orchestrator with the message, where you can access it during compensation processing.
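Callout 7 leaves the idempotent processing logic to you. Below is a minimal sketch, assuming you track processed idempotency keys yourself; IdempotentExecutor and executeOnce are hypothetical names, not part of StackSaga, and in an endpoint the key would come from consumerRecord.value().getIdempotencyKey().

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotency guard keyed by the idempotency key from the payload.
// Keys are kept in memory only to keep the sketch self-contained.
final class IdempotentExecutor {
    private final Set<String> processedKeys = new HashSet<>();

    // Runs the side effect at most once per idempotency key and returns true if it ran.
    // The key is recorded only after the side effect succeeds, so a failed attempt can be retried.
    synchronized boolean executeOnce(String idempotencyKey, Runnable sideEffect) {
        if (processedKeys.contains(idempotencyKey)) {
            return false; // duplicate delivery or re-invocation: skip the side effect
        }
        sideEffect.run();                  // if this throws, the key is not recorded
        processedKeys.add(idempotencyKey);
        return true;
    }
}
```

An in-memory set only lives as long as the process. A durable variant would store the key in the service's own database, ideally in the same local transaction as the business change, so that a crash between the two cannot lead to a double execution.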

Reactive (Non-Blocking) Query Endpoint

@Slf4j
@SagaEndpoint(
        // the real topic name will be saga.do.user-service.get-user
        topicNameSuffix = "user-service.get-user",
        listenerScope = ListenerScope.SHARED
)
public class ReactiveUserValidateEndpoint extends ReactiveQueryEndpoint {(1)

    (2)
    @Override
    public Mono<Void> doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        final ObjectNode currentDomainEntityState = consumerRecord
                .value()
                .getCurrentDomainEntityStateForUpdate()
                .orElseThrow();
        log.info("Received payload for before user validation: {}", currentDomainEntityState);
        String username = currentDomainEntityState.get("username").asText();
        return this.internalUserService
                .getUserDetails(username)
                .doOnNext(userDetails -> {
                    {
                        //user validation logic
                    }
                    {
                        // put the result into the domain entity state for the next topic to use.
                        currentDomainEntityState.put("is_user_validated", true);
                        currentDomainEntityState.put("validation_note", "user validated at " + LocalDateTime.now());
                    }
                })
                // discard the UserDetails value so that the pipeline completes as Mono<Void>
                .then()
                // translate domain errors into the framework's exception types
                .onErrorResume(SomeRetryableException.class, e -> {
                    return Mono.error(RetryableExecutorException.of(e));
                })
                .onErrorResume(SomeNonRetryableException.class, e -> {
                    return Mono.error(NonRetryableExecutorException
                            .buildWith(e)
                            .put("error_code", "USER_VALIDATION_FAILED")
                            .build());
                });
    }
}
If you are not familiar with how endpoints work in StackSaga Kafka, read the Non-Reactive (Imperative) Query Endpoint section before this one, because all the concepts and main points are explained there in more detail.

Most of the points are the same as in the Non-Reactive (Imperative) Query Endpoint. The main difference is that a reactive query endpoint returns a Mono<Void> from the doProcess() method, so the business logic is implemented with the reactive programming paradigm, using the Mono and Flux types from Project Reactor for asynchronous, non-blocking operations. In the example above, internalUserService.getUserDetails(username) returns a Mono<UserDetails>, a reactive type that represents a single asynchronous value. Operators such as map(), flatMap(), or doOnNext() work with that result, and onErrorResume() translates exceptions into the appropriate framework exception types.
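The decision flow recommends retryWhen() in reactive endpoints, followed by Mono.error(RetryableExecutorException.of(…)) once retries are exhausted. The sketch below illustrates that idea with java.util.concurrent.CompletableFuture instead of Reactor, so it runs without extra dependencies; it is a swapped-in stand-in, not Reactor code. AsyncRetry, AsyncRetryableFailure, and TransientLookupFailure are hypothetical names: transient failures are retried inside the pipeline, and once the attempts are used up the error becomes a retryable failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Hypothetical stand-in for RetryableExecutorException.
class AsyncRetryableFailure extends RuntimeException {
    AsyncRetryableFailure(Throwable cause) { super(cause); }
}

// Hypothetical transient failure from a downstream lookup.
class TransientLookupFailure extends RuntimeException {
    TransientLookupFailure(String message) { super(message); }
}

final class AsyncRetry {
    private AsyncRetry() {}

    // Retries the asynchronous call while it fails with TransientLookupFailure. When the attempts
    // are used up, the pipeline fails with AsyncRetryableFailure (the analogue of returning
    // Mono.error(RetryableExecutorException.of(...))). Other failures pass through unchanged.
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> call, int maxAttempts) {
        return attempt(call, 1, maxAttempts);
    }

    private static <T> CompletableFuture<T> attempt(Supplier<CompletableFuture<T>> call, int n, int maxAttempts) {
        return call.get()
                .<CompletableFuture<T>>handle((value, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause() : error;
                    if (!(cause instanceof TransientLookupFailure)) {
                        return CompletableFuture.<T>failedFuture(cause);   // not transient: pass through
                    }
                    if (n >= maxAttempts) {
                        return CompletableFuture.<T>failedFuture(new AsyncRetryableFailure(cause)); // exhausted
                    }
                    return attempt(call, n + 1, maxAttempts);               // try again
                })
                .thenCompose(next -> next);
    }
}
```

With Reactor itself, the equivalent building blocks are retryWhen() with a reactor.util.retry.Retry spec and an error translation step after it.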

Command Endpoint

If an atomic execution changes state in the database, it is implemented in a Command Endpoint. Because it makes changes to the database, it has a compensation action to undo them.
Command endpoints serve the custom topics (see stacksaga-kafka-implementation/orchestrator/topics-event-manager.adoc#custom_topic_implementation) that are created with the types SagaEventType.COMMAND_DO_ACTION and SagaEventType.COMMAND_UNDO_ACTION.

For instance, in the place-order example, the Make Payment execution is a command execution, because it changes state in the payment service's database by creating a new payment record and updating the user's balance. If the entire process has to be compensated after Make Payment has succeeded, the compensation action is triggered to undo the payment.

A command endpoint implements two methods: doProcess() performs the primary execution, and undoProcess() runs the compensation logic that undoes the changes made by the primary execution if a later step fails. Both methods are required for a complete command endpoint.

  • In the doProcess() method, throw RetryableExecutorException or NonRetryableExecutorException to report a failure of the primary execution to the framework. If RetryableExecutorException is thrown, the framework schedules the transaction for a retry based on the retry configuration. If NonRetryableExecutorException is thrown, the framework triggers the compensation process to undo the transaction.

  • If NonRetryableExecutorException is thrown from the doProcess() method, the values modified within the method are not applied to the domain entity state, because the compensation process starts and the state is reverted to the last successful state. See the example for more details.
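The rule that modifications are kept only when doProcess() completes normally behaves like running the step against a working copy of the Entered State. The sketch below models that idea with a plain Map as an assumed stand-in for the ObjectNode state; StagedState and runStep are hypothetical names, and the sketch does not claim to show how StackSaga implements this internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of "modifications are applied only if the step completes normally".
// A Map stands in for the ObjectNode domain entity state.
final class StagedState {
    private Map<String, Object> committed;

    StagedState(Map<String, Object> enteredState) {
        // The Entered State received from the orchestrator.
        this.committed = new HashMap<>(enteredState);
    }

    // Read-only snapshot of the committed state.
    Map<String, Object> view() {
        return Collections.unmodifiableMap(new HashMap<>(committed));
    }

    // Runs the step against a working copy; the copy replaces the committed state only if the
    // step returns normally. If it throws, the committed (entered) state is left untouched.
    void runStep(Consumer<Map<String, Object>> step) {
        Map<String, Object> workingCopy = new HashMap<>(committed); // shallow copy of flat values
        step.accept(workingCopy);
        committed = workingCopy;
    }
}
```

A shallow copy is enough here because the values are flat; a JSON tree such as a Jackson ObjectNode would need a deep copy, which ObjectNode provides through deepCopy().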

Command endpoints can be created in two different ways in StackSaga.

Non-Reactive (Imperative) Command Endpoint

Non-reactive command endpoints are the traditional, blocking way of implementing command endpoints. In this approach, the method that executes the command is implemented in a blocking way.

@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
class MakePaymentEndpoint extends CommandEndpoint { (1)

    (2)
    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
        //the theory is the same as the query endpoint.
    }

    (3)
    @Override
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws JustRetryableExecutorException, RetryableExecutorException {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        // the last domain entity state is read-only during compensation
        final JsonNode lastDomainEntityState = consumerRecord.value().getDomainEntityState();
        log.info("Last domain entity state {}", lastDomainEntityState);

        try {
            {//undo payment logic
                final double totalAmount = lastDomainEntityState.get("total_amount").asDouble();
                final String paymentRef = lastDomainEntityState.get("payment_reference_id").asText();
                this.internalPaymentService.refundPayment(paymentRef, totalAmount);
                //update some hints for the next revert execution if needed.
                consumerRecord.value().getHintStore().ifPresent(hintStore -> {(4)
                    hintStore.put("last_refund_time", LocalDateTime.now().toString());
                });
            }
        } catch (SomeRetryableException e) {(5)
            throw RetryableExecutorException.of(e);
        }
    }
}
The doProcess() implementation is omitted from the example because the focus is on the undoProcess() method and compensation processing in command endpoints. The approach for implementing doProcess() is the same as in the query endpoint explained in the previous section; read Non-Reactive (Imperative) Query Endpoint for details.
1 Create a custom command endpoint class by extending the CommandEndpoint class.
2 Override the doProcess() method to implement the business logic of the command. This works exactly as in the query endpoint; refer to the Non-Reactive (Imperative) Query Endpoint example for details on implementing doProcess().
3 Override the undoProcess() method to implement the logic that undoes the command. The framework invokes this method when a compensation action is triggered for the respective command execution. Use the provided ConsumerRecord to access the message key, payload, and other metadata within the method scope.
The payload of the ConsumerRecord in undoProcess() contains the last state of the domain entity before the command being compensated was executed. Access it with the getDomainEntityState() method of the payload; it gives you the information needed to perform the compensation logic based on the last known state of the domain entity. getDomainEntityState() returns a JsonNode, the JSON tree structure provided by the Jackson library, and you can use its methods, for instance get("fieldName"), to read specific fields. You cannot modify the last domain entity state, because it is immutable during compensation processing.
4 To pass hints or additional information from the current compensation execution to the next one, use the getHintStore() method of the payload to access the hint store and put the information in it. The hint store is a key-value store for sharing information between compensation executions. For instance, to record the last refund time as a hint for the next compensation execution, use the put() method of the hint store with a specific key, as shown in the example.
Hints added or updated in the hint store are applied only if no exception is thrown from the undoProcess() method. If RetryableExecutorException is thrown, the compensation step is retried based on the retry configuration, and the hints added or updated in the failed execution are not carried into the retry, because every retry is expected to run under the same conditions until it succeeds. JustRetryableExecutorException is a subclass of RetryableExecutorException, so the same rule applies: hints are not applied when it is thrown either. Manage hints with this in mind so that they reach the next compensation execution correctly. If an unhandled exception is thrown from undoProcess(), the compensation process stops, the transaction is terminated, and the hints from the current execution are not applied, because there is no next compensation execution.
5 Identify the retryable exceptions that the compensation logic can throw and wrap them in RetryableExecutorException so that the compensation step can be retried. Any unhandled exception thrown from undoProcess() stops the compensation process and terminates the transaction, so handle exceptions carefully in the compensation logic to allow retries where necessary and avoid unexpected termination. Only JustRetryableExecutorException and RetryableExecutorException can be thrown from undoProcess().
The maximum number of retry attempts for compensation executions, retry window durations, boundary guard thresholds, and the behavior for permanently stuck or exhausted compensation transactions (dead-letter handling) are all configured through stacksaga-database-support. See the stacksaga-database-support documentation for full details on these configurations.
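The hint rules above can be modelled with a small compensation loop. This stdlib-only sketch makes several assumptions that are not taken from StackSaga: completed steps are undone in reverse order of execution, a retryable failure is retried immediately in a loop (the real framework schedules compensation retries), and maxAttempts stands in for the limits configured through stacksaga-database-support. RetryableUndoFailure, UndoStep, CompensationOutcome, and CompensationRunner are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for RetryableExecutorException thrown from an undo step.
class RetryableUndoFailure extends RuntimeException {
    RetryableUndoFailure(String message) { super(message); }
}

// Hypothetical undo step; it receives a working copy of the hints and may update it.
interface UndoStep {
    void undo(Map<String, String> hints);
}

// Hypothetical result of a compensation run.
final class CompensationOutcome {
    final String status;             // COMPENSATED, COMPENSATION_FAILED or RETRIES_EXHAUSTED
    final Map<String, String> hints; // hints as applied by the last successful undo
    CompensationOutcome(String status, Map<String, String> hints) {
        this.status = status;
        this.hints = hints;
    }
}

final class CompensationRunner {
    private CompensationRunner() {}

    static CompensationOutcome compensate(List<UndoStep> completedSteps, int maxAttempts) {
        Map<String, String> hints = new HashMap<>();
        // Undo the completed steps in reverse order of execution.
        for (int i = completedSteps.size() - 1; i >= 0; i--) {
            UndoStep step = completedSteps.get(i);
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                Map<String, String> working = new HashMap<>(hints); // every attempt starts from the same hints
                try {
                    step.undo(working);
                    hints = working; // success: hints are applied and visible to the next undo
                    done = true;
                } catch (RetryableUndoFailure e) {
                    // retryable: this attempt's hint changes are discarded and the step is retried
                } catch (RuntimeException e) {
                    // unhandled: compensation terminates and this attempt's hints are discarded
                    return new CompensationOutcome("COMPENSATION_FAILED", hints);
                }
            }
            if (!done) {
                // placeholder: the real behaviour for exhausted compensation retries
                // (dead-letter handling) is configured through stacksaga-database-support
                return new CompensationOutcome("RETRIES_EXHAUSTED", hints);
            }
        }
        return new CompensationOutcome("COMPENSATED", hints);
    }
}
```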

Immediate Retry with JustRetryableExecutorException

Immediate retry with JustRetryableExecutorException is supported only in non-reactive endpoints, because reactive endpoints can already retry immediately within the reactive pipeline without a dedicated exception type. In a non-reactive endpoint, throw JustRetryableExecutorException to trigger an immediate retry without waiting for a scheduled one. If the immediate retry succeeds, it avoids the scheduling delay and avoids sending a message to the orchestrator for each attempt. If the immediate retries fail, throw RetryableExecutorException to fall back to a scheduled retry. Behind the scenes, the framework handles the immediate retry with Spring's RetryTemplate, so you can use the retry features that Spring RetryTemplate provides in non-reactive endpoints. The following example shows JustRetryableExecutorException used for immediate retrying in a non-reactive command endpoint.

Immediate retry with RetryTemplate is supported for the doProcess() method of both non-reactive command and query endpoints, and for the undoProcess() method of non-reactive command endpoints.
@Slf4j
@RequiredArgsConstructor
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
public class MakePaymentEndpoint extends CommandEndpoint {

    // Hypothetical application service that performs the payment; injected through the constructor.
    private final InternalPaymentService internalPaymentService;

    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
        // Read the state outside a lambda so that the executor exceptions can be thrown directly,
        // even if they are checked exceptions (a Consumer passed to ifPresent() could not throw them).
        final Optional<JsonNode> stateForUpdate = consumerRecord.value().getCurrentDomainEntityStateForUpdate();
        if (stateForUpdate.isEmpty()) {
            return;
        }
        final JsonNode currentDomainEntityState = stateForUpdate.get();
        final String paymentReferenceId = currentDomainEntityState.get("payment_reference_id").asText();
        final double totalAmount = currentDomainEntityState.get("total_amount").asDouble();
        try {
            this.internalPaymentService.makePayment(paymentReferenceId, totalAmount);
        } catch (SomeRetryableException e) {
            // The retry context is always present for non-reactive endpoints.
            RetryContext retryContext = consumerRecord.value().getRetryContext().orElseThrow();
            if (retryContext.getRetryCount() >= 3) {
                // Threshold reached: stop retrying. The saga is marked as FAILED and compensation starts.
                // Throw RetryableExecutorException instead to fall back to a scheduled retry.
                throw NonRetryableExecutorException
                        .buildWith(e)
                        .put("error_code", "PAYMENT_FAILED_AFTER_RETRIES")
                        .build();
            }
            // Below the threshold: ask the framework to retry immediately.
            throw JustRetryableExecutorException.of("Payment failed, retrying... attempt " + (retryContext.getRetryCount() + 1));
        } catch (SomeNonRetryableException e) {
            throw NonRetryableExecutorException
                    .buildWith(e)
                    .put("error_code", "PAYMENT_FAILED")
                    .build();
        }
    }

    @Override
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws
            JustRetryableExecutorException, RetryableExecutorException {
        // Compensation logic is omitted in this example.
    }
}

This example shows how to implement immediate retrying in the MakePaymentEndpoint command endpoint by using JustRetryableExecutorException. If the makePayment() method throws a retryable exception, the endpoint reads the retry count from the RetryContext carried in the payload of the ConsumerRecord. Although the retry context is wrapped in an Optional, it can be accessed safely here because it is always present when the executor is non-reactive. If the retry count has reached the threshold (3 in this example), the endpoint throws NonRetryableExecutorException to stop retrying; the saga then transitions to FAILED and the compensation sequence is triggered. If you would rather fall back to a scheduled retry at that point, throw RetryableExecutorException instead. If the threshold has not been reached, the endpoint throws JustRetryableExecutorException, which tells the framework to retry immediately. A successful immediate retry avoids the delay caused by scheduling and the redundant messages that would otherwise be sent to the orchestrator for each retry attempt.
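The example mixes framework calls with the retry decision itself. The following framework-free sketch isolates that decision so it can be run and tested on its own: retry immediately in the calling thread while attempts remain, then escalate. It is only a simplified model; ImmediateRetryModel, TransientFailure, and Outcome are hypothetical names, and this is not how StackSaga or Spring Retry implement retrying.

```java
import java.util.Objects;

/** Hypothetical, framework-free model of "retry immediately, then escalate". */
public final class ImmediateRetryModel {

    /** Result of the retry loop (illustrative only). */
    public enum Outcome { SUCCEEDED, ESCALATED }

    /** Stands in for a retryable failure such as SomeRetryableException. */
    public static final class TransientFailure extends RuntimeException {
        public TransientFailure(String message) {
            super(message);
        }
    }

    private ImmediateRetryModel() {
    }

    /**
     * Runs the action up to maxAttempts times in the calling thread, with no delay between attempts.
     * Any exception other than TransientFailure propagates at once (a non-retryable failure).
     */
    public static Outcome run(Runnable action, int maxAttempts) {
        Objects.requireNonNull(action, "action");
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return Outcome.SUCCEEDED;
            } catch (TransientFailure e) {
                // Transient failure: fall through and try again immediately.
            }
        }
        // All immediate attempts failed: the caller must escalate.
        return Outcome.ESCALATED;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with a transient error, then succeeds on the third attempt.
        Outcome outcome = run(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new TransientFailure("payment gateway timeout");
            }
        }, 3);
        System.out.println(outcome + " after " + calls[0] + " attempts");
    }
}
```

In the endpoint, the ESCALATED branch corresponds to the point where you choose between NonRetryableExecutorException (fail the saga and compensate) and RetryableExecutorException (fall back to a scheduled retry).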

Configure Custom AbstractSagaEndpointSchedulerProvider for Non-Reactive Endpoints

Because StackSaga is non-blocking and reactive under the hood, a non-reactive endpoint needs a Scheduler on which its blocking code can run without blocking the reactive pipeline. The framework provides a default shared Scheduler for the primary execution and another for the revert execution of non-reactive endpoints. The default shared Scheduler may not suit every use case, for instance when you want a dedicated Scheduler for a particular endpoint or a Scheduler with specific settings. In that case, create a custom AbstractSagaEndpointSchedulerProvider and configure it in the endpoint.

The default shared Schedulers provided by the framework are boundedElastic Schedulers with the following configurations:

  1. Primary Execution Scheduler

    • threadCap: Runtime.getRuntime().availableProcessors() * 10

    • queuedTaskCap: 100_000

    • ttlSeconds: 60

    • daemon: false

  2. Revert Execution Scheduler

    • threadCap: Runtime.getRuntime().availableProcessors() * 3

    • queuedTaskCap: 100_000

    • ttlSeconds: 60

    • daemon: false
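To build intuition for what these four parameters control, the sketch below configures a plain JDK ThreadPoolExecutor with the same numbers and uses it to run a blocking call off the caller's thread. It is only an approximation: Reactor's boundedElastic prefers reusing idle workers before creating new ones, which a ThreadPoolExecutor with equal core and maximum sizes does not do, and CompletableFuture.supplyAsync() stands in for subscribeOn() in a reactive pipeline. BoundedPoolSketch and boundedPool() are hypothetical names, not StackSaga or Reactor API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public final class BoundedPoolSketch {

    private BoundedPoolSketch() {
    }

    /** Approximates a boundedElastic(threadCap, queuedTaskCap, name, ttlSeconds, daemon) setup with the JDK. */
    public static ThreadPoolExecutor boundedPool(int threadCap, int queuedTaskCap,
                                                 String name, int ttlSeconds, boolean daemon) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, name + "-" + counter.incrementAndGet());
            thread.setDaemon(daemon); // daemon = false keeps the JVM alive while pool threads exist
            return thread;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threadCap, threadCap,                     // never more than threadCap threads
                ttlSeconds, TimeUnit.SECONDS,             // idle threads are retired after ttlSeconds
                new LinkedBlockingQueue<>(queuedTaskCap), // at most queuedTaskCap waiting tasks
                factory);                                 // default policy rejects tasks beyond the queue cap
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Same numbers as the documented primary and revert defaults.
        ThreadPoolExecutor primary = boundedPool(cpus * 10, 100_000, "primary", 60, false);
        ThreadPoolExecutor revert = boundedPool(cpus * 3, 100_000, "revert", 60, false);
        try {
            // The blocking call runs on a pool thread, not on the caller's thread.
            String worker = CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), primary)
                    .join();
            System.out.println("blocking work ran on " + worker);
        } finally {
            primary.shutdown();
            revert.shutdown();
        }
    }
}
```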

Override the Default Scheduler Configurations

If you want to override the default Scheduler configurations at the application level, provide your own provider beans in the application context, as in the configuration class below; they replace the default configurations.

@Slf4j
@Configuration
public class EndpointSchedulerConfig {

    @Bean
    public AbstractDefaultPrimaryExecutionSchedulerProvider customSharedPrimaryExecutionSchedulerProvider() {
        return new AbstractDefaultPrimaryExecutionSchedulerProvider() {
            @Override
            protected Scheduler scheduler() {
                log.info("lazy initialization of custom shared primary execution scheduler for non-reactive(blocking) execution");
                return Schedulers.newBoundedElastic(
                        100,
                        Integer.MAX_VALUE,
                        "custom-do-shared",
                        60,
                        false
                );
            }
        };
    }

    @Bean
    public AbstractDefaultRevertExecutionSchedulerProvider customSharedRevertExecutionSchedulerProvider() {
        return new AbstractDefaultRevertExecutionSchedulerProvider() {
            @Override
            protected Scheduler scheduler() {
                log.info("lazy initialization of custom shared revert execution scheduler for non-reactive(blocking) execution");
                return Schedulers.newBoundedElastic(
                        20,
                        Integer.MAX_VALUE,
                        "custom-undo-shared",
                        60,
                        false
                );
            }
        };
    }
}

Create a Custom AbstractSagaEndpointSchedulerProvider for an Endpoint

Here is an example of how to create a custom AbstractSagaEndpointSchedulerProvider and configure it in the endpoint.

@Component
public class MakePaymentExecutionSchedulerProvider extends AbstractSagaEndpointSchedulerProvider {
    @Override
    protected Scheduler scheduler() {
        return Schedulers.newBoundedElastic(20, Integer.MAX_VALUE, "demo-name");
    }
}


@Slf4j
@SagaEndpoint(
        listenerScope = ListenerScope.SHARED,
        primaryExecutionSchedulerProvider = MakePaymentExecutionSchedulerProvider.class,
        revertExecutionSchedulerProvider = MakePaymentExecutionSchedulerProvider.class
)
public class MakePaymentEndpoint extends CommandEndpoint {
    //...
}
The scheduler() method is invoked only once, when the application starts; the framework caches the returned Scheduler and reuses it afterwards.

In this example, we created a custom MakePaymentExecutionSchedulerProvider that extends AbstractSagaEndpointSchedulerProvider and overrides the scheduler() method to provide a custom Scheduler configuration. We then configured it in the MakePaymentEndpoint for both primary execution and revert execution, using the primaryExecutionSchedulerProvider and revertExecutionSchedulerProvider attributes of the @SagaEndpoint annotation. You can also configure different SchedulerProviders for primary and revert execution if you want a different configuration for each.
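The note that scheduler() is invoked once and its result cached describes a memoization pattern. The sketch below shows that pattern in isolation with plain Java: a provider builds its executor on first use and hands back the same instance on every later call. CachingSchedulerProvider and createScheduler() are hypothetical names, an ExecutorService stands in for Reactor's Scheduler so the example needs no dependencies, and this is not StackSaga's implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical provider that builds its executor once and then reuses it. */
public abstract class CachingSchedulerProvider {

    private volatile ExecutorService cached;

    /** Subclasses describe how to build the executor; invoked at most once per provider. */
    protected abstract ExecutorService createScheduler();

    /** Returns the cached executor, creating it on first use (double-checked locking). */
    public final ExecutorService get() {
        ExecutorService local = cached;
        if (local == null) {
            synchronized (this) {
                local = cached;
                if (local == null) {
                    local = createScheduler();
                    cached = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        AtomicInteger creations = new AtomicInteger();
        CachingSchedulerProvider provider = new CachingSchedulerProvider() {
            @Override
            protected ExecutorService createScheduler() {
                creations.incrementAndGet();
                return Executors.newFixedThreadPool(2);
            }
        };
        ExecutorService first = provider.get();
        ExecutorService second = provider.get();
        System.out.println("same instance: " + (first == second)
                + ", created " + creations.get() + " time(s)");
        first.shutdown();
    }
}
```

Because the expensive object is built once per provider, keeping a provider as a single Spring bean is what makes a shared scheduler actually shared.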

Configure RetryTemplate for Non-Reactive Endpoints

As mentioned above, non-reactive endpoints use RetryTemplate to handle retries, and you can configure it to customize the retry behavior of your endpoints. To do so, define a bean of type AbstractRetryTemplateProvider in your application context and override its retryTemplateBuilder() method to return your custom RetryTemplateBuilder configuration. Here is an example of how to configure the RetryTemplate for non-reactive endpoints.

@Component
public class MakePaymentRetryTemplateProvider extends AbstractRetryTemplateProvider {
    @Override
    protected RetryTemplateBuilder retryTemplateBuilder() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(3);
        return RetryTemplate.builder().customPolicy(simpleRetryPolicy);
    }
}
The MakePaymentRetryTemplateProvider class is annotated with @Component to make it a Spring bean.

After creating the custom RetryTemplateProvider, you can configure it into the endpoint as below.

@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED,
        primaryExecutionRetryTemplate = MakePaymentRetryTemplateProvider.class,
        revertExecutionRetryTemplate = MakePaymentRetryTemplateProvider.class
)
public class MakePaymentEndpoint extends CommandEndpoint {
    //...
}

In this example, MakePaymentRetryTemplateProvider is configured for both primary execution and revert execution through the primaryExecutionRetryTemplate and revertExecutionRetryTemplate attributes of the @SagaEndpoint annotation. You can also configure different RetryTemplateProviders for primary and revert execution if you want different retry behavior for each.
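The provider above only limits the number of attempts. If you also want a pause between immediate attempts, Spring Retry's RetryTemplateBuilder offers back-off options such as exponentialBackoff(initialInterval, multiplier, maxInterval). The plain-Java sketch below computes the delay sequence such a policy produces, assuming it starts at initialInterval, multiplies by multiplier after each retry, and is capped at maxInterval. BackoffSchedule is a hypothetical helper for reasoning about the numbers, not part of Spring Retry or StackSaga.

```java
import java.util.ArrayList;
import java.util.List;

public final class BackoffSchedule {

    private BackoffSchedule() {
    }

    /** Delays (in ms) before each retry for a capped exponential back-off. */
    public static List<Long> delays(int maxAttempts, long initialIntervalMs,
                                    double multiplier, long maxIntervalMs) {
        List<Long> result = new ArrayList<>();
        double next = initialIntervalMs;
        // maxAttempts counts the first call too, so there are maxAttempts - 1 waits.
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add((long) Math.min(next, maxIntervalMs));
            next = next * multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> delays = delays(5, 100, 2.0, 500);
        long total = delays.stream().mapToLong(Long::longValue).sum();
        System.out.println(delays + " total=" + total + "ms");
    }
}
```

Keep the total small: Spring Retry's default sleeper waits by blocking the calling thread, so in a non-reactive endpoint every back-off pause occupies a thread of the endpoint's scheduler. Longer waits are better left to the scheduled retry triggered by RetryableExecutorException.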

Reactive (Non-Blocking) Command Endpoint

If you work in a reactive programming environment and want to implement command endpoints in a non-blocking way, extend the ReactiveCommandEndpoint class and implement the doProcess() and undoProcess() methods using Project Reactor's Mono and Flux types. The approach is the same as for the non-reactive command endpoint, except that both methods return a Mono<Void> and use reactive operators to handle asynchronous operations and exceptions. For instance, you can use the map(), flatMap(), and onErrorResume() operators to implement the business logic and exception handling reactively.

@Slf4j
@SagaEndpoint( (2)
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
@RequiredArgsConstructor
class ReactiveMakePaymentEndpoint extends ReactiveCommandEndpoint { (1)

    // Hypothetical reactive payment service whose refundPayment() returns a Mono; injected through the constructor.
    private final InternalPaymentService internalPaymentService;

    @Override
    public Mono<Void> doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //the theory is the same as the query endpoint.
        return Mono.empty(); // placeholder so that the example compiles
    }

    @Override
    public Mono<Void> undoProcess(ReceiverRecord<String, SagaPayload> consumerRecord) {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        final JsonNode lastDomainEntityState = consumerRecord.value().getDomainEntityState();
        log.info("Last domain entity state {}", lastDomainEntityState);
        final double totalAmount = lastDomainEntityState.get("total_amount").asDouble();
        final String paymentRef = lastDomainEntityState.get("payment_reference_id").asText();
        // Return the pipeline so that the framework subscribes to it.
        return this.internalPaymentService
                .refundPayment(paymentRef, totalAmount)
                .flatMap(response -> {
                    //update some hints for the next revert execution if needed.
                    consumerRecord.value().getHintStore().ifPresent(hintStore -> {
                        hintStore.put("last_refund_time", LocalDateTime.now().toString());
                    });
                    return Mono.<Void>empty();
                })
                .onErrorResume(SomeRetryableException.class, e -> { (3)
                    return Mono.error(RetryableExecutorException.of(e));
                });
    }
}
The doProcess() implementation is omitted from this example because the focus is on undoProcess() and compensation processing in command endpoints. The approach for implementing doProcess() is the same as for the query endpoint explained in the previous section; read the Non-Reactive (Imperative) Query Endpoint section to understand how to implement doProcess().
1 Create a custom reactive command endpoint class by extending the ReactiveCommandEndpoint class.
2 Annotate the custom reactive command endpoint class with the @SagaEndpoint annotation. Primarily, it marks the class as a Spring bean.
3 Identify the retryable exceptions that the compensation logic can throw and handle them with the onErrorResume() operator, returning Mono.error() with a RetryableExecutorException so that the compensation step can be retried. If an unhandled exception is thrown from undoProcess(), the compensation process stops and the transaction is terminated. Handling exceptions properly in the compensation logic therefore ensures that compensation is retried when necessary and prevents unexpected termination of the transaction.
With RetryableExecutorException, the StackSaga framework provides only asynchronous retrying (a scheduled retry). If you want immediate retrying without going through a scheduled retry, manage it within the pipeline by using Reactor's retryWhen() operator. It is recommended to implement the immediate retrying logic first and to signal RetryableExecutorException only afterwards; this gives you better control over the retrying mechanism and avoids scheduling retries that could have been done immediately. Once the immediate retries are exhausted, signal RetryableExecutorException (for example, with Mono.error()) to schedule the retry according to the retry configuration. Read more about Retry to better understand the retrying mechanisms.
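The "retry inside the pipeline first, then escalate" idea can be understood through a dependency-free analogue: re-invoke an asynchronous call from its own failure handler until the attempts run out, then surface an escalation error. The sketch uses CompletableFuture instead of Reactor purely so that it runs with the JDK alone; in a ReactiveCommandEndpoint you would express the same idea with retryWhen() (for example with reactor.util.retry.Retry.max(2)) before mapping to RetryableExecutorException. AsyncRetrySketch and EscalateToScheduledRetry are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class AsyncRetrySketch {

    /** Hypothetical stand-in for signalling RetryableExecutorException after retries are exhausted. */
    public static final class EscalateToScheduledRetry extends RuntimeException {
        public EscalateToScheduledRetry(Throwable cause) {
            super(cause);
        }
    }

    private AsyncRetrySketch() {
    }

    /** Re-invokes call when it fails with a retryable error, without blocking any thread. */
    public static <T> CompletableFuture<T> withImmediateRetry(
            Supplier<CompletableFuture<T>> call, Predicate<Throwable> retryable, int attempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Throwable cause = unwrap(error);
            if (!retryable.test(cause)) {
                result.completeExceptionally(cause); // non-retryable: fail right away
            } else if (attempts <= 1) {
                result.completeExceptionally(new EscalateToScheduledRetry(cause)); // exhausted: escalate
            } else {
                withImmediateRetry(call, retryable, attempts - 1).whenComplete((v, e) -> {
                    if (e == null) {
                        result.complete(v);
                    } else {
                        result.completeExceptionally(unwrap(e));
                    }
                });
            }
        });
        return result;
    }

    private static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null) ? error.getCause() : error;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        CompletableFuture<String> refund = AsyncRetrySketch.<String>withImmediateRetry(() -> {
            calls[0]++;
            return calls[0] < 3
                    ? CompletableFuture.<String>failedFuture(new IllegalStateException("gateway busy"))
                    : CompletableFuture.completedFuture("refunded");
        }, e -> e instanceof IllegalStateException, 3);
        System.out.println(refund.join() + " after " + calls[0] + " attempts");
    }
}
```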

@SagaEndpoint Annotation

@SagaEndpoint is the main annotation used to mark custom endpoint classes in Stacksaga Kafka worker applications. It is a meta-annotation that can be used to create custom annotations for different types of endpoints, such as query endpoints and command endpoints. The @SagaEndpoint annotation provides several attributes for configuring the behavior of the endpoint, such as the topic name suffix, the listener scope, and other settings, so you can create and configure your custom endpoints consistently across your application. The following list describes all the attributes of the @SagaEndpoint annotation and how to use them to tune the behavior of your endpoints for your specific use case.

  1. value

    • Description: Provides a unique name for the endpoint. It is optional and can be used for better readability and debugging. If it is not provided, the framework uses the class name as the endpoint name. The Spring bean is created with the name provided in this attribute.

  2. topicNameSuffix

    • Provides the suffix of the topic name for this endpoint. The framework creates the real topic name by prepending saga.do. to the suffix; for example, the suffix user-service.get-user becomes saga.do.user-service.get-user. The saga.do prefix indicates that the topic is used for primary execution. This is the topic to which the EventManager sends the command messages that trigger the execution of this endpoint. Therefore, the suffix must match the topic name used in the EventManager so that messages are sent to the correct topic and consumed by the correct endpoint.

      Table 1. Quick Reference: Topic Naming Rules

      Rule                 Detail
      Segment separator    Use . to separate hierarchical segments (e.g., order.init, user-service.get-user).
      Hyphens              Permitted within a segment (e.g., user-service, make-payment) but not as the separator itself.
      Underscores          Not permitted anywhere in the topic name.
      Prefix auto-added    The framework prepends saga.do. for primary topics and saga.undo. for compensation topics. So topicNameSuffix = "payment.make" produces the real topic saga.do.payment.make.
      Topic key            Each topic requires a unique float key within the domain. Positive integers (1, 2, 3…) for primary topics; matching negative values (-1, -2, -3…) for compensation topics. Decimal values (e.g., 1.1) are reserved for the upcoming sub-execution feature; avoid them in the current version.

      See full specification in Topic Name Specification and Topic Key Specification.

  3. listenerScope

    • Provides the listener scope for the endpoint. listenerScope defines how the message listeners are created and managed for the endpoint, and it can be either ListenerScope.SHARED or ListenerScope.ISOLATED. With ListenerScope.SHARED, the framework binds the endpoint's topic to the default shared message listener container. With ListenerScope.ISOLATED, the framework creates a dedicated message listener container for the topic. Set listenerScope to ListenerScope.ISOLATED when you want an isolated listener for a specific endpoint; this gives you better control over message consumption and processing for that endpoint without affecting the endpoints that share the default listener. listenerScope is one of the most important configurations to tune; read the ListenerScope tuning section in the best practices chapter to learn how to choose the appropriate listener scope for your endpoints based on your use case and requirements.

  4. processingMode

  5. primaryExecutionConcurrency

    • Specifies the concurrency level used for primary record processing (for doProcess()) when processingMode is configured as ProcessingMode.CONCURRENT. Multiple records may be processed concurrently using the configured number of worker threads.
      The value may be provided either as a numeric literal or as a Spring property placeholder.
      For example:
      primaryExecutionConcurrency = "10"
      primaryExecutionConcurrency = "${saga.sample1.primary.concurrency}"

      When a property placeholder is used, the value is resolved from the application configuration, for example:
      saga.sample1.primary.concurrency=10

      Default: Runtime.getRuntime().availableProcessors() * 2

  6. revertExecutionConcurrency

    • Specifies the concurrency level used for compensation record processing (for undoProcess()) when processingMode is configured as ProcessingMode.CONCURRENT. Multiple records may be processed concurrently using the configured number of worker threads.
      The value may be provided either as a numeric literal or as a Spring property placeholder.
      For example:
      revertExecutionConcurrency = "5"
      revertExecutionConcurrency = "${saga.sample1.revert.concurrency}"

      When a property placeholder is used, the value is resolved from the application configuration, for example:
      saga.sample1.revert.concurrency=5

      Default: Runtime.getRuntime().availableProcessors()

      This attribute does not apply to a QueryEndpoint, because query endpoints have no compensation processing; revertExecutionConcurrency is ignored if it is provided in the @SagaEndpoint annotation of a QueryEndpoint.

  7. primaryExecutionRetryTemplate

  8. revertExecutionRetryTemplate

  9. primaryExecutionSchedulerProvider

    • Applies only to primary executions (CommandEndpoint.doProcess(ConsumerRecord), QueryEndpoint.doProcess(ConsumerRecord)). Provides a custom AbstractPrimaryExecutionSchedulerProvider that controls the scheduler on which primary execution processing runs. This is important for isolating blocking operations from the Kafka consumer thread, which prevents performance degradation and keeps the consumer responsive. By default, the framework uses a bounded elastic scheduler suitable for blocking workloads via DefaultSchedulerConfig, but you can provide your own implementation to customize the pool size, naming, and other characteristics. Avoid providing a separate scheduler for each endpoint unless your use case requires it, because many schedulers can lead to resource contention and inefficient thread usage; in most cases, a scheduler shared by all endpoints is sufficient and more efficient. However, a shared default scheduler can lead to task starvation. See the Configure Custom AbstractSagaEndpointSchedulerProvider for Non-Reactive Endpoints section to learn how to configure a custom scheduler, or the Override the Default Scheduler Configurations section to learn how to override the default shared scheduler configurations.
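The topic naming rules listed under topicNameSuffix are mechanical enough to check before deployment. Below is a minimal plain-Java sketch that assumes only the rules in Table 1: dot-separated non-empty segments, no underscores (Kafka itself allows letters, digits, '.', '_' and '-', and the table removes '_'), the saga.do. and saga.undo. prefixes applied to the same suffix, and positive whole-number keys for primary topics mirrored by negative keys for compensation. TopicNames and its methods are hypothetical helpers, not framework API; it checks only a subset of the rules, and the authoritative rules are in the Topic Name Specification and Topic Key Specification.

```java
import java.util.regex.Pattern;

public final class TopicNames {

    // One or more '.'-separated segments of letters, digits or hyphens; no underscores, no empty segments.
    private static final Pattern SUFFIX = Pattern.compile("[A-Za-z0-9-]+(\\.[A-Za-z0-9-]+)*");

    private TopicNames() {
    }

    public static void validateSuffix(String suffix) {
        if (suffix == null || !SUFFIX.matcher(suffix).matches()) {
            throw new IllegalArgumentException("Invalid topicNameSuffix: " + suffix);
        }
    }

    /** Real topic for primary execution, e.g. saga.do.payment.make. */
    public static String primaryTopic(String suffix) {
        validateSuffix(suffix);
        return "saga.do." + suffix;
    }

    /** Real topic for compensation, assuming the same suffix with the saga.undo. prefix. */
    public static String compensationTopic(String suffix) {
        validateSuffix(suffix);
        return "saga.undo." + suffix;
    }

    /** Primary keys must be positive whole numbers (decimals are reserved); compensation mirrors them. */
    public static float compensationKey(float primaryKey) {
        if (!Float.isFinite(primaryKey) || primaryKey <= 0 || primaryKey != Math.rint(primaryKey)) {
            throw new IllegalArgumentException("Primary topic key must be a positive whole number: " + primaryKey);
        }
        return -primaryKey;
    }

    public static void main(String[] args) {
        System.out.println(primaryTopic("payment.make"));
        System.out.println(compensationTopic("payment.make"));
        System.out.println(compensationKey(2f));
        try {
            primaryTopic("payment_make");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```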