Endpoint Listener Models and Configuration

Endpoint Topic Listener Models

  • Endpoint Topics
    The topics used to send command messages to the worker services are called Endpoint Topics. These are the topics that you register in the EventManager by overriding the registerTopics() method in stacksaga-kafka-orchestrator, as well as the topics registered via the KafkaEndpoint annotation in stacksaga-kafka-worker. If these topics do not exist in the Kafka cluster and automatic topic creation is enabled, the framework creates them during application startup. Otherwise, you must create them manually in the Kafka cluster before starting the application. Notably, both stacksaga-kafka-orchestrator and stacksaga-kafka-worker attempt to create missing topics when auto-creation is enabled; this avoids the metadata-polling delay that first-time topic creation would otherwise cause. Both sides can do this because stacksaga-kafka-orchestrator knows all configured topics through the EventManager, while each stacksaga-kafka-worker knows the topics relevant to its own service through its `SagaEndpoint`s.
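
    The start-up check described above can be pictured with a small plain-Java sketch. It is an illustration only, not the framework's code: TopicCreationPlan, topicsToCreate and the topic names are hypothetical, and the real framework queries the Kafka cluster rather than in-memory sets.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not part of StackSaga. */
final class TopicCreationPlan {

    /**
     * Returns the topics that would be created at start-up: the configured
     * topics that are missing from the cluster, or nothing at all when
     * auto-creation is disabled.
     */
    static Set<String> topicsToCreate(Set<String> configured,
                                      Set<String> existingInCluster,
                                      boolean autoCreateTopics) {
        Set<String> missing = new TreeSet<>(configured);
        missing.removeAll(existingInCluster);
        return autoCreateTopics ? missing : new TreeSet<>();
    }

    public static void main(String[] args) {
        // Topic names are made up for the example.
        Set<String> configured = Set.of("make-payment", "refund-payment");
        Set<String> existing = Set.of("make-payment");
        System.out.println(topicsToCreate(configured, existing, true));  // [refund-payment]
        System.out.println(topicsToCreate(configured, existing, false)); // []
    }
}
```

    Because only missing topics are targeted in this model, whichever side starts second (orchestrator or worker) simply finds nothing left to create.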

Although the topics are separate in Kafka, the framework provides two listener models for consuming their messages in the worker application. The model is selected by the listenerScope attribute of the @SagaEndpoint annotation and is either SHARED or ISOLATED.

Let’s dive into the details of each listener model.

SHARED

  • If the @SagaEndpoint is configured with listenerScope=ListenerScope.SHARED, the framework binds the relevant topic(s) to the shared receiver to consume messages for that endpoint. All endpoints configured with listenerScope=ListenerScope.SHARED are bound to this single shared KafkaReceiver.

Here is the architecture of the shared listener model for the endpoint topics in stacksaga-kafka-worker application.

Shared kafka receiver in stacksaga kafka worker
  • The consumer group is named using the convention {serviceName}-ws, where ws stands for worker service. For instance, if the service name is payment-service, the consumer group name is payment-service-ws.

  • The shared listener/receiver is configured with auto.offset.reset=earliest. This ensures that previously published messages can be consumed after a service restart when no committed offset is available.

  • Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.

  • To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.

  • Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Processing mode: The shared listener's processing mode is set via the stacksaga.kafka.worker.shared-topic-listener.processing-mode property. It can be either of the following modes:

    • SharedListenerProcessingMode.CONCURRENT: Messages are processed concurrently, without preserving the order of messages from the same partition.

    • SharedListenerProcessingMode.PARTITION_ORDERED: Messages from the same partition of the same topic are processed sequentially to preserve their order, while messages from different partitions can be processed concurrently.

      The default is SharedListenerProcessingMode.PARTITION_ORDERED, because in-order processing within a partition is important for data consistency and integrity in many use cases. You can switch to SharedListenerProcessingMode.CONCURRENT if your use case tolerates concurrent processing without strict ordering, which can improve throughput. See Configuration Properties Of stacksaga-kafka-worker for the related properties.
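
The delivery bullets above (acknowledge only after processing, commit acknowledged offsets periodically) imply that a crash between two commits causes redelivery. The toy model below makes that window visible. It is a sketch under stated assumptions, not StackSaga code: AtLeastOnceDemo is a hypothetical name, and a commit every commitEvery messages stands in for the framework's time-based commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of ack-after-processing with periodic offset commits; not StackSaga code. */
final class AtLeastOnceDemo {

    /**
     * Processes offsets [0, crashAfter) and commits the acknowledged position
     * every commitEvery messages (a stand-in for the time-based commit).
     * Returns the offsets that are delivered again after the restart.
     */
    static List<Integer> redeliveredAfterCrash(int crashAfter, int commitEvery) {
        int committed = 0; // next offset to read after a restart; 0 models "earliest"
        for (int offset = 0; offset < crashAfter; offset++) {
            // 1. process the message, 2. acknowledge it
            int acknowledgedUpTo = offset + 1;
            // 3. the periodic commit persists the acknowledged position only now and then
            if (acknowledgedUpTo % commitEvery == 0) {
                committed = acknowledgedUpTo;
            }
        }
        // Crash here: everything acknowledged after the last commit is read again.
        List<Integer> redelivered = new ArrayList<>();
        for (int offset = committed; offset < crashAfter; offset++) {
            redelivered.add(offset);
        }
        return redelivered;
    }

    public static void main(String[] args) {
        System.out.println(redeliveredAfterCrash(7, 5)); // [5, 6]
        System.out.println(redeliveredAfterCrash(3, 5)); // [0, 1, 2]
    }
}
```

In the second call no commit happened before the crash, so consumption restarts from the earliest offset, which is the case auto.offset.reset=earliest covers. Since a handler may see the same message twice under this model, endpoint logic generally needs to tolerate duplicates.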

ISOLATED

If the @SagaEndpoint is configured with listenerScope=ListenerScope.ISOLATED, the framework creates a dedicated KafkaReceiver for the respective topic(s) to consume messages for that endpoint. This gives better isolation and control over message consumption and processing for that specific endpoint, without affecting the endpoints that use the shared listener, but it can also increase resource consumption.
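
The difference between the two scopes can be summarized as a mapping from endpoints to receivers. The following is a minimal sketch of that mapping only. The ListenerScope enum here is a local stand-in for the framework's type, and ReceiverPlan, Endpoint, the receiver labels and the topic names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; the types below are stand-ins, not StackSaga classes. */
final class ReceiverPlan {

    enum ListenerScope { SHARED, ISOLATED }

    /** Hypothetical description of one endpoint and the topic it consumes. */
    record Endpoint(String name, String topic, ListenerScope scope) {}

    /** Maps a receiver label to the topics it would subscribe to. */
    static Map<String, List<String>> plan(List<Endpoint> endpoints) {
        Map<String, List<String>> receivers = new LinkedHashMap<>();
        for (Endpoint e : endpoints) {
            // SHARED endpoints all land on one receiver; ISOLATED ones get their own.
            String receiver = e.scope() == ListenerScope.SHARED
                    ? "shared-receiver"
                    : "isolated-receiver:" + e.name();
            receivers.computeIfAbsent(receiver, k -> new ArrayList<>()).add(e.topic());
        }
        return receivers;
    }

    public static void main(String[] args) {
        System.out.println(plan(List.of(
                new Endpoint("reserve", "reserve-topic", ListenerScope.SHARED),
                new Endpoint("notify", "notify-topic", ListenerScope.SHARED),
                new Endpoint("charge", "charge-topic", ListenerScope.ISOLATED))));
        // {shared-receiver=[reserve-topic, notify-topic], isolated-receiver:charge=[charge-topic]}
    }
}
```

Each additional ISOLATED endpoint adds one more receiver to the plan, which is where the extra resource consumption comes from.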

Here is the architecture of the isolated listener model for the endpoint topics in stacksaga-kafka-worker application.

Isolated kafka receiver in stacksaga kafka worker
  • The consumer group is named using the convention {serviceName}-ws, where ws stands for worker service. For instance, if the service name is payment-service, the consumer group name is payment-service-ws.

  • Each isolated listener is configured with auto.offset.reset=earliest. This ensures that previously published messages can be consumed after a service restart when no committed offset is available.

  • Message consumption is configured using At-Least-Once delivery semantics to ensure that messages are not lost, even in the event of service downtime or unexpected failures.

  • To guarantee At-Least-Once delivery semantics, each message is acknowledged only after it has been successfully processed.

  • Acknowledged offsets are committed periodically using the framework’s default time-based commit configuration. Therefore, if a failure occurs after a message has been processed and acknowledged but before the offset has been committed, the message may be delivered again and reprocessed after recovery.

  • Scheduler Config For Non-Reactive Endpoint: If the endpoint is non-reactive, you can provide separate Schedulers for primary execution and compensation execution through the primaryExecutionSchedulerProvider and revertExecutionSchedulerProvider attributes of the @SagaEndpoint annotation. By default, two shared Schedulers serve all endpoints in the application, supplied by AbstractDefaultPrimaryExecutionSchedulerProvider and AbstractDefaultRevertExecutionSchedulerProvider; both use bounded elastic schedulers, which are suitable for blocking workloads. You can instead supply your own AbstractSagaEndpointSchedulerProvider implementations, one for primary and one for compensation execution, for better isolation and control over thread management per execution type and per endpoint. See stacksaga-kafka-implementation/worker/worker-endpoints.adoc#configure_custom_scheduler_for_non_reactive_endpoints for the implementation.

  • Processing mode: The isolated listener's processing mode is set via the processingMode attribute of the @SagaEndpoint annotation, which determines how messages are processed. It can be one of the following modes:

    • IsolatedListenerProcessingMode.CONCURRENT: In this mode, messages are processed concurrently, without preserving the order of messages from the same partition.

      With this mode, the concurrency can be customized with the primaryExecutionConcurrency and revertExecutionConcurrency attributes of the @SagaEndpoint annotation, which control the number of concurrent threads for primary and compensation executions respectively.

    • IsolatedListenerProcessingMode.PARTITION_ORDERED: In this mode, messages are processed sequentially per partition while allowing concurrent execution across multiple partitions.
      The default processing mode for the isolated listener is PARTITION_ORDERED to ensure a balance between throughput and consistency while preserving the order of messages within each partition.

    • IsolatedListenerProcessingMode.SEQUENTIAL: In this mode, messages are processed one at a time, without any concurrency. Use it when the order of processing is critical and must be preserved across all partitions.
      SEQUENTIAL is not recommended for high-throughput scenarios, because processing one message at a time can become a bottleneck. Use it only when strict ordering is a hard requirement and the expected message volume is low to moderate.
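
The ordering guarantees of the three modes can be reproduced with plain java.util.concurrent executors. This is a sketch of the semantics only, under the assumption that a "lane" of work can be modeled as an executor; the framework itself works with KafkaReceiver and Schedulers and need not be built this way. ProcessingModeDemo, Mode and Message are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Models the ordering semantics of the processing modes; not StackSaga code. */
final class ProcessingModeDemo {

    enum Mode { CONCURRENT, PARTITION_ORDERED, SEQUENTIAL }

    record Message(int partition, long offset) {}

    /** Runs all messages through executors arranged per mode; returns the completion order. */
    static List<Message> run(Mode mode, int concurrency, List<Message> messages) {
        Map<Integer, ExecutorService> lanes = new HashMap<>();
        List<Message> completed = new CopyOnWriteArrayList<>();
        for (Message m : messages) {
            // PARTITION_ORDERED: one lane per partition. The other modes use a single lane.
            int laneKey = switch (mode) {
                case CONCURRENT, SEQUENTIAL -> 0;
                case PARTITION_ORDERED -> m.partition();
            };
            // CONCURRENT: the single lane is a pool, so order is not preserved.
            // Otherwise each lane is one thread, so order within the lane is preserved.
            ExecutorService lane = lanes.computeIfAbsent(laneKey, k ->
                    mode == Mode.CONCURRENT
                            ? Executors.newFixedThreadPool(concurrency)
                            : Executors.newSingleThreadExecutor());
            lane.execute(() -> completed.add(m));
        }
        try {
            for (ExecutorService lane : lanes.values()) {
                lane.shutdown();
                if (!lane.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("processing timed out");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed;
    }

    public static void main(String[] args) {
        List<Message> in = List.of(new Message(0, 0), new Message(1, 0), new Message(0, 1));
        System.out.println(run(Mode.SEQUENTIAL, 1, in));        // same order as the input
        System.out.println(run(Mode.PARTITION_ORDERED, 1, in)); // order kept within each partition
        System.out.println(run(Mode.CONCURRENT, 4, in));        // any order
    }
}
```

In this model, SEQUENTIAL is the only mode whose completion order always equals the arrival order, which is also why it is the slowest of the three.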

Configuration Properties Of stacksaga-kafka-worker

| Property | DataType | Default Value | Description |
|---|---|---|---|
| stacksaga.kafka.worker.auto-create-topics | boolean | true | Whether the worker should automatically create the necessary Kafka topics (the topics configured via `SagaEndpoint`s) if they do not exist. |
| stacksaga.kafka.worker.shared-topic-listener.processing-mode | SharedListenerProcessingMode | PARTITION_ORDERED | The processing mode for the shared endpoint message listener. Default is PARTITION_ORDERED, which ensures that messages from the same partition of the same topic are processed in order. |
| stacksaga.kafka.worker.shared-topic-listener.concurrency | int | Runtime.getRuntime().availableProcessors() * 3 | The concurrency level for the shared endpoint message listener when the processing mode is set to CONCURRENT. This defines how many messages can be processed in parallel. |

Retry Configurations for Non-Reactive Endpoints: Primary execution

| Property | DataType | Default Value | Description |
|---|---|---|---|
| stacksaga.kafka.worker.retry.primary.max-attempts | int | 3 | The maximum number of retry attempts. |
| stacksaga.kafka.worker.retry.primary.initial-interval | Duration | 1s | The initial interval between retry attempts. |
| stacksaga.kafka.worker.retry.primary.max-interval | Duration | 1s | The maximum interval between retry attempts. |
| stacksaga.kafka.worker.retry.primary.multiplier | double | 2.0 | The multiplier for the retry interval. For example, with an initial interval of 1 second and a multiplier of 2.0, the retry intervals would be 1s, 2s, 4s, etc., up to the maximum interval. |

Retry Configurations for Non-Reactive Endpoints: Revert execution

| Property | DataType | Default Value | Description |
|---|---|---|---|
| stacksaga.kafka.worker.retry.revert.max-attempts | int | 3 | The maximum number of retry attempts. |
| stacksaga.kafka.worker.retry.revert.initial-interval | Duration | 1s | The initial interval between retry attempts. |
| stacksaga.kafka.worker.retry.revert.max-interval | Duration | 1s | The maximum interval between retry attempts. |
| stacksaga.kafka.worker.retry.revert.multiplier | double | 2.0 | The multiplier for the retry interval. For example, with an initial interval of 1 second and a multiplier of 2.0, the retry intervals would be 1s, 2s, 4s, etc., up to the maximum interval. |
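
How initial-interval, multiplier and max-interval interact can be checked with a few lines of arithmetic. The sketch below assumes a capped exponential backoff, as the multiplier description suggests. RetryBackoffDemo is a hypothetical name, the 10s cap in the first call is an example value rather than a default, and whether max-attempts counts the first call is not stated in this section, so the method takes the number of waits directly.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Capped exponential backoff arithmetic; an illustration, not the framework's retry code. */
final class RetryBackoffDemo {

    /** Returns the wait before each retry: initial * multiplier^n, capped at maxInterval. */
    static List<Duration> intervals(int retries, Duration initial,
                                    Duration maxInterval, double multiplier) {
        List<Duration> result = new ArrayList<>();
        double nextMillis = initial.toMillis();
        for (int i = 0; i < retries; i++) {
            long capped = Math.min((long) nextMillis, maxInterval.toMillis());
            result.add(Duration.ofMillis(capped));
            nextMillis *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Example cap of 10s (not a default): the intervals grow as in the description.
        System.out.println(intervals(3, Duration.ofSeconds(1), Duration.ofSeconds(10), 2.0));
        // [PT1S, PT2S, PT4S]

        // Values listed as defaults above (initial 1s, max 1s): every wait is capped at 1s.
        System.out.println(intervals(3, Duration.ofSeconds(1), Duration.ofSeconds(1), 2.0));
        // [PT1S, PT1S, PT1S]
    }
}
```

If the cap is applied this way, the multiplier only takes effect once max-interval is raised above initial-interval.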