StackSagaKafkaTemplate and Orchestrator Usage

StackSagaKafkaTemplate

StackSagaKafkaTemplate is the main entry point for accessing the StackSaga-Kafka engine (SEC) functionalities in the orchestrator service. it provides the main APIs for starting a new saga transaction and checking the status of the existing transactions.

StackSagaKafkaTemplate can be used in both reactive and non-reactive programming models. if you are in non-reactive environment, it can be used as below.

StackSagaKafkaTemplate in a Non-Reactive Environment

This demonstrates how to use the StackSagaKafkaTemplate to start a new saga transaction for placing an order and also how to check the current state of the transaction.

@Slf4j
@Component
@RequiredArgsConstructor
public class PlaceOrderHandler {

    private StackSagaKafkaTemplate<OrderDomainEntity, PlaceOrderTopic> stackSagaKafkaTemplate; (1)

    public String handle(String username, double amount, List<String> items) {
        String transactionId = this.stackSagaKafkaTemplate
                (2)
                .init(() -> {
                    OrderDomainEntity orderDomainEntity = new OrderDomainEntity();
                    {//initializing the domain entity with the required properties for the saga execution.
                        orderDomainEntity.setUsername(username);
                        orderDomainEntity.setTotalAmount(amount);
                        //...
                    }
                    return orderDomainEntity;
                })
                (3)
                .peek(orderDomainEntity -> {
                    log.info("transactionId {}:", orderDomainEntity.getTransactionId());
                })
                (4)
                .startWith(PlaceOrderTopic.DO_FETCH_USER_DETAILS, OrderEventManager.class)
                (5)
                .execute();
        log.info("Started place order saga with transactionId: {}", transactionId);
        return transactionId;
    }

    public void printCurrentState(String transactionId) {
        TransactionState<OrderDomainEntity> state = this
                .stackSagaKafkaTemplate
                (6)
                .getCurrentState(transactionId)
                .execute();

        TransactionCompleteStatus currentStatus = state.getCurrentStatus(); (7)
        log.info("Current status of transaction {}: {}", transactionId, currentStatus);
        NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory();(8)
        log.info("Execution history of transaction {}: {}", transactionId, executionHistory);
        OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity(); (9)
        log.info("Current domain entity state: {}", currentDomainEntity); (10)
        ZonedDateTime startedDateTime = state.getStartedDateTime(); (11)
        log.info("Transaction {} started at: {}", transactionId, startedDateTime.toLocalDateTime());
    }
}
1 Autowire the StackSagaKafkaTemplate in your class. you have to provide the custom DomainEntity and the custom Topic class as the generic parameters of the StackSagaKafkaTemplate.
2 Use the init operator to initialize the transaction by providing a supplier of the custom DomainEntity. this method is used to create the initial state of the transaction by providing the required properties for the saga execution in the DomainEntity.
3 Use the peek operator (optional) to perform any action with the initialized DomainEntity before starting the execution. the usage of this operator is that is to see the generated transactionId after initializing the DomainEntity because the transactionId is generated and set in the DomainEntity by the framework at the time of passing via init operator. therefore, you can use this operator to see the generated transactionId before starting the execution.
4 Use the startWith operator to specify the first topic that should be triggered to start the execution and also provide the EventManager class that is related to the transaction. this operator is used to indicate the starting point of the execution by providing the first topic that should be triggered and also the EventManager class that is responsible for navigating the execution flow.
5 Use the execute operator to start the execution of the transaction in blocking way.
6 Use the getCurrentState operator to get the current state of the transaction by providing the transactionId. this method is used to retrieve the current state of the transaction at any point in time by providing the transactionId. yu can see all the details about the transaction state in the returned TransactionState object such as the current status of the transaction, the execution history, the current state of the domain entity, the started time and many more details.
7 Get the current status of the transaction from the TransactionState object.
8 Get the execution history of the transaction from the TransactionState object. the execution history is a navigable map that contains all the executed endpoints in the transaction flow (primary and compensation) with their metadata such as the execution status, completed time event name and more.
9 Get the current state of the domain entity from the TransactionState object. this is the latest state of the domain entity after applying all the changes that are done by the executed endpoints in the transaction flow.
10 Log the current state of the domain entity.
11 Get the started time of the transaction from the TransactionState object and log it.
There is another way to being updated about the transaction status changes by using the EventListener.
Go through the Javadoc of the TransactionState class to see all the features that are provided in the transaction state object to get more insights about the transaction.

