Stacksaga Kafka Client

Overview

StackSaga Kafka Client is a supporting library for the StackSaga Kafka engine that maintains the standard for Kafka saga endpoints and validates them. Because StackSaga Kafka works in a request-response model, it is not enough to trigger an event to the target service; the target service must also trigger a response event back to the orchestrator service. In other words, the target service acts as both a publisher and a subscriber. As a developer you don’t need to worry about triggering the response event: that responsibility is taken over by the StackSaga Kafka Client and handled behind the scenes.

The StackSaga Kafka Client takes care of the following duties:

  • Providing the specifications for creating Kafka saga endpoints in the target service according to the StackSaga Kafka engine.

  • Sending the response back to the orchestrator service after each invocation.

  • Providing immediate retry facilities in case of retryable exceptions.

The stacksaga-kafka-client dependency can be added to your target service as follows.

<dependency>
  <groupId>org.stacksaga</groupId>
  <artifactId>stacksaga-kafka-client</artifactId>
  <version>${org.stacksaga.version}</version>
</dependency>

As you can see, you have to add the stacksaga-kafka-client library to every target service that you want to access while processing the transaction.

If a service acts as an orchestrator service by adding stacksaga-kafka-starter, the stacksaga-kafka-client is already included by default.
Figure: StackSaga engine vs StackSaga Kafka client

Stacksaga Kafka Endpoints

StackSaga Kafka Endpoints are topics created according to the StackSaga engine’s conventions. As per the architecture, three types of endpoints can be created, as follows.

Query Endpoint

If an execution (atomic execution) does not make any state change, it is executed in a Query Endpoint. Because it does not change the database state, it has no compensation action to undo.
These are the topics defined in SagaTopic with the type SagaTopicType.QUERY_DO_ACTION.

  • Each Query Endpoint creates one topic in Kafka.

    • DO_USER_VALIDATE (primary execution topic)

    • The topic name must start with DO_

Figure: Query Endpoint in the StackSaga Kafka engine

As shown in the diagram above, a Query Endpoint can be implemented in the modes described below.

Query Endpoints — Keep‑Ordering Mode with In‑memory retries

This example shows how a custom Query-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) and how retrying is done in In‑memory retries mode.

@Endpoint(eventName = "USER_VALIDATE") (1)
public class UserValidateEndpoint extends QueryEndpoint { (2)
    @Override
    @SagaEndpointListener(value = "DO_USER_VALIDATE", id = "DO_USER_VALIDATE") (3)
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException{
        try {
            //implement your execution (4)
        } catch (Exception e) {
            //throws RetryableException Or NonRetryableException based on the exception; (5)
        }
    }
    @Override (6)
    public void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when Query execution retrying exhausted.
    }
}
1 Annotate your custom QueryEndpoint class with @Endpoint and set the eventName.
2 Extend your custom class from the QueryEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord).
3 Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
4 Implement your real execution here (e.g., database access).
5 Catch the exception carefully. The error that you throw determines whether the execution is retried or processing stops.
As you can see in the method signature, you can throw RetryableException or NonRetryableException based on the exception you have.
Classifying the exception is one of the most important parts in StackSaga. Unhandled RuntimeExceptions are treated as NonRetryableException. In that case the engine stops executing, sends a message to the orchestrator service indicating that a primary execution exception occurred, and the compensation process is then started.
6 If you want to execute something when the execution has failed even after retrying (retrying exhausted), you can override the onDoProcessExhausted(consumerRecord) method and use it as a hook.

Query Endpoints — Keep‑Ordering Mode with Topic‑based/Non-Blocking retries

This example shows how a custom Query-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) and how retrying is done in Topic‑based/Non-Blocking retries mode.

@Endpoint(eventName = "USER_VALIDATE") (1)
public class UserValidateEndpoint extends QueryEndpoint { (2)
    @Override
    @SagaEndpointListener(value = "DO_USER_VALIDATE", id= "DO_USER_VALIDATE") (3)
    @SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5)) (7)
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException{
        try {
            //implement your execution (4)
        } catch (Exception e) {
            //throws RetryableException Or NonRetryableException based on the exception; (5)
        }
    }
    @Override (6)
    public void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when Query execution retrying exhausted.
    }
}
1 Annotate your custom QueryEndpoint class with @Endpoint and set the eventName.
2 Extend your custom class from the QueryEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord).
3 Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
4 Implement your real execution here (e.g., database access).
5 Catch the exception carefully. The error that you throw determines whether the execution is retried or processing stops.
As you can see in the method signature, you can throw RetryableException or NonRetryableException based on the exception you have.
Classifying the exception is one of the most important parts in StackSaga. Unhandled RuntimeExceptions are treated as NonRetryableException. In that case the engine stops executing, sends a message to the orchestrator service indicating that a primary execution exception occurred, and the compensation process is then started.
6 If you want to execute something when the execution has failed even after retrying (retrying exhausted), you can override the onDoProcessExhausted(consumerRecord) method and use it as a hook.
7 The @SagaRetryableEndpoint annotation is used to enable Topic‑based/Non-Blocking retries mode. It avoids in-memory retries and keeps the consumer thread non-blocking while retrying. Read more…

Query Endpoints — Async/Parallel Mode with RetryTemplate in-memory retries

This example shows how a custom Query-Endpoint is implemented in Async/Parallel Mode (Order‑Free) and how retrying is done with RetryTemplate in-memory retries.

@Endpoint(eventName = "USER_VALIDATE") (1)
public class UserValidateEndpoint extends QueryEndpoint { (2)
    @Override
    @SagaEndpointListener(value = "DO_USER_VALIDATE", id = "DO_USER_VALIDATE") (3)
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        this.doProcessAsync(consumerRecord); (4)
    }

    @Override (5)
    protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
        try {
            //implement your execution (6)
        } catch (Exception e) {
            //throws RetryableException Or NonRetryableException based on the exception; (7)
        }
    }

    @Override (8)
    public void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when Query execution retrying exhausted.
    }
}
1 Annotate your custom QueryEndpoint class with @Endpoint and set the eventName.
2 Extend your custom class from the QueryEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord).
3 Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
4 Call the doProcessAsync(consumerRecord) method, passing the received consumerRecord. It hands the execution off to a separate thread pool, configures the retry capabilities internally, and sends the response back to the orchestrator service after execution (success or failure). It invokes doProcessAsyncInAction() to run your real code.
5 Override the doProcessAsyncInAction() method; it is invoked internally by doProcessAsync(), and this is where you should write your exact business logic.
The method is called on a different thread, taken from the thread pool.
6 Implement your real execution here (e.g., database access).
7 Catch the exception carefully. The error that you throw determines whether the execution is retried or processing stops.
As you can see in the method signature, you can throw RetryableException or NonRetryableException based on the exception you have.

Classifying the exception is one of the most important parts in StackSaga. Unhandled RuntimeExceptions are treated as NonRetryableException. In that case the engine stops executing, sends a message to the orchestrator service indicating that a primary execution exception occurred, and the compensation process is then started.
8 If you want to execute something when the execution has failed even after retrying (retrying exhausted), you can override the onDoProcessExhausted(consumerRecord) method and use it as a hook.

Command Endpoint

If an execution (atomic execution) makes a state change in the database of the respective service, it is executed in a Command Endpoint. Because it changes the database state of the respective service, in case of failure the changes must be restored by invoking the compensation action. These are the topics defined in SagaTopic with the type SagaTopicType.COMMAND_DO_ACTION.

  • Each Command Endpoint creates two topics in Kafka: one for the primary execution and one for the compensation execution.

    • DO_MAKE_PAYMENT (primary execution topic)

      • The topic name must start with DO_

    • UNDO_MAKE_PAYMENT (compensating execution topic)

      • The topic name must start with UNDO_

Figure: Command Endpoint in the StackSaga Kafka engine

Command-Endpoints — Keep‑Ordering Mode with In‑memory retries

This example shows how a custom Command-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) and how retrying is done in In‑memory retries mode.

@Endpoint(eventName = "MAKE_PAYMENT") (1)
public class MakePaymentEndpoint extends CommandEndpoint { (2)

    @Override
    @SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT") (3)
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {

        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (4)
            log.debug("idempotencyKey for UPDATE_STOCK_LOG: {}", idempotencyKey);
            final ObjectNode aggregatorForUpdate = consumerRecord.value().getAggregatorForUpdate(); //accessing the current aggregator state (5)
            double amount = aggregatorForUpdate.get("amount").asDouble();
            if (amount == 0) {
                throw NonRetryableException.buildWith(new IllegalStateException("amount must be greater than 0")); (6)
            }
            aggregatorForUpdate.put("payment_status", "SUCCESS"); //updating the aggregator state (7)
        } catch (Exception e) { (8)
            boolean retryable = false; // placeholder: decide from the caught exception whether it is transient/retryable
            if (retryable) {
                throw RetryableException.buildWith(e);
            } else {
                throw NonRetryableException.buildWith(e);
            }
        }
    }

    @Override (9)
    protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when primary Command execution retrying exhausted.
    }


    @Override
    @SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT") (10)
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        try {
            final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state (11)
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (12)
            log.debug("idempotencyKey for UNDO_MAKE_PAYMENT: {}", idempotencyKey);
            final double amount = aggregator.get("amount").asDouble();
            final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception (13)
            log.debug("amount is going to be deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());

            consumerRecord.value().getHintStore().ifPresent(historyStore -> {
                historyStore.put("payment_status_revert", "SUCCESS"); //updating the historyStore (14)
            });
        } catch (Exception e) {
            throw RetryableException.buildWith(e); (15)
        }
    }

    @Override (16)
    protected void onUndoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when undo(revert/compensation) execution retrying exhausted.
    }
}
1 Annotate your custom CommandEndpoint class with @Endpoint and set the eventName.
2 Extend your custom class from the CommandEndpoint abstract class. Then you have to override the two required abstract methods doProcess(consumerRecord) and undoProcess(consumerRecord).
3 Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
4 Accessing the IdempotencyKey for the respective event. The key is set by the StackSaga engine from the orchestrator service; see the sketch after this list and read more about maintaining idempotency.
5 Accessing the current aggregator state. You can get the current aggregator state from the SagaPayload object and also update it according to your business logic.
6 You can throw a NonRetryableException if you want to stop the transaction from going forward. The orchestrator service receives an error response and starts the compensation process.
7 Updating the aggregator state.
8 Catch the exception carefully. The error that you throw determines whether the execution is retried or processing stops.
As you can see in the method signature, you can throw RetryableException or NonRetryableException based on the exception you have.
Classifying the exception is one of the most important parts in StackSaga. Unhandled RuntimeExceptions are treated as NonRetryableException. In that case the engine stops executing, sends a message to the orchestrator service indicating that a primary execution exception occurred, and the compensation process is then started.
9 If you want to execute something when the primary execution has failed even after retrying (retrying exhausted), you can override the onDoProcessExhausted(consumerRecord) method and use it as a hook.
10 Annotate undoProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
11 Accessing the last aggregator state (the state as it was before the primary exception occurred) to read the aggregator data.
12 Accessing the IdempotencyKey for the respective event. The key is set by the StackSaga engine from the orchestrator service. Read more about maintaining idempotency.
13 Accessing the primary execution exception.
14 Updating the HistoryStore to set data during the compensation process.
15 Throwing an exception. The compensation process can only throw RetryableException; it cannot throw a NonRetryableException or a plain RuntimeException. If any exception other than RetryableException is thrown, the compensation process stops and the transaction is terminated.
16 If you want to execute something when the revert execution has failed even after retrying (retrying exhausted), you can override the onUndoProcessExhausted(consumerRecord) method and use it as a hook.
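
The following is a minimal, hypothetical sketch of how the IdempotencyKey could be used inside doProcess(consumerRecord) to keep the execution idempotent on re-delivery; paymentRepository and its methods are placeholders and not part of the StackSaga API.

String idempotencyKey = consumerRecord.value().getIdempotencyKey();
// hypothetical repository lookup: skip the work if this key was already processed
if (paymentRepository.existsByIdempotencyKey(idempotencyKey)) {
    return; // the event was re-delivered; the state change has already been applied
}
// ... apply the state change ...
// store the key together with the state change, ideally in the same local transaction
paymentRepository.markProcessed(idempotencyKey);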

Command-Endpoints — Keep‑Ordering Mode with Topic‑based/Non-Blocking retries

This example shows how a custom Command-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) and how retrying is done in Topic‑based/Non-Blocking retries mode.

@Endpoint(eventName = "MAKE_PAYMENT") (1)
public class MakePaymentEndpoint extends CommandEndpoint { (2)

    @Override
    @SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT") (3)
    @SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5)) (17)
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {

        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (4)
            log.debug("idempotencyKey for DO_MAKE_PAYMENT: {}", idempotencyKey);

            final ObjectNode aggregatorForUpdate = consumerRecord.value().getAggregatorForUpdate(); //accessing the current aggregator state (5)
            double amount = aggregatorForUpdate.get("amount").asDouble();
            if (amount == 0) {
                throw NonRetryableException.buildWith(new IllegalStateException("amount must be greater than 0")); (6)
            }
            aggregatorForUpdate.put("payment_status", "SUCCESS"); //updating the aggregator state (7)
        } catch (Exception e) { (8)
            boolean retryable = false; // placeholder: decide from the caught exception whether it is transient/retryable
            if (retryable) {
                throw RetryableException.buildWith(e);
            } else {
                throw NonRetryableException.buildWith(e);
            }
        }
    }

    @Override (9)
    protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when primary Command execution retrying exhausted.
    }


    @Override
    @SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT") (10)
    @SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5)) (17)
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (11)
            log.debug("idempotencyKey for UNDO_MAKE_PAYMENT: {}", idempotencyKey);

            final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state (12)

            final double amount = aggregator.get("amount").asDouble();
            final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception (13)
            log.debug("amount is going to be deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());

            consumerRecord.value().getHintStore().ifPresent(historyStore -> {
                historyStore.put("payment_status_revert", "SUCCESS"); //updating the historyStore (14)
            });
        } catch (Exception e) {
            throw RetryableException.buildWith(e); (15)
        }
    }

    @Override (16)
    protected void onUndoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when undo(revert/compensation) execution retrying exhausted.
    }
}
1 Annotate your custom CommandEndpoint class with @Endpoint and set the eventName.
2 Extend your custom class from the CommandEndpoint abstract class. Then you have to override the two required abstract methods doProcess(consumerRecord) and undoProcess(consumerRecord).
3 Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
4 Accessing the IdempotencyKey for the respective event. The key is set by the StackSaga engine from the orchestrator service. Read more about maintaining idempotency.
5 Accessing the current aggregator state. You can get the current aggregator state from the SagaPayload object and also update it according to your business logic.
6 You can throw a NonRetryableException if you want to stop the transaction from going forward. The orchestrator service receives an error response and starts the compensation process.
7 Updating the aggregator state.
8 Catch the exception carefully. The error that you throw determines whether the execution is retried or processing stops.
As you can see in the method signature, you can throw RetryableException or NonRetryableException based on the exception you have.
Classifying the exception is one of the most important parts in StackSaga. Unhandled RuntimeExceptions are treated as NonRetryableException. In that case the engine stops executing, sends a message to the orchestrator service indicating that a primary execution exception occurred, and the compensation process is then started.
9 If you want to execute something when the primary execution has failed even after retrying (retrying exhausted), you can override the onDoProcessExhausted(consumerRecord) method and use it as a hook.
10 Annotate undoProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
11 Accessing the IdempotencyKey for the respective event. The key is set by the StackSaga engine from the orchestrator service. Read more about maintaining idempotency.
12 Accessing the last aggregator state (the state as it was before the primary exception occurred) to read the aggregator data.
13 Accessing the primary execution exception.
14 Updating the HistoryStore to set data during the compensation process.
15 Throwing an exception. The compensation process can only throw RetryableException; it cannot throw a NonRetryableException or a plain RuntimeException. If any exception other than RetryableException is thrown, the compensation process stops and the transaction is terminated.
16 If you want to execute something when the revert execution has failed even after retrying (retrying exhausted), you can override the onUndoProcessExhausted(consumerRecord) method and use it as a hook.
17 The @SagaRetryableEndpoint annotation is used on both the doProcess() and undoProcess() methods to enable Topic‑based/Non-Blocking retries mode. It avoids in-memory retries and keeps the consumer thread non-blocking while retrying. Read more…

Command-Endpoints — Async/Parallel Mode with RetryTemplate in-memory retries

This example shows how a custom Command-Endpoint is implemented in Async/Parallel Mode (Order‑Free) and how retrying is done with RetryTemplate in-memory retries.

@Endpoint(eventName = "MAKE_PAYMENT")
public class MakePaymentEndpoint extends CommandEndpoint {

    private static final Logger log = LoggerFactory.getLogger(MakePaymentEndpoint.class);

    @Override
    @SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT")
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
        this.doProcessAsync(consumerRecord);
    }

    @Override
    protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
            log.debug("idempotencyKey for DO_MAKE_PAYMENT: {}", idempotencyKey);

            final ObjectNode aggregatorForUpdate = consumerRecord.value().getAggregatorForUpdate(); //accessing the current aggregator state
            double amount = aggregatorForUpdate.get("amount").asDouble();
            if (amount == 0) {
                throw NonRetryableException.buildWith(new IllegalStateException("amount must be greater than 0"));
            }
            aggregatorForUpdate.put("payment_status", "SUCCESS"); //updating the aggregator state
        } catch (Exception e) {
            boolean retryable = false; // placeholder: decide from the caught exception whether it is transient/retryable
            if (retryable) {
                throw RetryableException.buildWith(e);
            } else {
                throw NonRetryableException.buildWith(e);
            }
        }
    }

    @Override
    protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when primary Command execution retrying exhausted.
    }

    @Override
    @SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT")
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        this.undoProcessAsync(consumerRecord);
    }

    @Override
    protected void undoProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
            log.debug("idempotencyKey for UNDO_MAKE_PAYMENT: {}", idempotencyKey);

            final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state
            final double amount = aggregator.get("amount").asDouble();
            final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception
            log.debug("amount is going to be deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());

            consumerRecord.value().getHintStore().ifPresent(historyStore -> {
                historyStore.put("payment_status_revert", "SUCCESS"); //updating the historyStore
            });
        } catch (Exception e) {
            throw RetryableException.buildWith(e);
        }
    }

    @Override
    protected void onUndoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when undo(revert/compensation) execution retrying exhausted.
    }
}

Revert Endpoint

In Command Endpoints there is a compensation execution for reverting. If you want to run additional executions before or after the compensation execution is done, you can create Revert Endpoints for them.

  • Each Revert Endpoint creates one topic in Kafka.

    • REVERT_MAKE_PAYMENT_LOG (sub compensating execution topic)

Figure: Revert Endpoints in the StackSaga Kafka engine

Revert-Endpoints — Keep‑Ordering Mode with In‑memory retries

This example shows how a custom Revert-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) and how retrying is done in In‑memory retries mode.

@Endpoint(eventName = "UPDATE_STOCK_LOG") (1)
public class UpdateStockLogEndpoint extends RevertEndpoint { (2)

    @Override
    @SagaEndpointListener(value = "UPDATE_STOCK_LOG", id = "UPDATE_STOCK_LOG") (3)
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (4)
            log.debug("idempotencyKey for UPDATE_STOCK_LOG: {}", idempotencyKey);

            final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state for reading (5)
            final double amount = aggregator.get("amount").asDouble();
            final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception (6)
            log.debug("amount has been deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());

            consumerRecord.value().getHintStore().ifPresent(historyStore -> {
                historyStore.put("update_stock_log_status", "SUCCESS"); //updating the historyStore (7)
            });
        } catch (Exception e) {
            throw RetryableException.buildWith(e); (8)
        }
    }

    @Override
    protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) { (9)
        //do something when extra-revert execution retrying exhausted.
    }
}
1 Annotate your custom RevertEndpoint class with @Endpoint and set the eventName.
2 Extend your custom class from the RevertEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord).
3 Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the StackSaga topic name specifications.
4 Accessing the IdempotencyKey for the respective event. The key is set by the StackSaga engine from the orchestrator service. Read more about maintaining idempotency.
5 Accessing the last aggregator state (read-only) from the SagaPayload object.
6 Accessing the primary execution exception.
7 Updating the HistoryStore to set data during the compensation process.
8 Throwing an exception. The compensation process can only throw RetryableException; it cannot throw a NonRetryableException or a plain RuntimeException. If any exception other than RetryableException is thrown, the compensation process stops and the transaction is terminated.
9 If you want to execute something when the execution has failed even after retrying (retrying exhausted), you can override the onDoProcessExhausted(consumerRecord) method and use it as a hook.

Revert-Endpoints — Keep‑Ordering Mode with Topic‑based/Non-Blocking retries

This example shows how a custom Revert-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) and how retrying is done in Topic‑based/Non-Blocking retries mode.

@Endpoint(eventName = "UPDATE_STOCK_LOG")
public class UpdateStockLogEndpoint extends RevertEndpoint {

    @Override
    @SagaEndpointListener(value = "UPDATE_STOCK_LOG", id = "UPDATE_STOCK_LOG")
    @SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5)) (1)
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
            log.debug("idempotencyKey for UPDATE_STOCK_LOG: {}", idempotencyKey);

            final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state for reading
            final double amount = aggregator.get("amount").asDouble();
            final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception
            log.debug("amount has been deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());

            consumerRecord.value().getHintStore().ifPresent(historyStore -> {
                historyStore.put("update_stock_log_status", "SUCCESS"); //updating the historyStore
            });
        } catch (Exception e) {
            throw RetryableException.buildWith(e);
        }
    }

    @Override
    protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when extra-revert execution retrying exhausted.
    }
}

The implementation is pretty much the same as Revert-Endpoints — Keep‑Ordering Mode with In‑memory retries. Additionally,

1 The @SagaRetryableEndpoint annotation is used on the doProcess() method to enable Topic‑based/Non-Blocking retries mode. It avoids in-memory retries and keeps the consumer thread non-blocking while retrying. Read more…

Revert-Endpoints — Async/Parallel Mode with RetryTemplate in-memory retries

This example shows how a custom Revert-Endpoint is implemented in Async/Parallel Mode (Order‑Free) and how retrying is done with RetryTemplate in-memory retries.

@Endpoint(eventName = "UPDATE_STOCK_LOG")
public class UpdateStockLogEndpoint extends RevertEndpoint {

    @Override
    @SagaEndpointListener(value = "UPDATE_STOCK_LOG", id = "UPDATE_STOCK_LOG")
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        this.doProcessAsync(consumerRecord);
    }

    @Override
    protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        try {
            String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
            log.debug("idempotencyKey for UPDATE_STOCK_LOG: {}", idempotencyKey);

            final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state for reading
            final double amount = aggregator.get("amount").asDouble();
            final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception
            log.debug("amount has been deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());

            consumerRecord.value().getHintStore().ifPresent(historyStore -> {
                historyStore.put("update_stock_log_status", "SUCCESS"); //updating the historyStore
            });
        } catch (Exception e) {
            throw RetryableException.buildWith(e);
        }
    }

    @Override
    protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //do something when extra-revert execution retrying exhausted.
    }
}

The implementation is pretty much the same as Revert-Endpoints — Keep‑Ordering Mode with In‑memory retries. But,

The business logic has been moved to the overridden doProcessAsyncInAction() method instead of the doProcess() method, and this.doProcessAsync(consumerRecord) is called inside doProcess(). The execution flow is as follows.

  | [consumer-thread] |    | [consumer-thread] |    |   [saga-async-thread]     |
>>|                   | -> |                   | -> |                           |
  |   doProcess()     |    | doProcessAsync()  |    | doProcessAsyncInAction()  |

@Endpoint Annotation

The @Endpoint annotation is used for annotating custom saga endpoint classes, and it is inherited from Spring’s @Component. The annotation has two parameters, as follows:

  • value: the name of the bean in Spring. It is not required to be provided.

  • eventName: The name of the event action.
    For instance, if we create an endpoint for making the payment, the eventName would be MAKE_PAYMENT.

Topic name specifications
Even though the eventName can be any name, it is related to the real endpoint’s topic name in Kafka.
For instance, if we set the eventName to MAKE_PAYMENT for our Command Endpoint, the primary execution’s topic name must be DO_MAKE_PAYMENT and the revert (compensation) execution’s topic name must be UNDO_MAKE_PAYMENT. This is validated by the framework when the application starts, and if they do not match, an exception is thrown.

@SagaEndpointListener Annotation

@SagaEndpointListener is a StackSaga annotation used to designate a method as a Kafka message listener following the StackSaga framework’s conventions and style. It is an inherited and optimized version of Spring’s @KafkaListener, tailored specifically to suit StackSaga’s requirements.
Most of the parameters can be configured as usual, just as with @KafkaListener, but some required ones, such as containerFactory and groupId, are configured internally by the framework.

Concurrency and TopicPartitions related configurations can be done as you prefer, in the same way as in @KafkaListener; see the sketch below.
@SagaEndpointListener does not support the batch option available in @KafkaListener. As an alternative that suits StackSaga’s requirements, @SagaEndpointListener supports Async/Parallel Mode (Order‑Free).
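
Below is a sketch of such a configuration, assuming the concurrency attribute behaves exactly as it does on @KafkaListener; the topic and endpoint are taken from the earlier examples.

@Override
@SagaEndpointListener(
        value = "DO_MAKE_PAYMENT",
        id = "DO_MAKE_PAYMENT",
        concurrency = "3" // assumption: same semantics as @KafkaListener's concurrency
)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
        throws RetryableException, NonRetryableException {
    // ...
}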

@SagaRetryableEndpoint

Event Listening approaches in Kafka Client

In Stacksaga Kafka client, there are two primary strategies for handling Kafka messages:

Keep‑Ordering Mode (Synchronous)

  • Description:

    • Messages from a partition are processed one by one in the order Kafka delivers them.

    • The consumer thread processes the record and only after successful completion acknowledges the offset.

  • Characteristics:

    • ✅ Strict ordering is guaranteed per partition.

    • ✅ Failures can trigger retry or pause/resume logic without skipping messages.

    • ❌ If processing is slow, the partition is blocked — no further messages will be processed from that partition until the current one completes.

  • Typical Flow:

    poll -> process (same thread) -> ack -> next record
  • Retrying:
    Retrying can be done in two ways in Keep‑Ordering Mode, as follows.

Retrying In Keep‑Ordering Mode (Synchronous)

In‑memory retries

In‑memory retries is a retry strategy in Spring Kafka where failed message processing is automatically re-attempted, with an exponential backoff delay between retries, with the help of DefaultErrorHandler and ExponentialBackOffWithMaxRetries.
When the listener throws an exception, the consumer seeks back to the same offset and re-fetches the record from Kafka. The same consumer thread retries the processing after a backoff interval that grows exponentially (e.g., 1s → 2s → 4s → 8s) until either:
✅ The message is successfully processed, or
❌ The maximum retry attempts are reached, at which point the message is ignored rather than being delegated to a Dead Letter Topic (DLT). This approach preserves message ordering per partition, prevents tight retry loops, and provides a progressive delay mechanism that avoids overwhelming downstream systems while still handling transient failures gracefully.
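
To illustrate the Spring Kafka mechanism described above, here is a minimal, self-contained sketch of such an error handler. The StackSaga Kafka client configures this internally, so the values below are only illustrative assumptions.

import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOffWithMaxRetries;

class InMemoryRetrySketch {

    DefaultErrorHandler inMemoryRetryErrorHandler() {
        // exponential backoff: 1s -> 2s -> 4s -> 8s, with at most 4 retry attempts
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(4);
        backOff.setInitialInterval(1_000L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(8_000L);
        // on a listener exception, the consumer seeks back to the failed offset and
        // retries the record on the same thread after the backoff delay;
        // the handler is normally registered via factory.setCommonErrorHandler(...)
        return new DefaultErrorHandler(backOff);
    }
}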

  • ✅ Pros

    • Simple to set up (just configure DefaultErrorHandler)

    • Doesn’t require extra Kafka topics

    • Good for quick, transient errors (e.g. database hiccup)

  • ❌ Cons

    • Blocks the partition until retries are done. See more…

    • If the consumer restarts midway, you “lose” the retry delay and it starts over

    • All retrying is synchronous → one thread is tied up

Partition-Level Blocking During Retries

When a message fails and the DefaultErrorHandler with ExponentialBackOffWithMaxRetries is applied, the consumer seeks back to the same offset and retries the message on the same thread.
Because Kafka enforces strict ordering within a partition, no subsequent messages from that partition will be processed until the failing message is either successfully handled or exhausts all retry attempts (after which it may be sent to a Dead Letter Topic or discarded).
This behavior ensures ordering guarantees are never violated, but it also means that messages queued behind the failing record on that partition will wait.
Messages on other partitions are not affected — if the listener container is configured with multiple concurrent consumer threads, those other partitions continue processing normally while retries occur on the blocked partition.

Topic‑based/Non-Blocking retries

Topic‑based/Non-Blocking retries can be implemented with the @SagaRetryableEndpoint annotation. It is an inherited version of Spring’s @RetryableTopic, tailored specifically to suit StackSaga’s requirements.

  • How it works?

    • When a listener fails:

      • The failed record is published to a new Kafka topic (e.g., orders-saga-retry-1)

      • That retry topic has its own backoff delay (controlled by consumer pause or delayed scheduling)

      • After the delay, the record is consumed from the retry topic and processed again

      • If it fails again, it may move to another retry topic (e.g., orders-saga-retry-2)

    • After the final retry (exhausted) → the response is sent to the root topic (the orchestrator service’s main topic for the aggregator).

As mentioned above, delegating retry-exhausted messages to a Dead Letter Topic (DLT) is not supported, even though it is a common approach with Spring’s @KafkaListener. Instead, the response is sent to the orchestrator service’s root topic for re-scheduling.
  • Key behavior

    • Retries happen asynchronously via Kafka infrastructure.

    • The main partition is not blocked — new messages keep flowing.

    • You get better durability: retries survive restarts, because retry messages live in Kafka topics.

    • ✅ Pros

      • Doesn’t block the original topic partition

      • Survives application restarts (retries live in Kafka)

      • Ideal for longer backoff or when you don’t want to tie up threads

    • ❌ Cons

      • More complex — Spring Kafka creates extra retry topics

      • More Kafka storage overhead (messages copied to retry topics)

      • Slightly higher latency (messages hop between topics)

Async/Parallel Mode (Order‑Free)

  • Description:

    • The consumer thread hands each polled record off to a thread pool and acknowledges the offset immediately, so records are processed in parallel on separate threads while polling continues.

  • Characteristics:

    • ✅ Very fast – consumer thread keeps polling new records without waiting.

    • ✅ Parallel processing – multiple messages can be processed at the same time.

    • ❌ No ordering guarantee – messages may finish out of order.

    • ❌ If async processing fails, Kafka won’t retry because the offset has already been committed (requires a custom retry/error handler).

  • Typical Flow:

    poll ->  hand off to thread pool -> ack -> consumer continues polling

Configure ThreadPoolTaskExecutor in Parallel Mode

As mentioned above, in Async/Parallel Mode (Order‑Free) a Spring ThreadPoolTaskExecutor is used as the asynchronous processing pool. You can provide or customize the ThreadPoolTaskExecutor for each endpoint as follows,

provide ThreadPoolTaskExecutor via the endpoint class (Bean)

If you want to provide a separate ThreadPoolTaskExecutor for the endpoint instead of using the default one, you can configure the pool by overriding the corresponding customize+actionName+ThreadPoolTaskExecutor method.

  • Query Endpoint or Revert Endpoint

    • customizeDoProcessThreadPoolTaskExecutor() —  customize doProcess topic’s pool.

  • Command Endpoint

    • customizeDoProcessThreadPoolTaskExecutor() —  customize doProcess topic’s pool.

    • customizeUndoProcessThreadPoolTaskExecutor() —  customize undoProcess topic’s pool.

All of the above customize* methods are called only once, when the bean is initialized.

See the example below,

@Endpoint(eventName = "MAKE_PAYMENT")
public class MakePaymentEndpoint extends CommandEndpoint {

    @Override
    @SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT")
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
        this.doProcessAsync(consumerRecord);
    }

    @Override
    protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
        //TODO: implement the business logic
    }

    @Override
    protected ThreadPoolTaskExecutor customizeDoProcessThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);           // number of core threads (always alive)
        executor.setMaxPoolSize(10);           // max threads when queue is full
        executor.setQueueCapacity(50);         // tasks that can wait before new thread is created
        executor.setKeepAliveSeconds(60);      // idle thread timeout (for threads above core)
        executor.setThreadNamePrefix("custom-do-pool-"); // helps debugging
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); (1)
        // if queue is full, runs task in the caller thread instead of discarding
        executor.initialize();  // MUST call to apply settings
        return executor;
    }


    @Override
    @SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT")
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        this.undoProcessAsync(consumerRecord);
    }

    @Override
    protected void undoProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
        //TODO: implement the business logic
    }

    @Override
    protected ThreadPoolTaskExecutor customizeUndoProcessThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);           // number of core threads (always alive)
        executor.setMaxPoolSize(10);           // max threads when queue is full
        executor.setQueueCapacity(50);         // tasks that can wait before new thread is created
        executor.setKeepAliveSeconds(60);      // idle thread timeout (for threads above core)
        executor.setThreadNamePrefix("custom-undo-pool-"); // helps debugging
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); (1)
        // if queue is full, runs task in the caller thread instead of discarding
        executor.initialize();  // MUST call to apply settings
        return executor;
    }
}
1 You can manage backpressure naturally by setting CallerRunsPolicy on the pool. Otherwise, if you use a different RejectedExecutionHandler, you can manage backpressure using pause() / resume() polling. Read more in Managing Backpressure in Parallel Mode.

Managing Backpressure in Parallel Mode

What is Backpressure?

Backpressure is the mechanism that prevents a fast producer (Kafka) from overwhelming a slower consumer (our application). In streaming and messaging systems, producers can deliver messages at very high rates, but consumers often process them at varying speeds due to CPU, memory, or downstream system constraints.

Without backpressure, the system risks:

  • Queue overload – unbounded growth of messages waiting to be processed.

  • Out-of-memory errors – if queues or caches keep accumulating messages.

  • System instability – spikes of load leading to crashes or degraded performance.

Why Backpressure Matters in Kafka Consumers

Kafka is designed for high throughput and can deliver thousands of messages per second. But in real-world applications:

  • Processing involves I/O calls (DB writes, API calls).

  • Consumers often run on thread pools with finite capacity.

  • Workloads may have traffic spikes (e.g., flash sales, seasonal demand).

👉 If we just let Kafka keep polling endlessly, the consumer’s thread pool queue fills up, memory usage increases, and tasks start failing.

Managing backpressure in Parallel Mode is crucial because the consumer thread(s) are not blocked by default; the executions are handed off to a custom thread pool. It can be done in two ways,

  1. Natural back-off
    Configure the executor pool with CallerRunsPolicy. It slows down the consumer naturally by forcing the caller thread to run tasks, which indirectly throttles Kafka polling.

    This is the default backpressure mechanism if you are using the default pool called stacksagaKafkaAsyncTaskExecutor. Even if you customize the pool, you can keep this behavior by setting the rejectedExecutionHandler to CallerRunsPolicy.
    Check it out here.
  2. pause() / resume()
    Temporarily stop polling messages from Kafka when the system is under pressure, and resume when it is ready to handle more. If your custom ThreadPoolTaskExecutor does not use CallerRunsPolicy as its RejectedExecutionHandler, tasks will be rejected once the pool is full. To prevent task loss and system overload, the framework internally pauses message polling from the respective topic when the executor queue reaches its capacity. Polling is resumed only when the queue has dropped to a safe threshold — by default, when the QueueSize falls below 50% of the configured QueueCapacity.

    If you want to customize the threshold or the resume logic, you can override the relevant isReadyToResume method (isUndoProcessReadyToResume(), isDoProcessReadyToResume()) and decide whether resuming should start by returning a boolean value. While the container is in paused status, the method is called after each record execution until polling resumes. You can use the following utility methods provided by the endpoint to evaluate (using the pool’s metrics) whether it should be resumed or not; a sketch follows after this list.

    • getConfiguredDoProcessThreadPoolTaskExecutor() or getConfiguredUndoProcessThreadPoolTaskExecutor() — Returns the configured thread pool details (the default one or the customized one).

      Or, if you resume polling in your own way, for example via scheduling, you can use the built-in methods mentioned below,

    • resumeDoProcessContainer() — Resumes message polling from the doProcess topic.

    • resumeUndoProcessContainer() — Resumes message polling from the undoProcess topic.

    • isDoProcessPauseRequested() — Checks whether a polling pause has been requested for the doProcess topic.

    • isUndoProcessPauseRequested() — Checks whether a polling pause has been requested for the undoProcess topic.

    • pauseDoProcessContainer() — Pauses message polling from the doProcess topic.

    • pauseUndoProcessContainer() — Pauses message polling from the undoProcess topic.
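
The sketch below shows one way such an override could look, assuming a no-argument method that returns a boolean (as described above) and using only standard executor metrics; the 25% threshold is an arbitrary example.

@Override
protected boolean isDoProcessReadyToResume() {
    // resume polling once the queue has drained below 25% of its total capacity (example threshold)
    ThreadPoolTaskExecutor executor = getConfiguredDoProcessThreadPoolTaskExecutor();
    java.util.concurrent.BlockingQueue<Runnable> queue = executor.getThreadPoolExecutor().getQueue();
    int capacity = queue.size() + queue.remainingCapacity(); // total configured capacity
    return queue.size() < capacity / 4;
}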

Configuring RetryTemplate in Parallel Mode

As mentioned above, in Async/Parallel Mode (Order‑Free) a Spring RetryTemplate is used for retrying. You can provide or customize the RetryTemplate for each endpoint as follows,

provide RetryTemplate via the endpoint class (Bean)

If you are using Parallel Mode, a RetryTemplate is used internally for handling retrying. By default it uses sagaAsyncRetryTemplate, which is provided by StackSaga. Otherwise, you can change the configuration properties of the default template, or you can provide a separate RetryTemplate for each endpoint by overriding the following methods.

  • Query Endpoint or Revert Endpoint

    • customizeDoProcessRetryTemplate() —  customize the doProcess topic’s RetryTemplate.

  • Command Endpoint

    • customizeDoProcessRetryTemplate() —  customize the doProcess topic’s RetryTemplate.

    • customizeUndoProcessRetryTemplate() —  customize the undoProcess topic’s RetryTemplate.

All of the above customize* methods are called only once, when the bean is initialized.

See the example below,

@Endpoint(eventName = "USER_VALIDATE")
public class UserValidateEndpoint extends QueryEndpoint {
    @Override
    @SagaEndpointListener(value = "DO_USER_VALIDATE",id = "DO_USER_VALIDATE")
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
        this.doProcessAsync(consumerRecord);
    }

    @Override
    protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
        //...
    }

    @Override
    protected RetryTemplate customizeDoProcessRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // ✅ Retry Policy (e.g., retry max 3 times for any Exception)
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3);
        retryTemplate.setRetryPolicy(retryPolicy);

        // ✅ Backoff Policy (e.g., wait 2 seconds between retries)
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }
}