Stacksaga Kafka Endpoints
Stacksaga Kafka Endpoints are responsible for creating topics in the form expected by the Stacksaga engine. As per the architecture, three types of endpoints can be created, as follows:
Query Endpoint
If an execution (atomic execution) does not make any state change, it is executed in a Query Endpoint. Since it makes no changes to the database state, it has no compensation action to undo.
These are the topics defined in SagaTopic with the type SagaTopicType.QUERY_DO_ACTION.
- Each Query Endpoint creates one topic in Kafka.
- DO_VALIDATE_USER (primary execution topic): the topic name must start with DO_.
As per the above diagram, the orchestrator service (order-service) triggers an event via the SagaEventNavigator to the topic DO_VALIDATE_USER (the event is triggered by the SEC behind the scenes, according to your navigation in the SagaEventNavigator).
Query Endpoints — Keep‑Ordering Mode with In‑memory retries
This example shows how a custom Query-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) with In‑memory retries.
@Endpoint(eventName = "USER_VALIDATE")(1)
public class UserValidateEndpoint extends QueryEndpoint { (2)
@Override
@SagaEndpointListener(value = "DO_USER_VALIDATE", id = "DO_USER_VALIDATE") (3)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
try {
//implement your execution (4)
} catch (Exception e) {
//throw RetryableException or NonRetryableException based on the exception (5)
}
}
@Override (6)
public void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when Query execution retrying exhausted.
}
}
1 | Annotate your custom QueryEndpoint class with @Endpoint and set the eventName. |
2 | Extend your custom class from the QueryEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord). |
3 | Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
4 | Implement your real execution here (e.g., database access). |
5 | Catch exceptions carefully. The error you throw determines whether the execution is retried or processing stops. As the method signature shows, you can throw RetryableException or NonRetryableException based on the exception you have. |
6 | If you want to execute something when the execution has failed even after retrying (retries exhausted), you can override the onDoProcessExhausted(consumerRecord) method as a hook. |
Query Endpoints — Keep‑Ordering Mode with Topic‑based/Non-Blocking retries
This example shows how a custom Query-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) with Topic‑based/Non-Blocking retries.
@Endpoint(eventName = "USER_VALIDATE")(1)
public class UserValidateEndpoint extends QueryEndpoint { (2)
@Override
@SagaEndpointListener(value = "DO_USER_VALIDATE", id= "DO_USER_VALIDATE") (3)
@SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5)) (7)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
try {
//implement your execution (4)
} catch (Exception e) {
//throw RetryableException or NonRetryableException based on the exception (5)
}
}
@Override (6)
public void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when Query execution retrying exhausted.
}
}
1 | Annotate your custom QueryEndpoint class with @Endpoint and set the eventName. |
2 | Extend your custom class from the QueryEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord). |
3 | Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
4 | Implement your real execution here (e.g., database access). |
5 | Catch exceptions carefully. The error you throw determines whether the execution is retried or processing stops. As the method signature shows, you can throw RetryableException or NonRetryableException based on the exception you have. |
6 | If you want to execute something when the execution has failed even after retrying (retries exhausted), you can override the onDoProcessExhausted(consumerRecord) method as a hook. |
7 | The @SagaRetryableEndpoint annotation enables Topic‑based/Non-Blocking retries mode. It avoids In‑memory retries and keeps the consumer thread non-blocking during retries. Read more… |
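Conceptually, a topic-based retry re-publishes a failed record to a dedicated retry topic instead of blocking the consumer thread during the backoff. The following toy simulation (in-memory queues standing in for Kafka topics; the class and method names are invented and none of this is StackSaga's internals) shows why other records are not held up by a failing one:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TopicRetrySketch {
    // Returns the order in which records complete; 'failures' is how many
    // times the FIRST record fails before it finally succeeds.
    static List<String> run(List<String> records, int failures) {
        Queue<String> mainTopic = new ArrayDeque<>(records);
        Queue<String> retryTopic = new ArrayDeque<>();
        List<String> completed = new ArrayList<>();
        String flaky = records.get(0);
        while (!mainTopic.isEmpty()) {
            String record = mainTopic.poll();
            if (record.equals(flaky) && failures > 0) {
                failures--;
                retryTopic.add(record); // re-published instead of blocking the consumer
            } else {
                completed.add(record);
            }
            // a separate retry consumer feeds the record back after the backoff
            if (mainTopic.isEmpty() && !retryTopic.isEmpty()) {
                mainTopic.add(retryTopic.poll());
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        // record-A fails twice, yet record-B completes first because the
        // consumer thread never waits out record-A's backoff in place
        System.out.println(run(List.of("record-A", "record-B"), 2)); // [record-B, record-A]
    }
}
```

With In‑memory retries the same scenario would finish record-A first, because the consumer thread blocks on the failing record until its retries succeed.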
Query Endpoints — Async/Parallel Mode with RetryTemplate in-memory retries
This example shows how a custom Query-Endpoint is implemented in Async/Parallel Mode (Order‑Free) with RetryTemplate in-memory retries.
@Endpoint(eventName = "USER_VALIDATE")(1)
public class UserValidateEndpoint extends QueryEndpoint { (2)
@Override
@SagaEndpointListener(value = "DO_USER_VALIDATE", id = "DO_USER_VALIDATE") (3)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
this.doProcessAsync(consumerRecord); (4)
}
@Override (5)
protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
try {
//implement your execution (6)
} catch (Exception e) {
//throw RetryableException or NonRetryableException based on the exception (7)
}
}
@Override (8)
public void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when Query execution retrying exhausted.
}
}
1 | Annotate your custom QueryEndpoint class with @Endpoint and set the eventName. |
2 | Extend your custom class from the QueryEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord). |
3 | Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
4 | Call the doProcessAsync(consumerRecord) method, passing the received consumerRecord. It hands the execution off to a separate thread pool, configures the retry capabilities internally, and sends the response back to the orchestrator service after executing (success or failure). It executes doProcessAsyncInAction() to invoke your real code. |
5 | Override the doProcessAsyncInAction() method, because it is invoked internally by doProcessAsync(). This is where you should write your actual business logic. |
6 | Implement your real execution here (e.g., database access). |
7 | Catch exceptions carefully. The error you throw determines whether the execution is retried or processing stops. As the method signature shows, you can throw RetryableException or NonRetryableException based on the exception you have. |
8 | If you want to execute something when the execution has failed even after retrying (retries exhausted), you can override the onDoProcessExhausted(consumerRecord) method as a hook. |
Command Endpoint
If an execution (atomic execution) makes a state change in the database of the respective service, it is executed in a Command Endpoint. Because it makes state changes in the database, in case of failure the changes must be restored by invoking a compensation action.
These are the topics defined in SagaTopic with the type SagaTopicType.COMMAND_DO_ACTION.
- Each Command Endpoint creates two topics in Kafka: one for the primary execution and one for the compensation execution.
- DO_MAKE_PAYMENT (primary execution topic): the topic name must start with DO_.
- UNDO_MAKE_PAYMENT (compensating execution topic): the topic name must start with UNDO_.
Command-Endpoints — Keep‑Ordering Mode with In‑memory retries
This example shows how a custom Command-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) with In‑memory retries.
@Endpoint(eventName = "MAKE_PAYMENT")(1)
public class MakePaymentEndpoint extends CommandEndpoint { (2)
@Override
@SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT") (3)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (4)
log.debug("idempotencyKey for DO_MAKE_PAYMENT: {}", idempotencyKey);
final ObjectNode aggregatorForUpdate = consumerRecord.value().getAggregatorForUpdate(); //accessing the current aggregator state (5)
double amount = aggregatorForUpdate.get("amount").asDouble();
if (amount == 0) {
throw NonRetryableException.buildWith(new IllegalStateException("amount must be greater than 0")); (6)
}
aggregatorForUpdate.put("payment_status", "SUCCESS"); //updating the aggregator state (7)
} catch (Exception e) { (8)
boolean retryable = false; //decide based on the exception type (placeholder)
if (retryable) {
throw RetryableException.buildWith(e);
} else {
throw NonRetryableException.buildWith(e);
}
}
}
@Override (9)
protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when primary Command execution retrying exhausted.
}
@Override
@SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT") (10)
public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
try {
final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state (11)
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (12)
log.debug("idempotencyKey for UNDO_MAKE_PAYMENT: {}", idempotencyKey);
final double amount = aggregator.get("amount").asDouble();
final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception (13)
log.debug("amount is going to be deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());
consumerRecord.value().getHintStore().ifPresent(historyStore -> {
historyStore.put("payment_status_revert", "SUCCESS"); //updating the historyStore (14)
});
} catch (Exception e) {
throw RetryableException.buildWith(e); (15)
}
}
@Override (16)
protected void onUndoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when undo(revert/compensation) execution retrying exhausted.
}
}
1 | Annotate your custom CommandEndpoint class with @Endpoint and set the eventName. |
2 | Extend your custom class from the CommandEndpoint abstract class. Then you have to override the two required abstract methods doProcess(consumerRecord) and undoProcess(consumerRecord). |
3 | Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
4 | Access the IdempotencyKey for the respective event. The key is set by the Stacksaga engine from the orchestrator service. Read more about maintaining idempotency. |
5 | Access the current aggregator state. You can get the current aggregator state from the SagaPayload object and also update it according to your business logic. |
6 | You can throw a NonRetryableException if you want to stop the transaction going forward. The orchestrator service receives an error response and starts the compensation process. |
7 | Update the aggregator state. |
8 | Catch exceptions carefully. The error you throw determines whether the execution is retried or processing stops. As the method signature shows, you can throw RetryableException or NonRetryableException based on the exception you have. |
9 | If you want to execute something when the primary execution has failed even after retrying (retries exhausted), you can override the onDoProcessExhausted(consumerRecord) method as a hook. |
10 | Annotate undoProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
11 | Access the last aggregator state (the state before the primary exception occurred) to read the aggregator data. |
12 | Access the IdempotencyKey for the respective event. The key is set by the Stacksaga engine from the orchestrator service. Read more about maintaining idempotency. |
13 | Access the primary execution exception. |
14 | Update the HistoryStore to record data during the compensation process. |
15 | Throw an exception. The compensation process cannot have a NonRetryableException or RuntimeException; it can have only a RetryableException. If any exception other than RetryableException is thrown, the transaction is terminated and the compensation process stops. |
16 | If you want to execute something when the revert execution has failed even after retrying (retries exhausted), you can override the onUndoProcessExhausted(consumerRecord) method as a hook. |
Command-Endpoints — Keep‑Ordering Mode with Topic‑based/Non-Blocking retries
This example shows how a custom Command-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) with Topic‑based/Non-Blocking retries.
@Endpoint(eventName = "MAKE_PAYMENT")(1)
public class MakePaymentEndpoint extends CommandEndpoint { (2)
@Override
@SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT") (3)
@SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5))(17)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (4)
log.debug("idempotencyKey for DO_MAKE_PAYMENT: {}", idempotencyKey);
final ObjectNode aggregatorForUpdate = consumerRecord.value().getAggregatorForUpdate(); //accessing the current aggregator state (5)
double amount = aggregatorForUpdate.get("amount").asDouble();
if (amount == 0) {
throw NonRetryableException.buildWith(new IllegalStateException("amount must be greater than 0")); (6)
}
aggregatorForUpdate.put("payment_status", "SUCCESS"); //updating the aggregator state (7)
} catch (Exception e) { (8)
boolean retryable = false; //decide based on the exception type (placeholder)
if (retryable) {
throw RetryableException.buildWith(e);
} else {
throw NonRetryableException.buildWith(e);
}
}
}
@Override (9)
protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when primary Command execution retrying exhausted.
}
@Override
@SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT") (10)
@SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5))(17)
public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (11)
log.debug("idempotencyKey for UNDO_MAKE_PAYMENT: {}", idempotencyKey);
final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state (12)
final double amount = aggregator.get("amount").asDouble();
final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception (13)
log.debug("amount is going to be deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());
consumerRecord.value().getHintStore().ifPresent(historyStore -> {
historyStore.put("payment_status_revert", "SUCCESS"); //updating the historyStore (14)
});
} catch (Exception e) {
throw RetryableException.buildWith(e); (15)
}
}
@Override (16)
protected void onUndoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when undo(revert/compensation) execution retrying exhausted.
}
}
1 | Annotate your custom CommandEndpoint class with @Endpoint and set the eventName. |
2 | Extend your custom class from the CommandEndpoint abstract class. Then you have to override the two required abstract methods doProcess(consumerRecord) and undoProcess(consumerRecord). |
3 | Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
4 | Access the IdempotencyKey for the respective event. The key is set by the Stacksaga engine from the orchestrator service. Read more about maintaining idempotency. |
5 | Access the current aggregator state. You can get the current aggregator state from the SagaPayload object and also update it according to your business logic. |
6 | You can throw a NonRetryableException if you want to stop the transaction going forward. The orchestrator service receives an error response and starts the compensation process. |
7 | Update the aggregator state. |
8 | Catch exceptions carefully. The error you throw determines whether the execution is retried or processing stops. As the method signature shows, you can throw RetryableException or NonRetryableException based on the exception you have. |
9 | If you want to execute something when the primary execution has failed even after retrying (retries exhausted), you can override the onDoProcessExhausted(consumerRecord) method as a hook. |
10 | Annotate undoProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
11 | Access the IdempotencyKey for the respective event. The key is set by the Stacksaga engine from the orchestrator service. Read more about maintaining idempotency. |
12 | Access the last aggregator state (the state before the primary exception occurred) to read the aggregator data. |
13 | Access the primary execution exception. |
14 | Update the HistoryStore to record data during the compensation process. |
15 | Throw an exception. The compensation process cannot have a NonRetryableException or RuntimeException; it can have only a RetryableException. If any exception other than RetryableException is thrown, the transaction is terminated and the compensation process stops. |
16 | If you want to execute something when the revert execution has failed even after retrying (retries exhausted), you can override the onUndoProcessExhausted(consumerRecord) method as a hook. |
17 | The @SagaRetryableEndpoint annotation is used on both doProcess() and undoProcess() to enable Topic‑based/Non-Blocking retries mode. It avoids In‑memory retries and keeps the consumer thread non-blocking during retries. Read more… |
Command-Endpoints — Async/Parallel Mode with RetryTemplate in-memory retries
This example shows how a custom Command-Endpoint is implemented in Async/Parallel Mode (Order‑Free) with RetryTemplate in-memory retries.
@Endpoint(eventName = "MAKE_PAYMENT")
public class MakePaymentEndpoint extends CommandEndpoint {
private static final Logger log = LoggerFactory.getLogger(MakePaymentEndpoint.class);
@Override
@SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT")
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
this.doProcessAsync(consumerRecord);
}
@Override
protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
log.debug("idempotencyKey for DO_MAKE_PAYMENT: {}", idempotencyKey);
final ObjectNode aggregatorForUpdate = consumerRecord.value().getAggregatorForUpdate(); //accessing the current aggregator state
double amount = aggregatorForUpdate.get("amount").asDouble();
if (amount == 0) {
throw NonRetryableException.buildWith(new IllegalStateException("amount must be greater than 0"));
}
aggregatorForUpdate.put("payment_status", "SUCCESS"); //updating the aggregator state
} catch (Exception e) {
boolean retryable = false; //decide based on the exception type (placeholder)
if (retryable) {
throw RetryableException.buildWith(e);
} else {
throw NonRetryableException.buildWith(e);
}
}
}
@Override
protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when primary Command execution retrying exhausted.
}
@Override
@SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT")
public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
this.undoProcessAsync(consumerRecord);
}
@Override
protected void undoProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
log.debug("idempotencyKey for UNDO_MAKE_PAYMENT: {}", idempotencyKey);
final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state
final double amount = aggregator.get("amount").asDouble();
final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception
log.debug("amount is going to be deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());
consumerRecord.value().getHintStore().ifPresent(historyStore -> {
historyStore.put("payment_status_revert", "SUCCESS"); //updating the historyStore
});
} catch (Exception e) {
throw RetryableException.buildWith(e);
}
}
@Override
protected void onUndoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when undo(revert/compensation) execution retrying exhausted.
}
}
Revert Endpoint
In Command Endpoints, there is a compensation execution for reverting. If you want to add more executions before or after the compensation execution is done, you can create Revert Endpoints for those executions.
- Each Revert Endpoint creates one topic in Kafka.
- REVERT_MAKE_PAYMENT_LOG (sub-compensating execution topic)
Revert-Endpoints — Keep‑Ordering Mode with In‑memory retries
This example shows how a custom Revert-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) with In‑memory retries.
@Endpoint(eventName = "UPDATE_STOCK_LOG") (1)
public class UpdateStockLogEndpoint extends RevertEndpoint { (2)
@Override
@SagaEndpointListener(value = "UPDATE_STOCK_LOG", id = "UPDATE_STOCK_LOG") (3)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event (4)
log.debug("idempotencyKey for UPDATE_STOCK_LOG: {}", idempotencyKey);
final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state for reading (5)
final double amount = aggregator.get("amount").asDouble();
final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception (6)
log.debug("amount has been deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());
consumerRecord.value().getHintStore().ifPresent(historyStore -> {
historyStore.put("update_stock_log_status", "SUCCESS"); //updating the historyStore (7)
});
} catch (Exception e) {
throw RetryableException.buildWith(e); (8)
}
}
@Override
protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) { (9)
//do something when extra-revert execution retrying exhausted.
}
}
1 | Annotate your custom RevertEndpoint class with @Endpoint and set the eventName. |
2 | Extend your custom class from the RevertEndpoint abstract class. Then you have to override the required abstract method doProcess(consumerRecord). |
3 | Annotate doProcess(consumerRecord) with the @SagaEndpointListener annotation to make the method a Kafka consumer, and provide the topic name as per the Stacksaga topic name specifications. |
4 | Access the IdempotencyKey for the respective event. The key is set by the Stacksaga engine from the orchestrator service. Read more about maintaining idempotency. |
5 | Access the last aggregator state from the SagaPayload object for reading. |
6 | Access the primary execution exception. |
7 | Update the HistoryStore to record data during the compensation process. |
8 | Throw an exception. The compensation process cannot have a NonRetryableException or RuntimeException; it can have only a RetryableException. If any exception other than RetryableException is thrown, the transaction is terminated and the compensation process stops. |
9 | If you want to execute something when the execution has failed even after retrying (retries exhausted), you can override the onDoProcessExhausted(consumerRecord) method as a hook. |
Revert-Endpoints — Keep‑Ordering Mode with Topic‑based/Non-Blocking retries
This example shows how a custom Revert-Endpoint is implemented in Keep‑Ordering Mode (Synchronous) with Topic‑based/Non-Blocking retries.
@Endpoint(eventName = "UPDATE_STOCK_LOG")
public class UpdateStockLogEndpoint extends RevertEndpoint {
@Override
@SagaEndpointListener(value = "UPDATE_STOCK_LOG", id = "UPDATE_STOCK_LOG")
@SagaRetryableEndpoint(attempts = "3", backoff = @Backoff(delay = 5000, multiplier = 0.5)) (1)
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
log.debug("idempotencyKey for UPDATE_STOCK_LOG: {}", idempotencyKey);
final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state for reading
final double amount = aggregator.get("amount").asDouble();
final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception
log.debug("amount has been deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());
consumerRecord.value().getHintStore().ifPresent(historyStore -> {
historyStore.put("update_stock_log_status", "SUCCESS"); //updating the historyStore
});
} catch (Exception e) {
throw RetryableException.buildWith(e);
}
}
@Override
protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when extra-revert execution retrying exhausted.
}
}
The implementation is almost the same as Revert-Endpoints — Keep‑Ordering Mode with In‑memory retries. Additionally:
1 | The @SagaRetryableEndpoint annotation is used on the doProcess() method to enable Topic‑based/Non-Blocking retries mode. It avoids In‑memory retries and keeps the consumer thread non-blocking during retries. Read more… |
Revert-Endpoints — Async/Parallel Mode with RetryTemplate in-memory retries
This example shows how a custom Revert-Endpoint is implemented in Async/Parallel Mode (Order‑Free) with RetryTemplate in-memory retries.
@Endpoint(eventName = "UPDATE_STOCK_LOG")
public class UpdateStockLogEndpoint extends RevertEndpoint {
@Override
@SagaEndpointListener(value = "UPDATE_STOCK_LOG", id = "UPDATE_STOCK_LOG")
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
this.doProcessAsync(consumerRecord);
}
@Override
protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
try {
String idempotencyKey = consumerRecord.value().getIdempotencyKey(); //accessing the IdempotencyKey for the respective event
log.debug("idempotencyKey for UPDATE_STOCK_LOG: {}", idempotencyKey);
final JsonNode aggregator = consumerRecord.value().getAggregator();//accessing the last aggregator state for reading
final double amount = aggregator.get("amount").asDouble();
final PrimaryExecutionException primaryExecutionException = consumerRecord.value().getPrimaryExecutionException().orElseThrow(); //accessing the primary execution exception
log.debug("amount has been deducted from the account {} due to {}", amount, primaryExecutionException.getRealExceptionMessage());
consumerRecord.value().getHintStore().ifPresent(historyStore -> {
historyStore.put("update_stock_log_status", "SUCCESS"); //updating the historyStore
});
} catch (Exception e) {
throw RetryableException.buildWith(e);
}
}
@Override
protected void onDoProcessExhausted(ConsumerRecord<String, SagaPayload> consumerRecord) {
//do something when extra-revert execution retrying exhausted.
}
}
The implementation is almost the same as Revert-Endpoints — Keep‑Ordering Mode with In‑memory retries. However, the business logic has been moved to the overridden doProcessAsyncInAction() method instead of the doProcess() method, and this.doProcessAsync(consumerRecord) is called in the doProcess() method. The execution flow is as follows:
[consumer-thread]        [consumer-thread]         [saga-async-thread]
doProcess()        ->    doProcessAsync()    ->    doProcessAsyncInAction()
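The hand-off shown above can be illustrated with plain java.util.concurrent primitives. This is only an analogy of the flow, not StackSaga's internals; the class, pool, and counter names are invented for the sketch:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncHandOffSketch {
    static final ExecutorService SAGA_ASYNC_POOL = Executors.newFixedThreadPool(4);
    static final AtomicInteger processed = new AtomicInteger();

    // analogous to doProcess(): runs on the Kafka consumer thread
    static void doProcess(String record) {
        doProcessAsync(record); // returns immediately, so the consumer thread is not blocked
    }

    // analogous to doProcessAsync(): hands the work off to a separate pool
    static void doProcessAsync(String record) {
        SAGA_ASYNC_POOL.submit(() -> doProcessAsyncInAction(record));
    }

    // analogous to doProcessAsyncInAction(): where the real business logic runs
    static void doProcessAsyncInAction(String record) {
        processed.incrementAndGet();
        System.out.println(Thread.currentThread().getName() + " processed " + record);
    }

    public static void main(String[] args) throws InterruptedException {
        doProcess("record-1");
        doProcess("record-2");
        SAGA_ASYNC_POOL.shutdown();
        SAGA_ASYNC_POOL.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because the consumer thread only submits the task, partition ordering is no longer guaranteed, which is why this mode is called Order‑Free.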
@Endpoint Annotation
The @Endpoint annotation is used for annotating custom saga endpoint classes, and it is inherited from Spring's @Component.
The annotation has two parameters, as follows:
- value: the name of the bean in Spring. It is not required to be provided.
- eventName: the name of the event action. For instance, if we create an endpoint for making the payment, the eventName would be MAKE_PAYMENT.
Topic name specifications
Even though the eventName can be any name, it is related to the real endpoint's topic names in Kafka. For instance, if we set the eventName as MAKE_PAYMENT for our Command Endpoint, the primary execution's topic name must be DO_MAKE_PAYMENT and the revert (compensation) execution's topic name must be UNDO_MAKE_PAYMENT. This is validated by the framework at application startup; if they do not match, an exception is thrown. |
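As a quick illustration of the naming rule above, the derived topic names can be sketched in plain Java (this is not StackSaga's actual validation code; the class and method names are invented):

```java
// Sketch of the eventName-to-topic naming rule described above:
// the primary topic is the eventName prefixed with DO_, and the
// compensating topic is the eventName prefixed with UNDO_.
public class TopicNameSketch {
    static String primaryTopic(String eventName) {
        return "DO_" + eventName;   // primary execution topic
    }

    static String compensationTopic(String eventName) {
        return "UNDO_" + eventName; // compensating execution topic
    }

    // mirrors the startup check: a listener topic must match a derived name
    static boolean matches(String listenerTopic, String eventName) {
        return listenerTopic.equals(primaryTopic(eventName))
                || listenerTopic.equals(compensationTopic(eventName));
    }

    public static void main(String[] args) {
        System.out.println(primaryTopic("MAKE_PAYMENT"));       // DO_MAKE_PAYMENT
        System.out.println(compensationTopic("MAKE_PAYMENT"));  // UNDO_MAKE_PAYMENT
        System.out.println(matches("DO_MAKE_PAYMENT", "MAKE_PAYMENT")); // true
    }
}
```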
@SagaEndpointListener Annotation
@SagaEndpointListener
is a StackSaga annotation used to designate a method as a Kafka message listener following the StackSaga framework’s conventions and style.
It is an inherited and optimized version of Spring’s @KafkaListener
, tailored specifically to suit StackSaga’s requirements.
Most of the parameters can be configured as usual from @KafkaListener
,
but some required ones are configured internally by the framework, such as containerFactory
, groupId
, etc.
Concurrency and TopicPartitions related configurations can be done as you prefer in the same way as in the @KafkaListener .
|
@SagaEndpointListener does not support the batch option like @KafkaListener does. Instead, @SagaEndpointListener supports an alternative approach suited to StackSaga’s requirements, called Async/Parallel Mode (Order‑Free).
|
Event Listening approaches in Kafka Client
In Stacksaga Kafka client, there are two primary strategies for handling Kafka messages:
Keep‑Ordering Mode (Synchronous)
-
Description:
-
Messages from a partition are processed one by one in the order Kafka delivers them.
-
The consumer thread processes the record and only after successful completion acknowledges the offset.
-
-
Characteristics:
-
✅ Strict ordering is guaranteed per partition.
-
✅ Failures can trigger retry or pause/resume logic without skipping messages.
-
❌ If processing is slow, the partition is blocked — no further messages will be processed from that partition until the current one completes.
-
-
Typical Flow:
poll -> process (same thread) -> ack -> next record
-
Retrying:
Retrying can be done in two ways in Keep‑Ordering Mode as follows, *
Retrying In Keep‑Ordering Mode (Synchronous)
In‑memory retries
In‑memory retries is a retry strategy in Spring Kafka where failed message processing is automatically re-attempted, using an exponential backoff delay between each retry, with the help of DefaultErrorHandler
and ExponentialBackOffWithMaxRetries
.
When the listener throws an exception, the consumer seeks back to the same offset and re-fetches the record from Kafka.
The same consumer thread retries the processing after a backoff interval that grows exponentially (e.g., 1s → 2s → 4s → 8s) until either:
✅ The message is successfully processed, or
❌ The maximum retry attempts are reached, at which point the message is ignored without being delegated to a Dead Letter Topic (DLT).
This approach ensures message ordering is preserved per partition, prevents tight retry loops, and provides a progressive delay mechanism to avoid overwhelming downstream systems while still guaranteeing that transient failures are handled gracefully.
In Stacksaga, delegating messages to a Dead Letter Topic (DLT) is not recommended. The framework already provides built‑in support for handling failed transactions by automatically rescheduling them for asynchronous retrying. This eliminates the need for immediate‑retry failures to be stored in Kafka again, avoiding unnecessary storage overhead and simplifying recovery logic. |
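The mechanism can be sketched with plain Spring Kafka configuration. This is a minimal illustration only, assuming a DefaultErrorHandler bean wired into the listener container factory; StackSaga configures its own handler internally, so treat this as a demonstration of the mechanism rather than the framework's actual wiring:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOffWithMaxRetries;

@Configuration
public class InMemoryRetryConfig {

    // Retries the failed record up to 4 more times with delays of
    // 1s -> 2s -> 4s -> 8s before giving up. Between attempts the consumer
    // seeks back to the same offset, so per-partition ordering is preserved.
    @Bean
    public DefaultErrorHandler retryErrorHandler() {
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(4);
        backOff.setInitialInterval(1_000L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(8_000L);
        // No DeadLetterPublishingRecoverer here: per the note above, StackSaga
        // does not delegate exhausted messages to a DLT.
        return new DefaultErrorHandler(backOff);
    }
}
```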
-
✅ Pros
-
Simple to set up (just configure DefaultErrorHandler)
-
Doesn’t require extra Kafka topics
-
Good for quick, transient errors (e.g. database hiccup)
-
-
❌ Cons
-
Blocks the partition until retries are done. see more…
-
If the consumer restarts midway, you “lose” the retry delay and it starts over
-
All retrying is synchronous → one thread is tied up
-
Partition-Level Blocking During Retries
When a message fails and the DefaultErrorHandler
with ExponentialBackOffWithMaxRetries
is applied, the consumer seeks back to the same offset and retries the message on the same thread.
Because Kafka enforces strict ordering within a partition, no subsequent messages from that partition will be processed until the failing message is either successfully handled or exhausts all retry attempts (after which it may be sent to a Dead Letter Topic or discarded).
This behavior ensures ordering guarantees are never violated, but it also means that messages queued behind the failing record on that partition will wait.
Messages on other partitions are not affected — if the listener container is configured with multiple concurrent consumer threads, those other partitions continue processing normally while retries occur on the blocked partition.
Topic‑based/Non-Blocking retries
Topic‑based/Non-Blocking retries can be implemented with @SagaRetryableEndpoint
annotation.
It is an inherited version of @RetryableTopic
, tailored specifically to suit StackSaga’s requirements.
-
How it works?
-
When a listener fails:
-
The failed record is published to a new Kafka topic (e.g., orders-saga-retry-1)
-
That retry topic has its own backoff delay (controlled by consumer pause or delayed scheduling)
-
After the delay, the record is consumed from the retry topic and processed again
-
If it fails again, it may move to another retry topic (e.g., orders-saga-retry-2)
-
-
After the final retry (exhausted) → the response is sent to the root-topic (the orchestrator service’s main topic of the aggregator)
-
As mentioned above, retry-exhausted messages are not delegated to a Dead Letter Topic (DLT); DLT delegation is not supported, even though it is a common approach with Spring’s @KafkaListener . Instead, the response is sent to the orchestrator service’s root topic for re-scheduling.
|
-
Key behavior
-
Retries happen asynchronously via Kafka infrastructure.
-
The main partition is not blocked — new messages keep flowing.
-
You get better durability: retries survive restarts, because retry messages live in Kafka topics.
-
✅ Pros
-
Doesn’t block the original topic partition
-
Survives application restarts (retries live in Kafka)
-
Ideal for longer backoff or when you don’t want to tie up threads
-
-
❌ Cons
-
More complex — Spring Kafka creates extra retry topics
-
More Kafka storage overhead (messages copied to retry topics)
-
Slightly higher latency (messages hop between topics)
-
-
Async/Parallel Mode (Order‑Free)
-
Description:
-
Messages are acknowledged immediately in the consumer thread.
-
Processing is handed off to a separate
ThreadPoolTaskExecutor
for parallel execution, and Spring’s RetryTemplate
is used internally for in-memory retrying.
(read more: Configuring RetryTemplate
in Parallel Mode)
-
-
Characteristics:
-
✅ Very fast – consumer thread keeps polling new records without waiting.
-
✅ Parallel processing – multiple messages can be processed at the same time.
-
❌ No ordering guarantee – messages may finish out of order.
-
❌ If async processing fails, Kafka won’t retry because the offset has already been committed (requires a custom retry/error handler).
-
-
Typical Flow:
poll -> hand off to thread pool -> ack -> consumer continues polling
Configure ThreadPoolTaskExecutor
in Parallel Mode
As mentioned above in Async/Parallel Mode (Order‑Free), Spring’s ThreadPoolTaskExecutor
is used as the execution pool.
You can provide or customize the ThreadPoolTaskExecutor
for each endpoint
as follows,
-
use default
stacksagaKafkaAsyncTaskExecutor
or update it via the configuration properties -
provide
ThreadPoolTaskExecutor
via the endpoint
class for each, separately.
provide ThreadPoolTaskExecutor
via the endpoint
class (Bean)
If you want to provide a separate ThreadPoolTaskExecutor for an endpoint instead of using the default one, you can configure the pool by overriding the corresponding customize<ActionName>ThreadPoolTaskExecutor
method.
-
Query Endpoint or Revert Endpoint
-
customizeDoProcessThreadPoolTaskExecutor()
— customize doProcess
topic’s pool.
-
-
Command Endpoint
-
customizeDoProcessThreadPoolTaskExecutor()
— customize doProcess
topic’s pool. -
customizeUndoProcessThreadPoolTaskExecutor()
— customize undoProcess
topic’s pool.
-
All the above customize* methods are called only once, when the bean is initialized (one-time call).
|
See the example below,
@Endpoint(eventName = "MAKE_PAYMENT")
public class MakePaymentEndpoint extends CommandEndpoint {
@Override
@SagaEndpointListener(value = "DO_MAKE_PAYMENT", id = "DO_MAKE_PAYMENT")
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
this.doProcessAsync(consumerRecord);
}
@Override
protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
//TODO: implement the business logic
}
@Override
protected ThreadPoolTaskExecutor customizeDoProcessThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // number of core threads (always alive)
executor.setMaxPoolSize(10); // max threads when queue is full
executor.setQueueCapacity(50); // tasks that can wait before new thread is created
executor.setKeepAliveSeconds(60); // idle thread timeout (for threads above core)
executor.setThreadNamePrefix("custom-do-pool-"); // helps debugging
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); (1)
// if queue is full, runs task in the caller thread instead of discarding
executor.initialize(); // MUST call to apply settings
return executor;
}
@Override
@SagaEndpointListener(value = "UNDO_MAKE_PAYMENT", id = "UNDO_MAKE_PAYMENT")
public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
this.undoProcessAsync(consumerRecord);
}
@Override
protected void undoProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException {
//TODO: implement the business logic
}
@Override
protected ThreadPoolTaskExecutor customizeUndoProcessThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // number of core threads (always alive)
executor.setMaxPoolSize(10); // max threads when queue is full
executor.setQueueCapacity(50); // tasks that can wait before new thread is created
executor.setKeepAliveSeconds(60); // idle thread timeout (for threads above core)
executor.setThreadNamePrefix("custom-undo-pool-"); // helps debugging
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); (1)
// if queue is full, runs task in the caller thread instead of discarding
executor.initialize(); // MUST call to apply settings
return executor;
}
}
1 | You can manage backpressure naturally by setting CallerRunsPolicy on the pool. Otherwise, you can manage backpressure using pause() / resume() polling. Read more: Managing Backpressure in Parallel Mode |
Managing Backpressure in Parallel Mode
What is Backpressure?
Backpressure is the mechanism that prevents a fast producer (Kafka) from overwhelming a slower consumer (our application).
In streaming and messaging systems, producers can deliver messages at very high rates, but consumers often process them at varying speeds due to CPU, memory, or downstream system constraints.
Without backpressure, the system risks:
-
Queue overload – unbounded growth of messages waiting to be processed.
-
Out-of-memory errors – if queues or caches keep accumulating messages.
-
System instability – spikes of load leading to crashes or degraded performance.
Why Backpressure Matters in Kafka Consumers
Kafka is designed for high throughput and can deliver thousands of messages per second. But in real-world applications:
-
Processing involves I/O calls (DB writes, API calls).
-
Consumers often run on thread pools with finite capacity.
-
Workloads may have traffic spikes (e.g., flash sales, seasonal demand).
👉 If we just let Kafka keep polling endlessly, the consumer’s thread pool queue fills up, memory usage increases, and tasks start failing.
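The throttling effect that backpressure provides can be observed in a self-contained sketch (the class and the pool sizes are illustrative only): with CallerRunsPolicy, a bounded pool pushes overflow work back onto the submitting thread, which slows the submitter down naturally instead of discarding tasks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    // Returns the name of the thread that ended up running the overflow task.
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),                 // tiny queue to force overflow
                new ThreadPoolExecutor.CallerRunsPolicy());  // overflow runs on the caller
        Runnable slow = () -> {
            try { Thread.sleep(300); } catch (InterruptedException ignored) { }
        };
        pool.execute(slow);  // occupies the single worker thread
        pool.execute(slow);  // fills the one-slot queue
        String[] ranOn = new String[1];
        // Rejected by the full pool, so CallerRunsPolicy runs it on the caller,
        // blocking the submitter — the "natural" throttling described below.
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        pool.shutdown();
        try { pool.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException ignored) { }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + overflowThreadName()); // prints: overflow task ran on: main
    }
}
```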
Managing Backpressure in Parallel Mode
is crucial, because the consumer thread(s) are not blocked by default; executions are handed off to a custom thread pool.
It can be done in two ways,
-
Natural back-off
By configuring the executor pool with CallerRunsPolicy, the consumer is slowed down naturally: the caller thread is forced to run tasks itself, which indirectly throttles Kafka polling. This is the default backpressure mechanism if you are using the default pool called stacksagaKafkaAsyncTaskExecutor
. Even if you customize the pool, you can keep this behavior by setting the rejectedExecutionHandler
as CallerRunsPolicy
.
Checkout here. -
pause()
/resume()
Temporarily stop polling messages from Kafka when the system is under pressure, and resume when it’s ready to handle more. If your custom ThreadPoolTaskExecutor does not use CallerRunsPolicy as its RejectedExecutionHandler, tasks will be rejected once the pool is full. To prevent task loss and system overload, the framework internally pauses message polling from the respective topic when the executor queue reaches its capacity. Polling is resumed only when the queue has dropped to a safe threshold — by default, when the QueueSize
falls below 50% of the configured QueueCapacity
. If you want to customize the threshold or the resume logic, you can override the relevant isReadyToResume method (
isUndoProcessReadyToResume()
, isDoProcessReadyToResume()
) and indicate whether resuming should start by returning a boolean
value. It is called after each and every record execution while the container is in paused
status, until resume
. You can use the following utility methods that the endpoint provides to evaluate (by using the pool’s metrics) whether it should be resumed or not. -
getConfiguredDoProcessThreadPoolTaskExecutor()
or getConfiguredUndoProcessThreadPoolTaskExecutor()
to get the configured thread pool details (default one or customized one). Or if you use your own way to
resume
the polling, like Scheduling
, you can use the built-in methods mentioned below, -
resumeDoProcessContainer()
— can be used to resume message polling from the doProcess
topic. -
resumeUndoProcessContainer()
— can be used to resume message polling from the undoProcess
topic. -
isDoProcessPauseRequested()
— can be used to check whether a polling pause has been requested for the doProcess
topic. -
isUndoProcessPauseRequested()
— can be used to check whether a polling pause has been requested for the undoProcess
topic. -
pauseDoProcessContainer()
— can be used to pause message polling from the doProcess
topic. -
pauseUndoProcessContainer()
— can be used to pause message polling from the undoProcess
topic.
-
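The default 50% resume rule can be expressed as a small predicate. This is an illustrative sketch only; the class and method below are not framework API. In a real endpoint you would return such a value from isDoProcessReadyToResume() (or isUndoProcessReadyToResume()), reading the pool metrics via getConfiguredDoProcessThreadPoolTaskExecutor():

```java
public class ResumeThreshold {

    // Default rule described above: resume polling once the executor queue
    // has drained strictly below 50% of its configured capacity.
    static boolean readyToResume(int queueSize, int queueCapacity) {
        return queueSize < queueCapacity / 2;
    }

    public static void main(String[] args) {
        System.out.println(readyToResume(30, 50)); // still too full  -> false
        System.out.println(readyToResume(20, 50)); // drained enough  -> true
    }
}
```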
Configuring RetryTemplate
in Parallel Mode
As mentioned above in Async/Parallel Mode (Order‑Free), Spring’s RetryTemplate
is used for retrying. You can provide or customize the RetryTemplate
for each endpoint
as follows,
-
use default
sagaAsyncRetryTemplate
or update it via the configuration properties -
provide
RetryTemplate
via the endpoint
class for each, separately.
provide RetryTemplate
via the endpoint
class (Bean)
If you are using Parallel Mode
, a RetryTemplate
is used internally for handling retrying. By default, it uses sagaAsyncRetryTemplate
, which is provided by Stacksaga.
Otherwise, you can change the configuration properties of the default one, or you can provide a separate RetryTemplate
for each endpoint by overriding the following methods.
-
Query Endpoint or Revert Endpoint
-
customizeDoProcessRetryTemplate()
— customize doProcess
topic’s RetryTemplate.
-
-
Command Endpoint
-
customizeDoProcessRetryTemplate()
— customize doProcess
topic’s RetryTemplate. -
customizeUndoProcessRetryTemplate()
— customize undoProcess
topic’s RetryTemplate.
-
All the above customize* methods are called only once, when the bean is initialized (one-time call).
|
See the example below,
@Endpoint(eventName = "USER_VALIDATE")
public class UserValidateEndpoint extends QueryEndpoint {
@Override
@SagaEndpointListener(value = "DO_USER_VALIDATE",id = "DO_USER_VALIDATE")
public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
this.doProcessAsync(consumerRecord);
}
@Override
protected void doProcessAsyncInAction(ConsumerRecord<String, SagaPayload> consumerRecord) throws RetryableException, NonRetryableException {
//...
}
@Override
protected RetryTemplate customizeDoProcessRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// ✅ Retry Policy (e.g., at most 3 attempts in total for any Exception)
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3);
retryTemplate.setRetryPolicy(retryPolicy);
// ✅ Backoff Policy (e.g., wait 2 seconds between retries)
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2000);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
}