StackSagaKafkaTemplate in a Reactive Environment

If you are in a reactive environment, you can use the StackSagaKafkaTemplate in a non-blocking way by using the reactive operators provided by the template as below.

@Component
@RequiredArgsConstructor
class ReactivePlaceOrderHandler {
    private StackSagaKafkaTemplate<OrderDomainEntity, PlaceOrderTopic> stackSagaKafkaTemplate;

    public Mono<String> handle(String username, double amount, List<String> items) {
        return this
                .stackSagaKafkaTemplate
                .init(() -> {
                    OrderDomainEntity orderDomainEntity = new OrderDomainEntity();
                    {//initializing the domain entity with the required properties for the saga execution.
                        orderDomainEntity.setUsername(username);
                        orderDomainEntity.setTotalAmount(amount);
                        //...
                    }
                    return orderDomainEntity;
                })
                .peek(orderDomainEntity -> {
                    log.info("transactionId {}:", orderDomainEntity.getTransactionId());
                })
                .startWith(PlaceOrderTopic.DO_FETCH_USER_DETAILS, OrderEventManager.class)
                .executeAsync()
                .doOnNext(transactionId -> {
                    log.info("Started place order saga with transactionId: {}", transactionId);
                });
    }
    public Mono<Void> printAndCurrentState(String transactionId) {
        return this
                .stackSagaKafkaTemplate
                .getCurrentState(transactionId)
                .executeAsync()
                .doOnNext(state -> {
                    TransactionCompleteStatus currentStatus = state.getCurrentStatus();
                    log.info("Current status of transaction {}: {}", transactionId, currentStatus);
                    NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory();
                    log.info("Execution history of transaction {}: {}", transactionId, executionHistory);
                    OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity();
                    log.info("Current domain entity state: {}", currentDomainEntity);
                    ZonedDateTime startedDateTime = state.getStartedDateTime();
                    log.info("Transaction {} started at: {}", transactionId, startedDateTime.toLocalDateTime());
                })
                .then();

    }
}
All the things are the same except the usage of executeAsync instead of execute to start the execution in a non-blocking way and also to get the current state in a non-blocking way. read the blocking example to get the details explained about the usage of the StackSagaKafkaTemplate.

Transaction State Change Listeners

After handing over the transaction to the StackSaga-Kafka engine by calling the execute or executeAsync method of the StackSagaKafkaTemplate, the transaction is being executed and updated the status time to time after each execution (primary and compensation) in the transaction flow. so if you want to keep listening to the transaction status changes and perform any action based on that, framework provides the way to do that.

It also can be implemented based on the environment.

  1. TransactionEventListener for non-reactive environment.

  2. ReactiveTransactionEventListener for reactive environment.

If you want to get notified about the transaction status changes, there are two ways are available in the framework.

  1. Using StackSagaKafkaTemplate.getCurrentState

  2. Using TransactionEventListener or ReactiveTransactionEventListener.

using StackSagaKafkaTemplate.getCurrentState method to get the current state of the transaction by providing the transactionId approach is not efficient because when you call this method, the engine fetches the current state of the transaction from the event-store. but if you are using the listeners, you will be notified about the transaction status changes after each execution in the transaction flow without the need to call any method to get the current state of the transaction because the current state of the transaction is being passed to the callback method of those listeners realtime. due to it is happening realtime you don’t want to pool the transaction state. for instance in the place-order example, after executing you can push the notification to update the user about the order status changes by using those listeners. but in some cases you might want to get the current state of the transaction even after the transaction is completed. for instance after use might need to see the details after placing order. then you can use the StackSagaKafkaTemplate.getCurrentState method to get the current state of the transaction by providing the transactionId. The use cases are different for both approaches, so you can choose the one that is suitable for your use case.
While you are using the onChange method of the listeners, be aware that the method is invoked after each execution in the transaction flow, so make sure to perform only the required action in that method and avoid doing any heavy processing in that method because it can be caused to delay the entire transaction flow. if you want to do some heavy processing based on the transaction status changes, it is recommended make a deep copy of the TransactionState object and then perform the heavy processing with a separate thread by letting the saga thread to continue the execution without being unnecessarily delayed. or you can use the StackSagaKafkaTemplate.getCurrentState method to get the current state of the transaction and then perform the heavy processing based on that.

TransactionEventListener

