stacksaga-kafka-worker — Endpoints
The stacksaga-kafka-worker module provides the functionality that worker services need to execute the commands the orchestrator sends via the regular topics. It has three responsibilities:
- Consuming command messages from the regular topics.
- Executing the business logic.
- Sending response messages back to the orchestrator via the response topics. This is either the shared root topic or the domain root topic, depending on the listener scope configuration.
At a high level, the stacksaga-kafka-worker module has the following duties:
- Providing the specifications for creating Kafka saga endpoints in the target client service, according to the StackSaga Kafka engine.
- Sending a response back to the orchestrator service after each invocation.
- Providing immediate retry facilities for retryable exceptions.
StackSaga Kafka Endpoints
StackSaga Kafka endpoints are worker-side listeners whose topics are created according to the StackSaga engine's format. As per the architecture, two types of endpoints can be created: Query Endpoints and Command Endpoints.
Exception Handling Reference
Choosing the correct exception type inside an endpoint method is one of the most critical implementation decisions in StackSaga-Kafka. The wrong choice has three possible consequences:
- The saga is left stalled indefinitely.
- Compensation is triggered unnecessarily.
- Error context that the compensation process needs is discarded.
Exception Types at a Glance
| Exception | Available In | Framework Response | Use When |
|---|---|---|---|
| NonRetryableExecutorException | doProcess() | In doProcess(): the step fails and the compensation sequence starts. Key-value pairs added with put() are available to the compensation steps. | The failure is permanent: a business rule violation, invalid input, or any condition that retrying will never resolve. Use put() to attach error context for the compensation steps. |
| RetryableExecutorException | doProcess() and undoProcess(), in reactive and non-reactive endpoints | Schedules the saga for a deferred retry via the ring coordinator. The step is re-attempted from the last persisted event store state. | The failure is transient: a network timeout, temporary resource unavailability, or an infrastructure blip expected to resolve on its own. |
| JustRetryableExecutorException | doProcess() and undoProcess(), in non-reactive endpoints only | Triggers an immediate in-process retry via Spring RetryTemplate. | A fast local retry is worth attempting before escalating to a scheduled retry. Always use this as the first line of defence for transient failures in non-reactive endpoints. |
| Any unhandled exception | doProcess() and undoProcess() | Treated identically to NonRetryableExecutorException. In undoProcess(), the compensation sequence terminates. | Never intentional in endpoints; see the WARNING below. |

WARNING: Never let an unhandled exception escape from an endpoint accidentally.
- Any Throwable that is not wrapped in a framework exception type is treated as non-retryable.
- Unhandled exceptions carry no put() key-value pairs. The compensation sequence therefore starts with no error context, or terminates if the exception was thrown in undoProcess().
- Always catch your exceptions and wrap them with the correct type.
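To make the table concrete, here is a minimal, JDK-only sketch that restates it as a lookup. ThrownKind, EndpointPhase, FrameworkResponse and ExceptionResponseTable are hypothetical names invented for this illustration. They are not StackSaga types, and the real framework decides these outcomes internally.

```java
import java.util.Objects;

// Hypothetical model of the "Exception Types at a Glance" table.
// None of these types belong to StackSaga; they only name the table's rows and outcomes.
enum ThrownKind { NON_RETRYABLE, RETRYABLE, JUST_RETRYABLE, UNHANDLED }

enum EndpointPhase { DO_PROCESS, UNDO_PROCESS }

enum FrameworkResponse {
    START_COMPENSATION,      // the step fails and the compensation sequence starts
    TERMINATE_COMPENSATION,  // the compensation sequence stops; the transaction is terminated
    SCHEDULED_RETRY,         // deferred retry via the ring coordinator
    IMMEDIATE_RETRY          // in-process retry via RetryTemplate
}

final class ExceptionResponseTable {
    private ExceptionResponseTable() {}

    static FrameworkResponse responseFor(ThrownKind kind, EndpointPhase phase, boolean reactiveEndpoint) {
        Objects.requireNonNull(kind, "kind");
        Objects.requireNonNull(phase, "phase");
        switch (kind) {
            case RETRYABLE:
                return FrameworkResponse.SCHEDULED_RETRY;
            case JUST_RETRYABLE:
                if (reactiveEndpoint) {
                    throw new IllegalArgumentException("JustRetryableExecutorException is for non-reactive endpoints only");
                }
                return FrameworkResponse.IMMEDIATE_RETRY;
            case NON_RETRYABLE:
                if (phase == EndpointPhase.UNDO_PROCESS) {
                    throw new IllegalArgumentException("undoProcess() may only throw JustRetryable/RetryableExecutorException");
                }
                return FrameworkResponse.START_COMPENSATION;
            default:
                // UNHANDLED: treated like NonRetryableExecutorException, but without any put() error context
                return phase == EndpointPhase.DO_PROCESS
                        ? FrameworkResponse.START_COMPENSATION
                        : FrameworkResponse.TERMINATE_COMPENSATION;
        }
    }
}
```

The two IllegalArgumentException branches encode the constraints stated in this document. JustRetryableExecutorException is supported only in non-reactive endpoints. undoProcess() may throw only JustRetryableExecutorException and RetryableExecutorException.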
Exception Decision Flow
When implementing an endpoint method, use the following logic to decide which exception to throw:
- Is the failure transient (a timeout, temporary resource unavailability, an infrastructure blip)?
  - Yes, in a non-reactive endpoint: throw JustRetryableExecutorException first. If the immediate retry limit is exhausted, throw RetryableExecutorException.
  - Yes, in a reactive endpoint: use retryWhen() in the reactive pipeline. If the retries are exhausted, return Mono.error(RetryableExecutorException.of(…)).
- Is it a permanent business failure (invalid data, a violated rule, an unrecoverable state)? Throw NonRetryableExecutorException, and use put() to attach error context for the compensation steps.
- Neither of the above? Wrap it in NonRetryableExecutorException with a descriptive message. Never let it propagate as an unhandled exception.
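The decision flow above can also be written as a small pure function, which works as a checklist when reviewing endpoint code. This is an illustrative sketch only. FailureKind, EndpointAction and ExceptionDecision are hypothetical names. Classifying a failure as transient or permanent remains your own business decision.

```java
// Hypothetical encoding of the exception decision flow; the types are illustrative only.
enum FailureKind { TRANSIENT, PERMANENT_BUSINESS, OTHER }

enum EndpointAction {
    THROW_JUST_RETRYABLE,    // non-reactive: ask for an immediate in-process retry
    RETRY_IN_PIPELINE,       // reactive: retry locally with retryWhen()
    THROW_RETRYABLE,         // local retries exhausted: escalate to a scheduled retry
    THROW_NON_RETRYABLE      // permanent or unknown failure: compensate, with put() context
}

final class ExceptionDecision {
    private ExceptionDecision() {}

    static EndpointAction decide(FailureKind kind, boolean reactiveEndpoint, boolean localRetriesExhausted) {
        switch (kind) {
            case TRANSIENT:
                if (localRetriesExhausted) {
                    // reactive endpoints return Mono.error(RetryableExecutorException.of(...)) instead of throwing
                    return EndpointAction.THROW_RETRYABLE;
                }
                return reactiveEndpoint ? EndpointAction.RETRY_IN_PIPELINE : EndpointAction.THROW_JUST_RETRYABLE;
            case PERMANENT_BUSINESS:
                return EndpointAction.THROW_NON_RETRYABLE;
            default:
                // "Neither of the above": wrap with a descriptive message, never let it escape unhandled
                return EndpointAction.THROW_NON_RETRYABLE;
        }
    }
}
```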
Throwing from onNext() and onNextRevert()
Exception handling is not limited to endpoint methods. The onNext() and onNextRevert() methods in the EventManager also participate in the saga failure model:
- onNext(): any exception thrown here, including via SagaPrimaryEventAction.error(…), is treated as a non-retryable failure. The framework transitions the saga to FAILED and begins the compensation sequence immediately. You can therefore use onNext() intentionally to force compensation based on orchestrator-side business conditions, for instance when a rule evaluated after a worker's reply determines that the saga must not continue.
- onNextRevert(): any exception thrown here causes the compensation sequence to terminate immediately. The transaction is marked as compensation-failed and will not be retried.
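| Thrown From | Immediate Effect | Resulting State Transition |
|---|---|---|
| onNext() (including SagaPrimaryEventAction.error(…)) | Compensation begins | The saga transitions to FAILED and the compensation sequence starts. |
| onNextRevert() | Compensation terminates | The transaction is marked as compensation-failed and is not retried. |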
Query Endpoint
If an atomic execution does not change any state, it is executed in a Query Endpoint. A query execution does not change the database state, so it has no compensation action to undo.
Query endpoints handle the topics that are defined as described in stacksaga-kafka-implementation/orchestrator/topics-event-manager.adoc#custom_topic_implementation with the type SagaEventType.QUERY_DO_ACTION.
For instance, in the place-order example, fetching and validating the user's details is a query execution. It does not change any state in the database; it only fetches the data and returns it.
Query endpoints can be created in two different ways in StackSaga.
Non-Reactive (Imperative) Query Endpoint
Non-reactive query endpoints are the traditional, blocking way of implementing query endpoints. In this approach, the method that executes the query is implemented in a blocking way. For instance, in the place-order example, the query endpoint for fetching the user's details can be implemented as follows.
@Slf4j
@SagaEndpoint( (2)
        //the real topic name will be saga.do.user-service.get-user
        topicNameSuffix = "user-service.get-user", (3)
        listenerScope = ListenerScope.SHARED (4)
)
public class UserValidateEndpoint extends QueryEndpoint { (1)
    (5)
    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key()); (6)
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey()); (7)
        // the state is always present in the primary execution. Resolving it outside a lambda
        // lets the executor exceptions below be thrown straight out of doProcess().
        final ObjectNode currentDomainEntityState = consumerRecord
                .value()
                .getCurrentDomainEntityStateForUpdate() (8)
                .orElseThrow();
        log.info("Received payload before user validation: {}", currentDomainEntityState);
        try {
            {
                //user validation logic (9)
                String username = currentDomainEntityState.get("username").asText();
                log.info("Validating user with username: {}", username);
            }
            { (10)
                // put the result into the domain entity state for the next topic to use.
                currentDomainEntityState.put("is_user_validated", true);
                currentDomainEntityState.put("validation_note", "user validated at " + LocalDateTime.now());
            }
        } catch (SomeRetryableException e) { // placeholder for your own transient exception type
            (11)
            throw RetryableExecutorException.of(e);
        } catch (SomeNonRetryableException e) { // placeholder for your own permanent exception type
            (12)
            throw NonRetryableExecutorException
                    .buildWith(e)
                    .put("error_code", "USER_VALIDATION_FAILED")
                    .put("key-1", "value-1")
                    .put("key-2", "value-2")
                    .build();
        }
    }
}
NOTE: The example has been simplified to focus on the main points. For a complete understanding of the framework and its capabilities, read about each concept in the rest of the documentation.
| 1 | Create a custom query endpoint class by extending the QueryEndpoint class. |
| 2 | Annotate the custom query endpoint class with the @SagaEndpoint annotation. It primarily marks the class as a Spring bean. |
| 3 | topicNameSuffix: the suffix of the topic name for this endpoint. The framework builds the real topic name by prepending saga.do to the suffix, which gives saga.do.user-service.get-user. The saga.do prefix indicates that the topic is used for the primary execution. The EventManager sends the command messages that trigger this endpoint to this topic. |
| 4 | listenerScope: the listener scope for the endpoint, either ListenerScope.SHARED or ListenerScope.ISOLATED. See stacksaga-kafka-implementation/worker/worker-configuration.adoc#topic_model_stacksaga_kafka_worker. |
| 5 | Override the doProcess() method to implement the business logic for executing the query. The framework invokes this method when a message is received on the respective topic. Use the provided ConsumerRecord to access the message key, the payload and other metadata within the method scope. |
| 6 | The message key is available as a String from the ConsumerRecord. It is the transaction ID that the orchestrator sends along with the message. |
| 7 | The idempotency key is available from the payload of the ConsumerRecord. The framework generates it on the orchestrator side and sends it along with the message. If the same message is processed more than once because of retries or duplicates, it carries the same idempotency key, so you can implement idempotent processing logic in your endpoint. |
| 8 | getCurrentDomainEntityStateForUpdate() returns an Optional containing the current state of the domain entity, if present. In the primary execution the state is always present because it is sent from the orchestrator. The Optional only provides safe access. |
| 9 | Implement the business logic for executing the query. Use the current state of the domain entity to perform the necessary operations and obtain the required data. For instance, call an internal service class to load the user data from the database based on the username in the domain entity state. |
| 10 | After obtaining the required data, put the result into the domain entity state for the next topic to use. For instance, put is_user_validated with the value true to indicate that the user was validated successfully, and put validation_note with a note about the validation. |
| 11 | This is the most important part of the endpoint implementation: handle exceptions by throwing them with the appropriate type. Wrap retryable exceptions with RetryableExecutorException and non-retryable exceptions with NonRetryableExecutorException. |
| 12 | When throwing NonRetryableExecutorException, you can attach additional information as key-value pairs with the put() method. The compensation process can read this information from the exception and use it to decide which compensation logic to run. For instance, if you put error_code with the value USER_VALIDATION_FAILED, the compensation logic can check for that error code and run logic specific to it. |
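Callout 7 notes that the idempotency key stays the same across redeliveries and retries, which makes it a natural deduplication key. The sketch below uses a hypothetical IdempotencyGuard helper (not part of StackSaga) that runs a side effect at most once per key. The in-memory set is used purely for illustration. A real worker would normally persist processed keys, ideally in the same database transaction as the business change, so that the guard survives restarts and works across instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not a StackSaga API: runs an action at most once per idempotency key.
// The in-memory set is illustrative only; production code would persist the keys.
final class IdempotencyGuard {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Returns true if the action ran, false if the key had already been processed. */
    boolean runOnce(String idempotencyKey, Runnable action) {
        if (!processedKeys.add(idempotencyKey)) {
            return false; // duplicate delivery or retry of an already-applied step
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            processedKeys.remove(idempotencyKey); // the action failed, so let a later retry run it again
            throw e;
        }
    }
}
```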
Reactive (Non-Blocking) Query Endpoint
@Slf4j
@SagaEndpoint(
        //the real topic name will be saga.do.user-service.get-user
        topicNameSuffix = "user-service.get-user",
        listenerScope = ListenerScope.SHARED
)
public class ReactiveUserValidateEndpoint extends ReactiveQueryEndpoint {
    @Override
    public Mono<Void> doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        final ObjectNode currentDomainEntityState = consumerRecord
                .value()
                .getCurrentDomainEntityStateForUpdate()
                .orElseThrow();
        log.info("Received payload before user validation: {}", currentDomainEntityState);
        String username = currentDomainEntityState.get("username").asText();
        return this.internalUserService
                .getUserDetails(username)
                .flatMap(userDetails -> {
                    {
                        //user validation logic
                    }
                    {
                        // put the result into the domain entity state for the next topic to use.
                        currentDomainEntityState.put("is_user_validated", true);
                        currentDomainEntityState.put("validation_note", "user validated at " + LocalDateTime.now());
                    }
                    // the explicit <Void> keeps the whole chain typed as Mono<Void>
                    return Mono.<Void>empty();
                })
                .onErrorResume(SomeRetryableException.class, e ->
                        Mono.error(RetryableExecutorException.of(e)))
                .onErrorResume(SomeNonRetryableException.class, e ->
                        Mono.error(NonRetryableExecutorException
                                .buildWith(e)
                                .put("error_code", "USER_VALIDATION_FAILED")
                                .build()));
    }
}
NOTE: If you are not familiar with how endpoints work in the StackSaga Kafka worker, read the Non-Reactive (Imperative) Query Endpoint section before this one. All the concepts and main points are explained there in more detail.
Most points are the same as for the Non-Reactive (Imperative) Query Endpoint. The main difference is that a reactive query endpoint returns a Mono<Void> from the doProcess() method. It implements the business logic with the reactive programming paradigm, using the Mono and Flux types from Project Reactor for asynchronous, non-blocking operations. For instance, in the example above, internalUserService.getUserDetails(username) returns a Mono<UserDetails>, a reactive type that represents a single asynchronous value. The flatMap() operator processes that result and performs the necessary operations. The onErrorResume() operator maps exceptions to the appropriate executor exception types.
Command Endpoint
If an atomic execution changes state in the database, it is executed in a Command Endpoint. Because it changes the database, it has a compensation action to undo those changes.
Command endpoints handle the topics that are defined as described in stacksaga-kafka-implementation/orchestrator/topics-event-manager.adoc#custom_topic_implementation with the types SagaEventType.COMMAND_DO_ACTION and SagaEventType.COMMAND_UNDO_ACTION.
For instance, in the place-order example, Make Payment is a command execution. It changes state in the payment service's database by creating a new payment record and updating the user's balance. If the entire process has to be compensated after Make Payment has executed successfully, its compensation action is triggered to undo the payment.
A command endpoint implements two methods, and a complete command endpoint needs both:
- doProcess() executes the primary execution.
- undoProcess() executes the compensation logic. It undoes the changes made by the primary execution if a later step fails.

The two failure signals behave as follows:
- In doProcess(), you can throw RetryableExecutorException or NonRetryableExecutorException to report a failure of the primary execution to the framework. If RetryableExecutorException is thrown, the framework schedules the transaction for a retry based on the retry configuration. If NonRetryableExecutorException is thrown, the framework triggers the compensation process to undo the transaction.
- If NonRetryableExecutorException is thrown from doProcess(), the values modified within the method are not applied to the domain entity state. The compensation process starts from the last successful state.
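A failed doProcess() does not apply its modifications. This behaves like working on a copy of the state and committing the copy only on success. The following JDK-only sketch illustrates that semantics, with a plain Map standing in for the JSON domain entity state. DomainStateHolder is a hypothetical name, and this is not a description of how StackSaga is implemented internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative only: edits made by a step are kept only if the step completes normally.
final class DomainStateHolder {
    private Map<String, Object> current;

    DomainStateHolder(Map<String, Object> initialState) {
        this.current = new HashMap<>(initialState);
    }

    Map<String, Object> current() {
        return Collections.unmodifiableMap(current);
    }

    void applyStep(Consumer<Map<String, Object>> step) {
        Map<String, Object> workingCopy = new HashMap<>(current);
        step.accept(workingCopy); // if this throws, the working copy is simply dropped
        current = workingCopy;    // reached only when the step succeeded
    }
}
```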
Command endpoints can be created in two different ways in StackSaga.
Non-Reactive (Imperative) Command Endpoint
Non-reactive command endpoints are the traditional, blocking way of implementing command endpoints. In this approach, the method that executes the command is implemented in a blocking way.
@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
class MakePaymentEndpoint extends CommandEndpoint { (1)
    (2)
    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
        //the theory is the same as the query endpoint.
    }
    (3)
    @Override
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        // the last domain entity state, used to work out what has to be undone
        final JsonNode lastDomainEntityState = consumerRecord.value().getDomainEntityState();
        log.info("Last domain entity state {}", lastDomainEntityState);
        try {
            {//undo payment logic
                final double totalAmount = lastDomainEntityState.get("total_amount").asDouble();
                final String paymentRef = lastDomainEntityState.get("payment_reference_id").asText();
                this.internalPaymentService.refundPayment(paymentRef, totalAmount);
                //update some hints for the next revert execution if needed.
                consumerRecord.value().getHintStore().ifPresent(hintStore -> { (4)
                    hintStore.put("last_refund_time", LocalDateTime.now().toString());
                });
            }
        } catch (SomeRetryableException e) { (5)
            throw RetryableExecutorException.of(e);
        }
    }
}
NOTE: The doProcess() implementation is omitted from the example because the focus is on undoProcess() and compensation processing in command endpoints. doProcess() is implemented the same way as in the query endpoint described in the previous section. Read the Non-Reactive (Imperative) Query Endpoint section to see how.
| 1 | Create a custom command endpoint class by extending the CommandEndpoint class. |
| 2 | Override the doProcess() method to implement the business logic for executing the command. This works exactly like the query endpoint; see the Non-Reactive (Imperative) Query Endpoint example for details. |
| 3 | Override the undoProcess() method to implement the business logic for undoing the command. The framework invokes this method when a compensation action is triggered for the respective command execution. Use the provided ConsumerRecord to access the message key, the payload and other metadata within the method scope. |
| 4 | To pass hints or other information from the current compensation execution to the compensation execution of the next step, use the payload's getHintStore() method. It gives access to the hint store, a key-value store for sharing information between compensation executions. For instance, the example stores the last refund time under the key last_refund_time with the hint store's put() method. |
| 5 | Identify the retryable exceptions that the compensation logic can throw, and wrap them with RetryableExecutorException to enable the retry mechanism for the compensation process. undoProcess() can throw only JustRetryableExecutorException and RetryableExecutorException. If any unhandled exception is thrown from undoProcess(), the compensation process stops and the transaction is terminated. Handle exceptions carefully in the compensation logic so that the compensation can be retried when necessary and the transaction is not terminated unexpectedly. |
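The hint store lets one compensation execution leave information for the next one. The sketch below models that with a plain Map passed through a chain of undo actions. It assumes, as is conventional for sagas, that compensations run in reverse order of the primary steps; that ordering is an assumption here, not a statement about StackSaga's scheduling. CompensationChain is a hypothetical class, not a StackSaga API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative only: a Map stands in for the payload's hint store.
final class CompensationChain {
    private final List<Consumer<Map<String, String>>> undoActions = new ArrayList<>();

    /** Registers the undo action of a primary step, in the order the primary steps ran. */
    void register(Consumer<Map<String, String>> undoAction) {
        undoActions.add(undoAction);
    }

    /** Runs the undo actions newest-first, sharing one hint store between them. */
    Map<String, String> compensate() {
        Map<String, String> hintStore = new HashMap<>();
        for (int i = undoActions.size() - 1; i >= 0; i--) {
            undoActions.get(i).accept(hintStore); // each undo step sees hints left by the previous one
        }
        return hintStore;
    }
}
```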
The maximum number of retry attempts for compensation executions, retry window durations, boundary guard thresholds, and the behavior for permanently stuck or exhausted compensation transactions (dead-letter handling) are all configured through stacksaga-database-support.
See the stacksaga-database-support documentation for full details on these configurations.
Immediate Retry with JustRetryableExecutorException
Immediate retry with JustRetryableExecutorException is supported only in non-reactive endpoints. Reactive endpoints can already retry immediately within the reactive pipeline without a dedicated exception type.

In a non-reactive endpoint, throw JustRetryableExecutorException to trigger an immediate retry without waiting for a scheduled retry. A successful immediate retry avoids two costs: the delay caused by scheduling, and the redundant messages to the orchestrator that each scheduled retry attempt would send. If the immediate retry fails, throw RetryableExecutorException to fall back to a scheduled retry.

Behind the scenes, the framework handles immediate retries in non-reactive endpoints with Spring's RetryTemplate, so all the features that RetryTemplate provides are available.
NOTE: Immediate retry with RetryTemplate is supported for the doProcess() method in both non-reactive command and query endpoints, and for the undoProcess() method in non-reactive command endpoints.
Here is an example of how JustRetryableExecutorException can be used for immediate retries in a non-reactive command endpoint.
@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
public class MakePaymentEndpoint extends CommandEndpoint {
    @Override
    public void doProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException, NonRetryableExecutorException {
        // resolved outside a lambda so the executor exceptions can be thrown straight out of doProcess()
        final ObjectNode currentDomainEntityState = consumerRecord
                .value()
                .getCurrentDomainEntityStateForUpdate()
                .orElseThrow();
        final String paymentReferenceId = currentDomainEntityState.get("payment_reference_id").asText();
        final double totalAmount = currentDomainEntityState.get("total_amount").asDouble();
        try {
            this.internalPaymentService.makePayment(paymentReferenceId, totalAmount);
        } catch (SomeRetryableException e) {
            // always present in non-reactive endpoints
            RetryContext retryContext = consumerRecord.value().getRetryContext().orElseThrow();
            if (retryContext.getRetryCount() >= 3) {
                // immediate retries are exhausted: escalate to a scheduled retry.
                throw RetryableExecutorException.of(e);
            } else {
                throw JustRetryableExecutorException.of("Payment failed, retrying... attempt " + (retryContext.getRetryCount() + 1));
            }
        } catch (SomeNonRetryableException e) {
            throw NonRetryableExecutorException
                    .buildWith(e)
                    .put("error_code", "PAYMENT_FAILED")
                    .build();
        }
    }

    @Override
    public void undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord)
            throws JustRetryableExecutorException, RetryableExecutorException {
        // compensation logic omitted; see the Non-Reactive (Imperative) Command Endpoint example.
    }
}
This example implements immediate retries in the MakePaymentEndpoint command endpoint with JustRetryableExecutorException. If makePayment() throws a retryable exception, the endpoint reads the retry count from the RetryContext in the payload of the ConsumerRecord. The RetryContext is wrapped in an Optional, but it is always present in non-reactive executors, so it can be accessed safely.

The endpoint then takes one of two paths:
- If the retry count has reached the threshold (3 in this example), it throws RetryableExecutorException. This stops further immediate retries and lets the framework schedule a retry based on the retry configuration.
- Otherwise, it throws JustRetryableExecutorException, which tells the framework to retry immediately.

A successful immediate retry avoids both the scheduling delay and the redundant messages to the orchestrator that scheduled retry attempts would send.
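Stripped of the framework types, the pattern above is "retry immediately a bounded number of times, then escalate to a scheduled retry". The JDK-only sketch below shows that control flow with a plain loop. In StackSaga the loop itself is driven by Spring RetryTemplate. TransientFailure and ScheduledRetryRequested are hypothetical stand-ins for your retryable exception and for RetryableExecutorException.

```java
import java.util.function.Supplier;

// Illustrative sketch of bounded immediate retries followed by escalation.
final class ImmediateRetry {
    /** Hypothetical stand-in for a transient business exception (e.g. a timeout). */
    static final class TransientFailure extends RuntimeException {
        TransientFailure(String message) { super(message); }
    }

    /** Hypothetical stand-in for escalating with RetryableExecutorException. */
    static final class ScheduledRetryRequested extends RuntimeException {
        ScheduledRetryRequested(String message, Throwable cause) { super(message, cause); }
    }

    private ImmediateRetry() {}

    static <T> T callWithImmediateRetry(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TransientFailure lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (TransientFailure e) {
                lastFailure = e; // transient: retry right away, in the same thread
            }
            // any other exception is not caught and propagates immediately
        }
        // Immediate attempts exhausted: hand over to the deferred, scheduled retry.
        throw new ScheduledRetryRequested("immediate retries exhausted after " + maxAttempts + " attempts", lastFailure);
    }
}
```

Non-transient exceptions are never caught by the loop, so they skip the retries entirely. This mirrors how permanent failures should go straight to NonRetryableExecutorException.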
Configure Custom AbstractSagaEndpointSchedulerProvider for Non-Reactive Endpoints
StackSaga is non-blocking and reactive under the hood. A non-reactive endpoint therefore needs a Scheduler that runs its blocking code without blocking the reactive threads. The framework provides default shared Schedulers for the primary execution and the revert execution of non-reactive endpoints.

You can also configure a custom Scheduler for your endpoints by creating a custom AbstractSagaEndpointSchedulerProvider and configuring it in the endpoint. This is useful when the default shared Scheduler does not suit your use case, for instance when you want a dedicated Scheduler per endpoint or a Scheduler with specific settings.
The default shared Schedulers provided by the framework are boundedElastic Schedulers with the following configurations:
- Primary Execution Scheduler
  - threadCap: Runtime.getRuntime().availableProcessors() * 10
  - queuedTaskCap: 100_000
  - ttlSeconds: 60
  - daemon: false
- Revert Execution Scheduler
  - threadCap: Runtime.getRuntime().availableProcessors() * 3
  - queuedTaskCap: 100_000
  - ttlSeconds: 60
  - daemon: false
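The default thread caps scale with the number of available processors. As a worked example, on a host where Runtime.getRuntime().availableProcessors() returns 8, the primary Scheduler can grow to 80 threads and the revert Scheduler to 24. The hypothetical DefaultSchedulerSettings class below only restates the documented values. With Reactor, the same five values correspond to the arguments of Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name, ttlSeconds, daemon).

```java
// Hypothetical holder that restates the documented default Scheduler settings.
final class DefaultSchedulerSettings {
    final String name;
    final int threadCap;
    final int queuedTaskCap;
    final int ttlSeconds;
    final boolean daemon;

    private DefaultSchedulerSettings(String name, int threadCap, int queuedTaskCap, int ttlSeconds, boolean daemon) {
        this.name = name;
        this.threadCap = threadCap;
        this.queuedTaskCap = queuedTaskCap;
        this.ttlSeconds = ttlSeconds;
        this.daemon = daemon;
    }

    /** Primary execution: 10 threads per available processor. */
    static DefaultSchedulerSettings primary(int availableProcessors) {
        return new DefaultSchedulerSettings("primary", availableProcessors * 10, 100_000, 60, false);
    }

    /** Revert execution: 3 threads per available processor. */
    static DefaultSchedulerSettings revert(int availableProcessors) {
        return new DefaultSchedulerSettings("revert", availableProcessors * 3, 100_000, 60, false);
    }

    /** Same as primary(int), using the processor count of the current JVM. */
    static DefaultSchedulerSettings primaryForThisJvm() {
        return primary(Runtime.getRuntime().availableProcessors());
    }
}
```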
Override the Default Scheduler Configurations
To override the default Scheduler configurations at the application level, register custom provider beans in your application context as follows.
@Slf4j
@Configuration
public class EndpointSchedulerConfig {

    @Bean
    public AbstractDefaultPrimaryExecutionSchedulerProvider customSharedPrimaryExecutionSchedulerProvider() {
        return new AbstractDefaultPrimaryExecutionSchedulerProvider() {
            @Override
            protected Scheduler scheduler() {
                log.info("lazy initialization of custom shared primary execution scheduler for non-reactive(blocking) execution");
                // threadCap, queuedTaskCap, name, ttlSeconds, daemon
                return Schedulers.newBoundedElastic(
                        100,
                        Integer.MAX_VALUE,
                        "custom-do-shared",
                        60,
                        false
                );
            }
        };
    }

    @Bean
    public AbstractDefaultRevertExecutionSchedulerProvider customSharedRevertExecutionSchedulerProvider() {
        return new AbstractDefaultRevertExecutionSchedulerProvider() {
            @Override
            protected Scheduler scheduler() {
                log.info("lazy initialization of custom shared revert execution scheduler for non-reactive(blocking) execution");
                // threadCap, queuedTaskCap, name, ttlSeconds, daemon
                return Schedulers.newBoundedElastic(
                        20,
                        Integer.MAX_VALUE,
                        "custom-undo-shared",
                        60,
                        false
                );
            }
        };
    }
}
Create a Custom AbstractSagaEndpointSchedulerProvider for an Endpoint
Here is an example of how to create a custom AbstractSagaEndpointSchedulerProvider and configure it in the endpoint.
@Component
public class MakePaymentExecutionSchedulerProvider extends AbstractSagaEndpointSchedulerProvider {
    @Override
    protected Scheduler scheduler() {
        // threadCap = 20, queuedTaskCap = Integer.MAX_VALUE, thread name prefix = "demo-name"
        return Schedulers.newBoundedElastic(20, Integer.MAX_VALUE, "demo-name");
    }
}

@Slf4j
@SagaEndpoint(
        listenerScope = ListenerScope.SHARED,
        primaryExecutionSchedulerProvider = MakePaymentExecutionSchedulerProvider.class,
        revertExecutionSchedulerProvider = MakePaymentExecutionSchedulerProvider.class
)
public class MakePaymentEndpoint extends CommandEndpoint {
    //...
}
NOTE: The scheduler() method is invoked only once, when the application starts. The framework caches the returned Scheduler for future use.
In this example, MakePaymentExecutionSchedulerProvider extends AbstractSagaEndpointSchedulerProvider and overrides the scheduler() method to provide a custom Scheduler configuration. The custom provider is then configured in MakePaymentEndpoint for both the primary and the revert execution, through the primaryExecutionSchedulerProvider and revertExecutionSchedulerProvider attributes of the @SagaEndpoint annotation. You can also configure different providers for the primary and revert executions if each needs its own configuration.
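The note above says that scheduler() is called once and its result is cached. The following sketch illustrates that create-once contract with a thread-safe lazy holder. CachedProvider is a hypothetical class used only to show the behavior; it is not StackSaga's implementation. In practice, scheduler() should simply build and return the Scheduler and should not contain per-message logic.

```java
// Illustrative create-once holder: create() plays the role of scheduler().
abstract class CachedProvider<T> {
    private volatile T cached;

    /** Builds the value; called at most once per provider instance. */
    protected abstract T create();

    final T get() {
        T value = cached;
        if (value == null) {              // fast path once initialised
            synchronized (this) {
                value = cached;
                if (value == null) {      // double-checked: only one thread creates
                    value = create();
                    cached = value;
                }
            }
        }
        return value;
    }
}
```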
Configure RetryTemplate for Non-Reactive Endpoints
As mentioned above, non-reactive endpoints use RetryTemplate to handle immediate retries, and you can configure it to customize the retry behavior of your endpoints. Define a bean of type AbstractRetryTemplateProvider in your application context and override its retryTemplateBuilder() method to return your custom RetryTemplateBuilder. Here is an example of how to configure the RetryTemplate for non-reactive endpoints.
@Component
public class MakePaymentRetryTemplateProvider extends AbstractRetryTemplateProvider {
    @Override
    protected RetryTemplateBuilder retryTemplateBuilder() {
        // at most 3 attempts in total (the first call plus 2 immediate retries)
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(3);
        return RetryTemplate.builder().customPolicy(simpleRetryPolicy);
    }
}
The MakePaymentRetryTemplateProvider class is annotated with @Component to make it a Spring bean.
After creating the custom RetryTemplateProvider, configure it in the endpoint as follows.
@Slf4j
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED,
        primaryExecutionRetryTemplate = MakePaymentRetryTemplateProvider.class,
        revertExecutionRetryTemplate = MakePaymentRetryTemplateProvider.class
)
public class MakePaymentEndpoint extends CommandEndpoint {
    //...
}
In this example, MakePaymentRetryTemplateProvider is configured for both the primary and the revert execution, through the primaryExecutionRetryTemplate and revertExecutionRetryTemplate attributes of the @SagaEndpoint annotation. You can also configure different RetryTemplateProvider classes for the primary and revert executions if each needs its own retry behavior.
Reactive (Non-Blocking) Command Endpoint
In a reactive programming environment, you can implement command endpoints in a non-blocking way. Extend the ReactiveCommandEndpoint class and implement the doProcess() and undoProcess() methods with Project Reactor's Mono and Flux types. The approach is the same as for the non-reactive command endpoint, with two differences: both methods return a Mono<Void>, and asynchronous operations and exception handling use reactive operators such as map(), flatMap() and onErrorResume().
@Slf4j
@SagaEndpoint( (2)
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.SHARED
)
class ReactiveMakePaymentEndpoint extends ReactiveCommandEndpoint { (1)
    @Override
    public Mono<Void> doProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        //the theory is the same as the query endpoint.
        return Mono.empty();
    }

    @Override
    public Mono<Void> undoProcess(ConsumerRecord<String, SagaPayload> consumerRecord) {
        log.info("Message Key (Transaction Id): {}", consumerRecord.key());
        log.info("IdempotencyKey: {}", consumerRecord.value().getIdempotencyKey());
        final JsonNode lastDomainEntityState = consumerRecord.value().getDomainEntityState();
        log.info("Last domain entity state {}", lastDomainEntityState);
        final double totalAmount = lastDomainEntityState.get("total_amount").asDouble();
        final String paymentRef = lastDomainEntityState.get("payment_reference_id").asText();
        return this.internalPaymentService
                .refundPayment(paymentRef, totalAmount)
                .flatMap(response -> {
                    //update some hints for the next revert execution if needed.
                    consumerRecord.value().getHintStore().ifPresent(hintStore -> {
                        hintStore.put("last_refund_time", LocalDateTime.now().toString());
                    });
                    return Mono.<Void>empty();
                })
                .onErrorResume(SomeRetryableException.class, e -> { (3)
                    return Mono.error(RetryableExecutorException.of(e));
                });
    }
}
The doProcess() implementation is omitted from the example because the focus is on the undoProcess() method, which performs the compensation processing of the command endpoint. The doProcess() method is implemented in the same way as in the query endpoint explained in the previous section; read the Non-Reactive (Imperative) Query Endpoint section to understand how to implement it.
| # | Description |
|---|---|
| 1 | Create a custom reactive command endpoint class by extending the ReactiveCommandEndpoint class. |
| 2 | Annotate the custom reactive command endpoint class with the @SagaEndpoint annotation. It primarily marks the class as a Spring bean. |
| 3 | Identify the retryable exceptions that the compensation logic can throw and handle them with the onErrorResume() operator, returning a Mono.error() that wraps them in a RetryableExecutorException so that the compensation process is retried. If any unhandled exception is thrown from the undoProcess() method, the compensation process stops and the transaction is terminated. Therefore, handle exceptions carefully in the compensation logic so that compensation can be retried when necessary and the transaction is not terminated unexpectedly. |
Through the RetryableExecutorException, the StackSaga framework provides only asynchronous retrying (a scheduled retry). If you want immediate, in-process retrying, manage it within the pipeline with the retryWhen() operator of Project Reactor. It is recommended to implement the immediate retrying logic first, before signalling the RetryableExecutorException, to get better control over the retrying mechanism and to avoid scheduling a retry when the operation could succeed immediately. Once the immediate retries are exhausted, signal the RetryableExecutorException to schedule a retry based on the retry configuration. Read more about Retry to better understand the retrying mechanisms.
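As a minimal sketch of the "immediate retry first, scheduled retry afterwards" pattern, the helper below uses Reactor's retryWhen() with Retry.max(), which resubscribes without any delay, and replaces the final failure once those retries are exhausted. The class ImmediateRetry and its parameters are hypothetical names chosen for illustration. In a StackSaga endpoint, retryable would be your transient exception type (e.g., SomeRetryableException) and escalate would be a function that wraps the failure in a RetryableExecutorException.

```java
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/** Hypothetical helper: immediate in-process retries first, escalation afterwards. */
final class ImmediateRetry {

    private ImmediateRetry() {
    }

    /**
     * Resubscribes to {@code source} up to {@code maxRetries} times, without delay, when it fails
     * with an instance of {@code retryable}. Other failures propagate unchanged. When the retries
     * are exhausted, the last failure is passed to {@code escalate} and the result is signalled
     * downstream instead (for example, an exception that requests a scheduled retry).
     */
    static <T> Mono<T> withImmediateRetry(Mono<T> source,
                                          Class<? extends Throwable> retryable,
                                          long maxRetries,
                                          Function<Throwable, ? extends Throwable> escalate) {
        return source.retryWhen(
                Retry.max(maxRetries)
                        // Only transient failures are retried in-process.
                        .filter(retryable::isInstance)
                        // Replace Reactor's default "retries exhausted" error with the escalation.
                        .onRetryExhaustedThrow((spec, signal) -> escalate.apply(signal.failure())));
    }
}
```

Because retryWhen() works by resubscribing, the source must be lazy (for example, built with Mono.defer() or returned by a reactive client that issues the call on subscription) for each attempt to actually repeat the operation. Retry.backoff() could be used instead of Retry.max() if a short delay between immediate attempts is preferable; it schedules the retries on a Reactor scheduler.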
@SagaEndpoint Annotation
@SagaEndpoint is the main annotation used to mark custom endpoint classes in StackSaga Kafka worker applications. It is a meta-annotation used to create custom annotations for the different types of endpoints, such as query endpoints and command endpoints. The @SagaEndpoint annotation provides several attributes that configure the behavior of the endpoint, such as the topic name suffix and the listener scope. By using this annotation, you can create and configure your custom endpoints consistently across your application. The following list describes all the attributes of the @SagaEndpoint annotation and how to use them to tune the behavior of your endpoints for your specific use case.
-
value
Description: This attribute provides a unique name for the endpoint. It is optional and improves readability and debugging. If it is not provided, the framework uses the class name as the default endpoint name. The Spring bean is created with the name provided in this attribute.
-
-
topicNameSuffix
Provides the suffix of the topic name for this endpoint. The framework creates the real topic name by prepending a prefix to the provided suffix, for example saga.do.user-service.get-user. The prefix saga.do. is added by the framework to indicate that the topic is used for the primary execution. This is the topic to which the EventManager sends the command messages that trigger the execution of this endpoint. Therefore, the suffix must match the topic name used in the EventManager so that the messages are sent to the correct topic and consumed by the correct endpoint.
Table 1. Quick Reference: Topic Naming Rules
| Rule | Detail |
|---|---|
| Segment separator | Use . to separate hierarchical segments (e.g., order.init, user-service.get-user). |
| Hyphens | Permitted within a segment (e.g., user-service, make-payment) but not as the separator itself. |
| Underscores | Not permitted anywhere in the topic name. |
| Prefix auto-added | The framework prepends saga.do. for primary topics and saga.undo. for compensation topics. So topicNameSuffix = "payment.make" produces the real topic saga.do.payment.make. |
| Topic key | Each topic requires a unique float key within the domain: positive integers (1, 2, 3…) for primary topics and matching negative values (-1, -2, -3…) for compensation topics. Decimal values (e.g., 1.1) are reserved for the upcoming sub-execution feature; avoid them in the current version. |
See the full specification in Topic Name Specification and Topic Key Specification.
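The rules in Table 1 can be expressed as a small validator, which could be useful in a unit test that checks every topicNameSuffix before deployment. This is a sketch under stated assumptions, not the framework's own validation: the class TopicNaming and its methods are hypothetical, and it assumes that a segment consists only of letters and digits joined by single hyphens, which is stricter than the table states.

```java
import java.util.regex.Pattern;

/** Hypothetical checker for the topic naming and topic key rules of Table 1. */
final class TopicNaming {

    // Assumption: a segment is letters/digits, optionally joined by single hyphens.
    private static final Pattern SEGMENT = Pattern.compile("[A-Za-z0-9]+(?:-[A-Za-z0-9]+)*");

    private TopicNaming() {
    }

    static boolean isValidSuffix(String suffix) {
        if (suffix == null || suffix.isEmpty() || suffix.contains("_")) {
            return false; // underscores are not permitted anywhere
        }
        // limit -1 keeps empty segments, so "a..b", ".a" and "a." are rejected
        for (String segment : suffix.split("\\.", -1)) {
            if (!SEGMENT.matcher(segment).matches()) {
                return false;
            }
        }
        return true;
    }

    /** Real topic consumed by doProcess(): saga.do. + suffix. */
    static String primaryTopic(String suffix) {
        return "saga.do." + requireValid(suffix);
    }

    /** Real topic consumed by undoProcess(): saga.undo. + suffix. */
    static String compensationTopic(String suffix) {
        return "saga.undo." + requireValid(suffix);
    }

    /** Primary key must be a positive whole number; the compensation key must be its negation. */
    static boolean isValidKeyPair(float primaryKey, float compensationKey) {
        boolean wholePositive = Float.isFinite(primaryKey)
                && primaryKey > 0
                && primaryKey == Math.rint(primaryKey); // rejects reserved decimals such as 1.1
        return wholePositive && compensationKey == -primaryKey;
    }

    private static String requireValid(String suffix) {
        if (!isValidSuffix(suffix)) {
            throw new IllegalArgumentException("Invalid topic name suffix: " + suffix);
        }
        return suffix;
    }
}
```

For example, primaryTopic("payment.make") yields saga.do.payment.make, while primaryTopic("order_init") is rejected because of the underscore.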
-
-
listenerScope
Provides the listener scope for the endpoint.
listenerScope defines how the message listeners are created and managed for the endpoint. It can be either ListenerScope.SHARED or ListenerScope.ISOLATED. If it is set to ListenerScope.SHARED, the framework binds the respective topic to the default shared message listener container. If it is set to ListenerScope.ISOLATED, the framework creates a dedicated message listener container for the respective topic. Setting ListenerScope.ISOLATED on a specific endpoint gives you better control over message consumption and processing for that endpoint without affecting the other endpoints that share the default listener. listenerScope is one of the most important configurations to get right; read the ListenerScope tuning section in the best practices chapter to choose the appropriate listener scope for your endpoints based on your use case and requirements.
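The fragment below sketches how the two scopes might be declared side by side, using only attributes documented in this list. The endpoint classes, topic suffixes, and property keys are hypothetical, and the method bodies and imports are omitted, so this is a configuration fragment rather than compilable code.

```java
// SHARED: the saga.do.user-service.get-user topic is bound to the default shared listener container.
@SagaEndpoint(
        topicNameSuffix = "user-service.get-user",
        listenerScope = ListenerScope.SHARED
)
class GetUserEndpoint extends QueryEndpoint {
    // doProcess(...) omitted
}

// ISOLATED: a dedicated listener container is created for this endpoint's topics,
// and the processing mode and concurrency apply to that container only.
@SagaEndpoint(
        topicNameSuffix = "payment.service.make-payment",
        listenerScope = ListenerScope.ISOLATED,
        processingMode = ProcessingMode.CONCURRENT,
        primaryExecutionConcurrency = "${saga.payment.primary.concurrency}",
        revertExecutionConcurrency = "${saga.payment.revert.concurrency}"
)
class MakePaymentEndpoint extends CommandEndpoint {
    // doProcess(...) and undoProcess(...) omitted
}
```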
-
-
processingMode
Provides the processing mode when listenerScope is set to ListenerScope.ISOLATED. See stacksaga-kafka-implementation/worker/worker-configuration.adoc#topic_model_stacksaga_kafka_worker for more details.
-
-
primaryExecutionConcurrency
Specifies the concurrency level used for primary record processing (for doProcess()) when processingMode is configured as ProcessingMode.CONCURRENT. Multiple records may be processed concurrently using the configured number of worker threads.
The value may be provided either as a numeric literal or as a Spring property placeholder.
For example:
primaryExecutionConcurrency = "10"
primaryExecutionConcurrency = "${saga.sample1.primary.concurrency}"
When a property placeholder is used, the value is resolved from the application configuration:
saga.sample1.primary.concurrency=10
Default: Runtime.getRuntime().availableProcessors() * 2
-
-
revertExecutionConcurrency
Specifies the concurrency level used for compensation record processing (for undoProcess()) when processingMode is configured as ProcessingMode.CONCURRENT. Multiple records may be processed concurrently using the configured number of worker threads.
The value may be provided either as a numeric literal or as a Spring property placeholder.
For example:
revertExecutionConcurrency = "5"
revertExecutionConcurrency = "${saga.sample1.revert.concurrency}"
When a property placeholder is used, the value is resolved from the application configuration:
saga.sample1.revert.concurrency=5
Default: Runtime.getRuntime().availableProcessors()
This attribute is not applicable to the QueryEndpoint because it has no compensation processing, so revertExecutionConcurrency is ignored if it is provided in the @SagaEndpoint annotation of a QueryEndpoint.
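To make the literal-versus-placeholder rule and the documented defaults concrete, here is a toy resolver. It is not the framework's implementation: in a Spring application the placeholder is resolved by Spring's own property resolution. The class ConcurrencySettings, its methods, and the convention that an unset attribute is represented as null or blank are assumptions for illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of how a concurrency attribute value turns into a thread count. */
final class ConcurrencySettings {

    // Matches "${some.key}"; Spring's "${key:default}" syntax is deliberately not modelled here.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}:]+)\\}");

    private ConcurrencySettings() {
    }

    /** Documented default of primaryExecutionConcurrency. */
    static int defaultPrimary() {
        return Runtime.getRuntime().availableProcessors() * 2;
    }

    /** Documented default of revertExecutionConcurrency. */
    static int defaultRevert() {
        return Runtime.getRuntime().availableProcessors();
    }

    /**
     * Resolves a numeric literal ("10") or a placeholder ("${saga.sample1.primary.concurrency}")
     * against the given properties; an unset value (assumed null or blank) yields the default.
     */
    static int resolve(String attributeValue, Map<String, String> properties, int defaultValue) {
        if (attributeValue == null || attributeValue.isBlank()) {
            return defaultValue;
        }
        String raw = attributeValue.trim();
        Matcher placeholder = PLACEHOLDER.matcher(raw);
        if (placeholder.matches()) {
            String key = placeholder.group(1);
            raw = properties.get(key);
            if (raw == null) {
                throw new IllegalStateException("Unresolvable placeholder: " + key);
            }
            raw = raw.trim();
        }
        int value = Integer.parseInt(raw);
        if (value < 1) {
            throw new IllegalArgumentException("Concurrency must be >= 1 but was " + value);
        }
        return value;
    }
}
```

With saga.sample1.primary.concurrency=10 in the configuration, both "10" and "${saga.sample1.primary.concurrency}" resolve to 10, while an unset revertExecutionConcurrency falls back to the number of available processors.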
-
-
primaryExecutionRetryTemplate
Specifies the RetryTemplateProvider class used for primary execution retries in the doProcess() method of the non-reactive CommandEndpoint and QueryEndpoint. See Configure RetryTemplate for Non-Reactive Endpoints for more details.
-
-
revertExecutionRetryTemplate
Specifies the RetryTemplateProvider class used for compensation execution retries in the undoProcess() method of the non-reactive CommandEndpoint. See Configure RetryTemplate for Non-Reactive Endpoints for more details.
-
-
primaryExecutionSchedulerProvider
Only applicable to primary executions (CommandEndpoint.doProcess(ConsumerRecord), QueryEndpoint.doProcess(ConsumerRecord)). Provides a custom AbstractPrimaryExecutionSchedulerProvider that controls the scheduler on which the primary execution processing occurs. This is important for isolating blocking operations from the Kafka consumer thread, which prevents performance degradation and keeps the consumer responsive. By default, the framework uses a bounded elastic scheduler suitable for blocking workloads via DefaultSchedulerConfig, but you can provide your own implementation to customize the pool size, thread naming, and other characteristics. Providing a separate scheduler for each endpoint is not recommended unless your use case requires it, as it may lead to resource contention and inefficient thread usage; in most cases, a single shared scheduler for all endpoints is sufficient and more efficient. However, a shared default scheduler can lead to task starvation when some endpoints keep its threads busy. See the Configure Custom AbstractSagaEndpointSchedulerProvider for Non-Reactive Endpoints section to learn how to configure a custom scheduler, or the Override the DefaultSchedulerConfigurations section to learn how to override the existing default shared scheduler configuration.
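The framework's AbstractPrimaryExecutionSchedulerProvider API is not reproduced here, so the sketch below only illustrates the underlying idea with plain Project Reactor: a dedicated bounded elastic scheduler created with Schedulers.newBoundedElastic(), and a blocking call moved onto it with subscribeOn() so that it no longer runs on the calling (consumer) thread. The class BlockingOffload, the thread cap, the queue cap, and the scheduler name are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/** Hypothetical helper showing how blocking work is isolated on a dedicated scheduler. */
final class BlockingOffload {

    private BlockingOffload() {
    }

    /** A dedicated bounded elastic scheduler; the caps below are example values, not recommendations. */
    static Scheduler dedicatedScheduler(String name) {
        return Schedulers.newBoundedElastic(
                16,     // threadCap: maximum number of worker threads
                1_000,  // queuedTaskCap: tasks allowed to wait once all threads are busy
                name);  // thread name prefix, visible in thread dumps and logs
    }

    /** Runs the blocking call on the given scheduler instead of the subscribing thread. */
    static <T> Mono<T> offload(Callable<T> blockingCall, Scheduler scheduler) {
        return Mono.fromCallable(blockingCall).subscribeOn(scheduler);
    }
}
```

The thread cap bounds how many blocking calls can run at once, which is the lever against both consumer-thread blocking and the starvation described above. A scheduler created this way should be disposed with scheduler.dispose() when the application shuts down.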
-