SagaEventNavigator

EventNavigator is the where you can navigate the execution after each execution is done. Stacksaga-Kafka supports fully dynamic execution navigation. So you can navigate the execution based on the aggregator state, primary execution-exception, revert HintStore etc.

In SagaEventNavigator, there’s two methods that are invoked by the Stacksaga engine as follows.

  1. onNext()

  2. onNextRevert()

onNext() method

onNext() method is executed by stacksaga-engine after each primary execution is succeeded by providing the update state of the aggregator until the last successful execution, and the recent successful topic name. It gives you the chance to determine what topic should be triggered next. Not only next, the following actions can be taken conditionally in oneNext method.

  1. SagaPrimaryEventAction.next(nextTopic): As we mentioned above you can pass the next topic along with this static method.

  2. SagaPrimaryEventAction.error(exception) Conditionally if you want to stop going forward and start compensation you can return an exception with this method.

    As an alternative approach you can just throw an exception as well but as a best practice make sure to return your exception by wrapping with SagaPrimaryEventAction.error() method.
  3. SagaPrimaryEventAction.done() if all the spans are processed as you expected you can indicate the transaction is successfully done by using this method.

As a best practice, avoid performing any I/O-intensive or CPU-intensive operations inside the event navigator. This method should be used only to evaluate conditions based on the provided parameters.
The reason is that the Stacksaga-engine internally relies on Project Reactor’s reactive pipelines, which run on non-blocking event-loop threads. Executing expensive operations here could block those threads, leading to performance bottlenecks and impacting the entire application.

onNextRevert() method

onNextRevert() is another method that is executed by the framework to get the next action after successfully processing each compensating operation. It is not necessary to be overridden if you have not advanced navigation while reverting. Because all the default reverting (COMMAND_UNDO_ACTION) for the COMMAND_DO_ACTION executions are done in the correct order even you have not overridden and implemented the method manually. It provides you access to the last aggregator state that was the final successful primary execution, primary execution exception that caused while executing primary execution, revert hints-tore and recent successfully reverted topic.

Manual navigation of revert topics in onNextRevert() method.

If you want to add more extra sub executions apart from the main COMMAND_UNDO_ACTION(default revert), you can create extra sub revert executions before and after the default revert execution.
For instance, if the primary execution is DO_MAKE_PAYMENT it should have the default revert as UNDO_MAKE_PAYMENT. by the way if you want to add additional executions before or after UNDO_MAKE_PAYMENT, you can define your topic in CustomSagaTopic and use it for navigating. if the execution should be done before the main compensation execution(UNDO_MAKE_PAYMENT) you can create topics with type of SagaTopicType.COMMAND_UNDO_BEFORE_ACTION and if the execution should be done after the main compensation execution you can create topics with type of SagaTopicType.COMMAND_UNDO_AFTER_ACTION. read more…​

  • Create your custom EventNavigator class and implement it from SagaEventNavigator<A,T> interface.

(1)
@SagaStepExecutor(
        (2)
        rootTopic = "PlaceOrderEventNavigator",
        (3)
        value = "PlaceOrderEventNavigator",
        (4)
        partitions = {0}
)
public class PlaceOrderEventNavigator implements SagaEventNavigator<PlaceOrderAggregator, PlaceOrderTopic> { (5)

    (6)
    @Override
    public SagaPrimaryEventAction<PlaceOrderTopic> onNext(
            (7)
            PlaceOrderTopic recentTopic,
            (8)
            PlaceOrderAggregator aggregator
    ) {
        return switch (recentTopic) {
            case DO_USER_VALIDATE -> SagaPrimaryEventAction.next(PlaceOrderTopic.DO_UPDATE_STOCK);
            case DO_UPDATE_STOCK -> SagaPrimaryEventAction.next(PlaceOrderTopic.DO_MAKE_PAYMENT);
            case DO_MAKE_DELIVERY -> SagaPrimaryEventAction.done();
            default -> SagaPrimaryEventAction.error(new IllegalStateException("Unexpected value: " + recentTopic));
        };
    }

    (9)
    @Override
    public SagaRevertEventAction<PlaceOrderTopic> onNextRevert(
            (10)
            PlaceOrderTopic lastRevertedEvent,
            (11)
            PlaceOrderAggregator aggregator,
            (12)
            NonRetryableExecutorException exception,
            (13)
            RevertHintStore revertHintStore
    ) {
        return switch (lastEvent) {
            case UNDO_UPDATE_STOCK -> SagaRevertEventAction.next(PlaceOrderTopic.UNDO_MAKE_PAYMENT_SUB_BEFORE_1);
            case UNDO_MAKE_PAYMENT_SUB_BEFORE_1 -> SagaRevertEventAction.next(PlaceOrderTopic.UNDO_MAKE_PAYMENT_SUB_BEFORE_2);
            case UNDO_MAKE_PAYMENT -> SagaRevertEventAction.next(PlaceOrderTopic.UNDO_MAKE_PAYMENT_SUB_AFTER_1);
            default -> SagaRevertEventAction.autopilot();
        };
    }
}
1 Annotate the class with @SagaStepExecutor. it enables to acts as