You can implement the TransactionEventListener interface to listen to the transaction events in a non-reactive environment as below.

@Slf4j
@Component
@RequiredArgsConstructor
public class PlaceOrderHandler implements TransactionEventListener<OrderDomainEntity> {
    @Override
    public void onStateChanged(TransactionState<OrderDomainEntity> state) {
        TransactionCompleteStatus currentStatus = state.getCurrentStatus();
        log.info("Current status of transaction: {}", currentStatus);
        NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory();
        log.info("Execution history of transaction: {}", executionHistory);
        OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity();
        log.info("Current domain entity state: {}", currentDomainEntity);
        ZonedDateTime startedDateTime = state.getStartedDateTime();
        log.info("Transaction started at: {}", startedDateTime.toLocalDateTime());
    }
}

ReactiveTransactionEventListener

You can implement the ReactiveTransactionEventListener interface to listen to the transaction events in reactive environment as below.

@Slf4j
@Component
@RequiredArgsConstructor
public class ReactivePlaceOrderHandler implements ReactiveTransactionEventListener<OrderDomainEntity> {
    @Override
    public Mono<Void> onStateChanged(TransactionState<OrderDomainEntity> transactionState) {
        return Mono
                .just(transactionState)
                .doOnNext(state -> {
                    TransactionCompleteStatus currentStatus = state.getCurrentStatus();
                    log.info("Current status of transaction: {}", currentStatus);
                    NavigableMap<Integer, SagaExecutionEvent> executionHistory = state.getExecutionHistory();
                    log.info("Execution history of transaction: {}", executionHistory);
                    OrderDomainEntity currentDomainEntity = state.getCurrentDomainEntity();
                    log.info("Current domain entity state: {}", currentDomainEntity);
                    ZonedDateTime startedDateTime = state.getStartedDateTime();
                    log.info("Transaction started at: {}", startedDateTime.toLocalDateTime());
                })
                .flatMap(orderDomainEntityTransactionState -> {
                    //simulate some async processing here.
                    return Mono.delay(Duration.ofSeconds(1))
                            .then();
                });
    }
}

StackSaga team suggests to implement the saga transaction related things in a separate class called Handler that is responsible for starting, listening and checking the status of the saga transactions. The below code snippet is showing the recommended architecture for the Handler.

@Slf4j
@Component
@RequiredArgsConstructor
public class PlaceOrderHandler implements TransactionEventListener<OrderDomainEntity> {

    private StackSagaKafkaTemplate<OrderDomainEntity, PlaceOrderTopic> stackSagaKafkaTemplate;

    public String handle(String username, double amount, List<String> items) { (1)
        return this
                .stackSagaKafkaTemplate
                .init(() -> {
                  //....
                })
                .....
    }

    public CustomOrderStatusView getCurrentState(String orderId) { (2)
        return this
                .stackSagaKafkaTemplate
                .getCurrentState(orderId)
                ......

    }
    @Override
    public void onStateChanged(TransactionState<OrderDomainEntity> transactionState) { (3)
        //....
    }
}
1 The handle method is responsible for starting the saga transaction by using the StackSagaKafkaTemplate and providing the required parameters to start the transaction.
2 Fetch the current state of the transaction by using the StackSagaKafkaTemplate
3 Implement the onStateChanged method to listen to the transaction status changes by implementing the TransactionEventListener interface and override the onStateChanged method.

Configuration Properties Of stacksaga-kafka-orchestrator

Property DataType Default Value Description

stacksaga.kafka.orchestrator.domain-entity-scan

String[]

[]

A comma-separated list of package names to scan for @SagaDomainEntity annotated classes. for instance, com.example.domain,com.example.anotherdomain.

stacksaga.kafka.orchestrator.auto-create

boolean

true

A flag to enable or disable automatic creation of Kafka topics when the application starts.

stacksaga.kafka.orchestrator.shared-response-listener.processing-mode

SharedListenerProcessingMode

PARTITION_ORDERED`

The processing mode for the shared response listener. see more stacksaga-kafka-implementation/orchestrator/topics-event-manager.adoc#isolated_listener_models_for_orchestrator for more details about the processing modes.

stacksaga.kafka.orchestrator.shared-response-listener.concurrency

int

128

if the shared-response-listener.processing-mode is set to CONCURRENT, this property defines the level of concurrency for processing messages in the shared response listener. it is ignored for other processing modes.