Endpoint Listener Models and Configuration
Endpoint Topic Listener Models
-
Endpoint Topics
The topics that are used to send the command messages to the worker services are called asEndpoint Topics. these are the topics that you register in theEventManagerby overriding theregisterTopics()method instacksaga-kafka-orchestratorand also the topics that are registered via theKafkaEndpointannotation in thestacksaga-kafka-worker. these If the topics are not exist in the Kafka cluster and auto-creation of topics is enabled framework will create the topics automatically during the application startup. otherwise, you need to create the topics manually in the Kafka cluster before starting the application. The special thing is that bothstacksaga-kafka-orchestratorandstacksaga-kafka-workertry to create the topics if they are not exist in the Kafka cluster and auto-creation of topics is enabled to avoid the delay ofmetadata pollingthat is caused by the first time topic creation. the reason for both trying to create the topics is thestacksaga-kafka-orchestratorhas the awareness of all the configured topics via theEventManagerand also thestacksaga-kafka-workerhas the awareness of the topics that are relevant to their service via the `SagaEndpoint`s.
Even though there are separate topics in kafka, the framework provides two different listener models for consuming messages from the topics in the worker application based on the listenerScope configuration in the @SagaEndpoint annotation.
The fallowing are the two listener models for consuming messages from the topics in the worker application.
Let’s dive into the details of each listener model.
SHARED
-
If the
@SagaEndpointis configured withlistenerScope=ListenerScope.SHARED, the framework binds the relevant topic(s) to the shared receiver to consume messages for the respective endpoint. all the endpoints that are configured withlistenerScope=ListenerScope.SHAREDare bind into this sharedKafkaReceiver.
Here is the architecture of the shared listener model for the endpoint topics in stacksaga-kafka-worker application.
-
The consumer group is configured using the following naming convention:
{serviceName}-wswherewsstands for worker service. for instance, if the service name ispayment-service, the consumer group name will bepayment-service-ws. -
The shared listener/receiver is configured with:
auto.offset.reset=earliestThis ensures that previously published messages can be consumed after a service restart when no committed offset is available. -
Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.
-
To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.
-
Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.
-
Processing mode: The shared listener can be configured via
stacksaga.kafka.worker.shared-topic-listener.processing-modeproperty to control the processing mode for the messages consumed by the shared listener. it can be eitherSharedListenerProcessingMode.CONCURRENTorSharedListenerProcessingMode.PARTITION_ORDERED. the default processing mode for the shared listener isSharedListenerProcessingMode.PARTITION_ORDEREDto ensure that messages from the same partition of the same topic are processed in order. inSharedListenerProcessingMode.CONCURRENTmode, messages are processed concurrently without preserving the order of messages from the same partition. And
InSharedListenerProcessingMode.PARTITION_ORDEREDmode, messages from the same partition of the same topic are processed sequentially to preserve the order, while messages from different partitions can be processed concurrently. The default processing mode for the shared listener isSharedListenerProcessingMode.PARTITION_ORDEREDto ensure that messages from the same partition of the same topic are processed in order, which is important for maintaining data consistency and integrity in many use cases. however, you can change it toSharedListenerProcessingMode.CONCURRENTif your use case allows for concurrent processing without strict ordering requirements, which can improve throughput and performance. See more relevant Configuration Properties Ofstacksaga-kafka-worker
ISOLATED
If the @SagaEndpoint is configured with listenerScope=ListenerScope.ISOLATED, the framework creates a dedicated KafkaReceiver for the respective topic(s) to consume messages for that endpoint.
this approach allows for better isolation and control over the message consumption and processing for that specific endpoint without affecting other endpoints that share the same listener.
but it can increase the resource consumption as well.
Here is the architecture of the isolated listener model for the endpoint topics in stacksaga-kafka-worker application.
-
The consumer group is configured using the following naming convention:
{serviceName}-wswherewsstands for worker service. for instance, if the service name ispayment-service, the consumer group name will bepayment-service-ws. -
Each isolated listener is configured with:
auto.offset.reset=earliestThis ensures that previously published messages can be consumed after a service restart when no committed offset is available. -
Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.
-
To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.
-
Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.
-
Scheduler Config For Non-Reactive Endpoint: If the endpoint is non-reactive one, it can be provided a separate
Schedulerfor the primary execution and compensation execution by using theprimaryExecutionSchedulerProviderandrevertExecutionSchedulerProviderattributes in the@SagaEndpointannotation. by default a shared twoSchedulersare provided for all the endpoints in the application viaAbstractDefaultPrimaryExecutionSchedulerProviderandAbstractDefaultRevertExecutionSchedulerProviderwhich are configured to use bounded elastic schedulers that are suitable for blocking workloads. but you can provide your own implementation of theAbstractSagaEndpointSchedulerProviderto primary and compensation separately for better isolation and control over the thread management for each execution type of each endpoint. see stacksaga-kafka-implementation/worker/worker-endpoints.adoc#configure_custom_scheduler_for_non_reactive_endpoints for the implementation.
-
-
Processing mode: The isolated listener can be configured via
processingModeattribute in the@SagaEndpointannotation which determines how messages are processed in the isolated listener. it can be configured for either fallowing modes,-
IsolatedListenerProcessingMode.CONCURRENTmode, messages are processed concurrently without preserving the order of messages from the same partition.If the processing mode IsolatedListenerProcessingMode.CONCURRENT, the concurrency can be customized by using theprimaryExecutionConcurrencyandrevertExecutionConcurrencyattributes in the@SagaEndpointannotation. it allows to control the number of concurrent threads for processing primary and compensation executions respectively. -
IsolatedListenerProcessingMode.PARTITION_ORDERED: In this mode, messages are processed sequentially per partition while allowing concurrent execution across multiple partitions.
The default processing mode for the isolated listener isPARTITION_ORDEREDto ensure a balance between throughput and consistency while preserving the order of messages within each partition. -
IsolatedListenerProcessingMode.SEQUENTIAL: In this mode, messages are processed sequentially without any concurrency. this mode is used when the order of processing is critical and must be preserved across all partitions.
TheSEQUENTIALprocessing mode is not recommended for high-throughput scenarios due to its sequential nature, which can become a bottleneck. it is recommended to use this mode only when the strict ordering of message processing is a hard requirement and the expected message volume is low to moderate.
-
Configuration Properties Of stacksaga-kafka-worker
| Property | DataType | Default Value | Description |
|---|---|---|---|
|
|
|
Whether the worker should automatically create the necessary Kafka topics (the topics that configured via `SagaEndpoint`s) if they do not exist. |
|
|
|
The processing mode for the shared endpoint message listener. Default is |
|
|
|
The concurrency level for the shared endpoint message listener when the processing mode is set to |
Retry Configurations for Non-Reactive Endpoints : Primary execution |
|||
|
|
|
The maximum number of retry attempts. |
|
|
|
The initial interval between retry attempts. |
|
|
|
The maximum interval between retry attempts. |
|
|
|
The multiplier for the retry interval. For example, with an initial interval of 1 second and a multiplier of 2.0, the retry intervals would be 1s, 2s, 4s, etc., up to the maximum interval. |
Retry Configurations for Non-Reactive Endpoints: Revert execution |
|||
|
|
|
The maximum number of retry attempts. |
|
|
|
The initial interval between retry attempts. |
|
|
|
The maximum interval between retry attempts. |
|
|
|
The multiplier for the retry interval. For example, with an initial interval of 1 second and a multiplier of 2.0, the retry intervals would be 1s, 2s, 4s, etc., up to the maximum interval. |