StackSaga Demo (Core concept practice)
Overview
In this demo, we are focusing on how to implement the StackSaga things in the spring boot in microservice architecture.
IF you are new to StackSaga, it is recommended to read the architecture and reference documentation for reading individual components.
In this example, we have 4 utility microservices.
,payment-service
and stock-service.
and as the proxy server we use Spring Cloud Api Gateway.
The requirement is the clients can place an order and then the order request should be completed go through these 4 services.
Each services' responsibilities as follows.
order-service
|
Order service is responsible for accepting the place order request and managing the entire transaction as the orchestrator. and also as a utility service, the service is responsible for storing the order related specific data in the order-service database. |
user-service
|
To make the process we want to have the user details like delivery address. User-service provides the endpoints to fetch the user’s data. |
payment-service
|
In the placing order process, the user service involves to execute to main steps.
|
stock-service
|
After making the pre-auth process, we should update the real stock.
|
There are 3 utility services in the cluster called order-service,
user-service
and stock-service.
as well as Spring Cloud Api Gateway is used as the (In K8s it is can be called ingress service) Application Load Balancer (ALB) and as well as the Reverse proxy server.
The order-service
act as the StackSaga orchestrator service here.
As mentioned above, Eureka or any of Discovery clients are not used for Service-Discovery and Service-Registry.
To overcome the capability of Service-Discovery, it is used inbuilt Kubernetes Service.
High-level Implementation Diagram.
stack-saga-icon.svg[width=20] You can see below what we are going to implement from the general perspective.
-
In the diagram, StackSaga icons are shown which parts StackSaga interact.
-
Red lines are shown how the StackSaga Admin interacts with the process and the request-flow.
-
Black lines are shown how the regular requests are done.
High-level implementation diagram with Kubernetes Flags.
-
The Kubernetes related blue icons indicate how Kubernetes interacts with the deployment.
Implementation
In this demo we are not going to implement all the functionalities that StackSaga provides in-detail. Here we focus how a StackSaga application set is deployd in the Kubernetes cluster and how the communication part is done. If you want to practice all the features in detail, you can refer this comprehensive demo and it will describe all the topics step by step. |
As the first step, let’s create our services one by one.
Create Order-Service (Orchestrator Service)
The order-service
is little bit special than the other utility services, because the order-service
is the responsible for managing the entire transaction here.
That’s why it called as Orchestrator Service.
And the next reason is that StackSaga involves that kind of service in your entire system.
Selecting a service as an Orchestrator service or not depends on your requirement.
Create order the project with spring initializer
Go to the spring initializer and create your project with the following Dependencies.
-
spring-boot-starter-web
-
spring-boot-starter-data-jpa
-
mysql-connector-java
-
lombok
In general, you would add spring-eureka-client but in this example eureka is not used.
|
After creating your project, add the StackSaga dependencies in the dependencies section. There are 3 dependencies to be added based on your implementation.
-
You can add the
stacksaga-spring-boot-starter
dependency based on the spring version that you wish to use. In this example, we will be using spring2.X.- Spring 2.X
-
<dependency> <groupId>org.stacksaga</groupId> <artifactId>stacksaga-spring-boot-2-starter</artifactId> <version>1.0.0</version> </dependency>
- Spring 3.X
-
<dependency> <groupId>org.stacksaga</groupId> <artifactId>stacksaga-spring-boot-2-starter</artifactId> <version>1.0.0</version> </dependency>
-
Due to the database has been selected as Mysql, you have to add
stacksaga-mysql-support
dependency.- :
-
<dependency> <groupId>org.stacksaga</groupId> <artifactId>stacksaga-mysql-support</artifactId> <version>1.0.0</version> </dependency>
-
Due to we are going to deploy the application in the Kubernetes cluster environment,
stacksaga-connect-k8s-support
dependency should be added.- :
-
<dependency> <groupId>org.stacksaga</groupId> <artifactId>stacksaga-connect-k8s-support</artifactId> <version>1.0.0</version> </dependency>
Finally, the pom.xml
file will be like below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>order-service</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>
<dependencies>
...
...
<!--othe dependencies-->
<!--StackSaga related dependacies -->
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-spring-boot-2-starter</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-mysql-support</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.stacksaga</groupId>
<artifactId>stacksaga-connect-k8s-support</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
</project>
Order-Service Overview.
You know that we have to access more services besides this order-service
to fully complete the entire place-order request.
To access those services in general, you would use service-layer and call those services manually by using any httpclient like RestTemplate, Feign client, Okhttp, and so on.
Instead of that manual unorganized way, StackSaga offers better wrapper blocks for each execution.
It is called as executors
.
Read the reference documentation to have a better understanding and best practices regarding the
SagaExecutors
.
According to the Saga design pattern, you know that we should have a Compensating process for all the execution (All the executions that make some changes in a database). Due to the fact that the StackSaga framework is one of Saga implementations, it should be provided the primary execution and also a Compensating execution to make a Compensating when the entire process is failed at any point.
Based on having a Compensating or not, the executors are two types called command-executor
and query-executor
.
command-executor
have both primary execution and the Compensating execution.
If it is query-executor
, it has only primary execution.
How StackSaga combines with Controller Service and Repository architecture?
In spring applications we follow controller
, service
and repository
layered architecture.
StackSaga suggest another middle layer between the controller
layer and the service
layer called executor-layer
.
The executor-layer
is the place the executors are created.
Let’s see step by step how StackSaga involves the process.
Here you can see the entire process in a diagram from based on the order-service (orchestrator-service).
In brief, The client makes an order place request and the request comes to the order-service via the api-cloud-gateway.
And then we initialize a special object called Aggregator
and it is handed over to the SEC (Saga Execution coordinator), and the SEC will handle the entire execution by executing each every atomic execution.
The atomic executions have been provided inside the executors.
Here you can see the summarized table for your convenience.
Executor Class Name | Execution_Requirement | Target_Service | Type | Process-Execution | Revert-Execution |
---|---|---|---|---|---|
Fetch the user’s details |
|
QUERY_EXECUTOR |
|
- |
|
Initialize the order at the first. |
|
COMMAND_EXECUTOR |
|
|
|
Make the Pre-Auth process. |
|
COMMAND_EXECUTOR |
|
|
|
Update the stock from the store. |
|
COMMAND_EXECUTOR |
|
|
|
Finalize the order by making the real payment. |
|
COMMAND_EXECUTOR |
|
? |
Creating the aggregator
You know that the request comes to the order-service at the first (for downstream services), and it tries to handle the entire request by calling to other utility services.
To start the execute the request, the order-service
should have an Aggregator class that extend from the SagaAggregator
class.
And as well as to verify the aggregator object is serializable well or not, we should provide a sagaSerializable's implementation.
Read the reference documentation to have a better understanding and best practices regarding the
SagaAggregator
.
package org.example.aggregator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.Setter;
import org.example.aggregator.dto.OrderItem;
import org.stacksaga.Aggregator;
import org.stacksaga.SagaSerializable;
import org.stacksaga.core.annotation.SagaAggregator;
import org.stacksaga.core.annotation.SagaAggregatorVersion;
import java.util.List;
@SagaAggregator(
version = @SagaAggregatorVersion(major = 1, minor = 0, patch = 0),
name = "PlaceOrderAggregator",
sagaSerializable = PlaceOrderAggregatorSerializer.class
)
@Getter
@Setter
@ToString
public class PlaceOrderAggregator extends Aggregator {
/**
* The relevant user to the order.
*/
@JsonProperty("user_id")
private String userId;
/**
* The item list that user order.
*/
@JsonProperty("items")
private List<OrderItem> items;
/**
* The amount to be paid.
*/
@JsonProperty("amount")
private Double amount;
/**
* Pre-Auth Reference for release or make the payment.
*/
@JsonProperty("pre_auth_ref")
private String preAuthRef;
/**
* Payment ID after the payment.
*/
@JsonProperty("payment_id")
private String paymentId;
@JsonProperty("executions")
private List<String> executions = new ArrayList<>();
@JsonIgnore
private Boolean hasRevertError = false;
public PlaceOrderAggregator() {
super(PlaceOrderAggregator.class);
}
}
class PlaceOrderAggregatorSerializer extends SagaSerializable<PlaceOrderAggregator> {
public PlaceOrderAggregatorSerializer() {
PlaceOrderAggregator placeOrderAggregator = new PlaceOrderAggregator();
placeOrderAggregator.setUserId("user-1");
placeOrderAggregator.setAmount(100.00);
placeOrderAggregator.setItems(new ArrayList<>());
placeOrderAggregator.getItems()
.add(OrderItem.builder()
.itemId("item-1")
.itemPrice(100.00)
.build()
);
this.put("sample-1", placeOrderAggregator);
}
}
To make the code more readable and clear, I have used lombok
and builder pattern for creating the objects through the demo.
It is not required, and you are free to use the traditional way by creating getters and setters manual.
|
In the PlaceOrderAggregator
has the data that we want to access while the entire transaction process.
Those data will be updated time to time from each execution.
Now the aggregator is ready to store and carry out the data throughout the entire process.
Creating OrderController (API Endpoint).
As per the diagram, you know that the request comes to the /order/place
endpoint and also the handing over the execution to the stacksaga is happened in this controller class.
@RestController
@RequestMapping("/order")
@RequiredArgsConstructor
public class OrderController {
(1)
private final SagaTemplate<PlaceOrderAggregator> placeOrderAggregatorSagaTemplate;
@PostMapping("/place")
@ResponseStatus(HttpStatus.ACCEPTED)
public PlaceOrderDto.ResponseBody createOrder(@RequestBody PlaceOrderDto.RequestBody requestBody) {
(2)
final PlaceOrderAggregator placeOrderAggregator = new PlaceOrderAggregator();
placeOrderAggregator.setUserId(requestBody.getUserId());
placeOrderAggregator.setItems(requestBody.getItems());
placeOrderAggregator.setAmount(
requestBody.getItems()
.stream()
.map(orderItem -> orderItem.getItemPrice() * orderItem.getQty())
.mapToDouble(Double::doubleValue)
.sum()
);
placeOrderAggregator.setHasRevertError(requestBody.getHasRevertError());
placeOrderAggregator.getExecutions().add("INIT:" + LocalDateTime.now());
log.debug("placeOrderAggregator:{}", placeOrderAggregator);
(3)
placeOrderAggregatorSagaTemplate.process(
placeOrderAggregator,
UserDetailExecutor.class
);
(4)
return PlaceOrderDto.ResponseBody.builder()
.orderId(placeOrderAggregator.getAggregatorTransactionId())
.message("order has been submitted successfully")
.build();
}
}
1 | Autowire the SagaTemplate for handing over the execution to the framework.
Your target aggregator class should be provided to the SagaTemplate as a type. |
2 | Initialize the PlaceOrderAggregator [] and set the data that should be added initially. In our case, the necessary data that are given from the endpoint are added to the aggregator object. You can see the initialized data at the dashboard like this. |
3 | Handing over the execution to the framework by passing the initialized aggregator object, and the executor class that the execution should be started. From here the execution will be handled by the StackSaga execution coordinator (SEC) asynchronously. |
4 | Build the response object to the client by using the aggregatorTransactionId .
aggregatorTransactionId can access through your aggregator object due to that id comes from the Aggregator super. |
Related Classes' links
Creating OrderInitializeExecutor
You know that the request comes to the OrderController
and the request is transferred to the UserDetailExecutor
as the first step to collect the user details.
And next, we are going to initiate the order in the own service (order-service).
To initiate the order will be creating an executor called OrderInitializeExecutor
.
The OrderInitializeExecutor should be a command-executor because the primary execution (updating order database) makes some changes in the database, and then if a failure occurred at some point after executing the order initialization process, it should be recovered again.
(1)
@SagaExecutor(executeFor = "order-service", liveCheck = false, value = "OrderInitializeExecutor")
@RequiredArgsConstructor
public class OrderInitializeExecutor implements CommandExecutor<PlaceOrderAggregator> { (2)
(3)
private final OrderService orderService;
private final ObjectMapper objectMapper;
@Override (4)
public ProcessStepManager<PlaceOrderAggregator> doProcess(PlaceOrderAggregator currentAggregator, ProcessStepManagerUtil<PlaceOrderAggregator> stepManager) throws RetryableExecutorException, NonRetryableExecutorException {
try {
currentAggregator.getExecutions().add(this.getClass().getSimpleName() + ":" + LocalDateTime.now());
(5)
this.orderService.initialize(
OrderEntity
.builder()
.orderId(currentAggregator.getAggregatorTransactionId())
.userId(currentAggregator.getUserId())
.createAt(LocalDateTime.now())
.isCancelled(false)
.items(objectMapper.writeValueAsString(currentAggregator.getItems()))
.build()
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
(6)
return stepManager.next(PreAuthExecutor.class, OrderStatus.INITIALIZED_ORDER);
}
@Override
public SagaExecutionEventName doRevert(NonRetryableExecutorException processException, PlaceOrderAggregator finalAggregatorState, RevertHintStore revertHintStore) throws RetryableExecutorException {
(7)
this.orderService.cancelOrder(finalAggregatorState.getAggregatorTransactionId());
(8)
revertHintStore.put(
String.valueOf(System.currentTimeMillis()),
this.getClass().getSimpleName() + ":doRevert"
);
(9)
if (finalAggregatorState.getHasRevertError()) {
throw new RuntimeException("dummy error for revert-failed.");
}
(10)
return OrderStatus.CANCELED_ORDER;
}
}
1 | Annotate the OrderInitializeExecutor class as a SagaExecutor by using @SagaExecutor annotation.
Read more about |
||
2 | The class has been extended from the CommandExecutor .
Read more about |
||
3 | Autowire the OrderService service to make the execution.
(ObjectMapper has been Autowired just for getting the JSON string) |
||
4 | Overridden the doProcess and doRevert methods. |
||
5 | Initialize the order-details by calling the orderService. | ||
6 | After Initializing the order successfully, SEC is navigated to the next executor by using stepManager.next .
The next executor is PreAuthExecutor and also the current executed is marked as INITIALIZED_ORDER in the event-store. |
||
7 | The order is marked as canceled one if the primary execution should be reverted. |
||
8 | Update the hint-store object by adding current millisecond and the revert method name with the executor name while the revert process. In your real application, the hint-store can be used for adding some data regarding the revert process. It can be accessed from the next revert process. |
||
9 | We used a variable called hasRevertError in the aggregator to test a revert-failed transaction.
If we have passed the hasRevertError value as true , throw an exception to terminate the transaction.
|
||
10 | If the canceling process is successfully executed, return the event name as CANCELED_ORDER. |
Creating UserDetailExecutor
In the OrderController
class The UserDetailExecutor
has been mentioned as the initial executor.
As per the diagram, you know that executor should be a Query-Executor
.
Because the requirement is only fetching the user details.
Fetching user details doesn’t change any data.
(1)
@SagaExecutor(executeFor = "user-service", liveCheck = false, value = "UserDetailExecutor")
@RequiredArgsConstructor
public class UserDetailExecutor implements QueryExecutor<PlaceOrderAggregator> { (2)
(3)
private final UserService userService;
@Override
public ProcessStepManager<PlaceOrderAggregator> doProcess(
PlaceOrderAggregator currentAggregator, (4)
ProcessStepManagerUtil<PlaceOrderAggregator> stepManager (5)
) throws
RetryableExecutorException, NonRetryableExecutorException (6)
{
currentAggregator.getExecutions().add(this.getClass().getSimpleName()+":"+ LocalDateTime.now());
(7)
UserDetailDto.ResponseBody userDetails = this.userService.getUserDetails(currentAggregator.getUserId());
(8)
currentAggregator.setDeliveryDetails(userDetails.getDeliveryDetails());
return stepManager.next(OrderInitializeExecutor.class, OrderStatus.USER_DETAILS_FETCHED); (9)
}
}
1 | Annotate the UserDetailExecutor class as a SagaExecutor by using @SagaExecutor annotation.
|
2 | Implement the UserDetailExecutor from the QueryExecutor by passing the target Aggregator.
|
3 | Autowire the UserService
to make the execution by calling the user-service . |
4 | you will have the currentAggregator object through the method parameter. due to this executor is the 1st executor, the currentAggregator object contains the initialized values from the OrderController class. |
5 | stepManager helps you to navigate the execution to the next executor. |
6 | If you throw an exception, that is how the framework knows that the executor has an exception at this moment.
Based on the exception type that you have thrown, The framework decides that the process should be started the revert process or either process should be stopped temporally.
IF the exception is a
|
7 | By using the UserService’s getUserDetails() method, fetches the user’s data from user user-service . |
8 | After receiving the user’s detail the currentAggregator is updated with user’s details.
Therefore, you can access the user’s detail within the next executor. |
9 | By using the stepManager , navigates the execution to the next executor called and the event name marked as USER_DETAILS_FETCHED OrderInitializeExecutor. |
Creating UserService.
Userservice
is a regular spring service that creates for calling user-service
.
Due to the user-service
is another utility service we have to call it through http or any other protocol that target service supports (Most probably API endpoints are exposed by http protocol).
This example’s user-service
also support http endpoint to access the API.
Therefore, we will be using rest template as http client.
But you can use any Http client like Feign-client, Okhttp, Spring-webclient and so on.
@Service
@RequiredArgsConstructor
public class UserService {
private final RestTemplate restTemplate;
public UserDetailDto.ResponseBody getUserDetails(String userId)
throws
NonRetryableExecutorException, RetryableExecutorException (1)
{
try {
(2)
UserDetailDto.ResponseBody responseBody = this.restTemplate.getForObject(
"http://user-service/users/{userId}",
UserDetailDto.ResponseBody.class,
userId
);
assert responseBody != null;
return responseBody;
} catch (HttpClientErrorException ex) { (3)
// This exception is thrown for HTTP 4xx errors (Client errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.NOT_FOUND)) {
(4)
throw NonRetryableExecutorException
.buildWith(
new RuntimeException("User not found"),
""
)
.put("reason", "User not found")
.build();
} else {
(5)
throw NonRetryableExecutorException.buildWith(ex, "").build();
}
} catch (HttpServerErrorException ex) { (6)
// This exception is thrown for HTTP 5xx errors (Server errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR)) {
(7)
throw NonRetryableExecutorException.buildWith(ex, "").build();
} else {
(8)
//502 , 503, 504, 509 etc.
throw RetryableExecutorException.buildWith(ex).build();
}
} catch (RestClientException ex) { (9)
// This exception is a generic RestClientException
// Handle other types of exceptions here
throw ex; (10)
} catch (IllegalArgumentException illegalArgumentException) { (11)
throw RetryableExecutorException.buildWith(illegalArgumentException).build();
} catch (RunT) { (11)
throw RetryableExecutorException.buildWith(illegalArgumentException).build();
}
}
}
Handling the exceptions is the most important task in StackSaga framework.
You have to handle the exception, and decide what should be the NonRetryableExecutorException and what should be the RetryableExecutorException carefully.
For example, in this demo you saw that we handled NOT_FOUND status.
the NOT_FOUND can be thrown if the endpoint not found that you are looking for.
Or otherwise it can be passed if the user does not exist.
Then, if you have not any awareness about what the target service returns, you will not be able to catch the real error.
In this example, we know exactly there is an endpoint /users/{userId} in the user-service therefore no worries.
But be careful if you access third party APIs.
Read the API documentation in detail.
|
1 | We have thrown both NonRetryableExecutorException ,and RetryableExecutorException that UserDetailExecutor’s doPrcess() method expects.
That’s why it was mentioned in above.
The handling exception part is done in the service layer.
[ Read the TIP ] |
2 | Call the http request to the user-service. |
3 | Catch the 4xx HTTP errors to determine if the exception is a NonRetryableExecutorException or RetryableExecutorException . |
4 | Due to the http error code is equal to NOT_FOUND (404), the process cannot be done anymore.
Therefore, a NonRetryableExecutorException is thrown by wrapping with the real exception.
If you want to put some data based on the exception, you can use the put("key","value") method for that.
The data can be accessed from any revert-exceptions. |
5 | Other 4xx errors are thrown as the NonRetryableExecutorException by wrapping the real error. |
6 | Catch the 5xx HTTP errors to determine if the exception is a NonRetryableExecutorException or RetryableExecutorException .
Most probably 5xx errors can be retried, but there are some cases it can not. |
7 | Check the 5xx error is equal to INTERNAL_SERVER_ERROR .
Because if there is an internal server in this case, we know that we cannot go ahead and the process should be stopped going forward.
Therefore, NonRetryableExecutorException is thrown by wrapping the real letter. |
8 | If the 5xx is not equal to INTERNAL_SERVER_ERROR , then other errors like 502, 503, 504, 509 error codes are caught as RetryableExecutorException and therefore a RetryableExecutorException is thrown by wrapping the real exception. |
9 | Cathe the other exceptions. |
10 | In this example, that other error codes are not considered because we assume that errors cannot be happened.
Therefore, that error just throws without wrapping with NonRetryableExecutorException .
IF you want to wrap, you can do as usual but is not required if you don’t consider those errors.
Because internally the framework wraps the all RuntimeExceptions with NonRetryableExecutorException by default. |
11 | Due to we are using spring-cloud-load-balancer , when we make a request via the RestTemplate internally load balancer check is there any registered services in the local cache.
Then, if there is no instance in the cache, it throws and exception with IllegalArgumentException .
But in our case, actuality it is also a retryable exception.
Because when an instance is registered, that execution can be invoked.
Therefore, that error is thrown as RetryableExecutorException . |
The reason for handling the exception is that this is where the http client does the invocation and the special this is most probably the exceptions are different to each other even though the http status code is the same. |
- Case-1
-
IF you change the Rest-Client (For instance, you move to RestTemplate to Feign-client), all the exceptions are changed. Then you have to change all the codes in the executor if you have handled the exceptions inside the executor. But in this way nothing to do anything.
- Case-2
-
If you have to change the protocol like HttpRest to GRPC, you have nothing to do in the executor layer.
Creating PreAuthExecutor (Command-Executor)
After initiating the order details, next we will be creating an executor called PreAuthExecutor
.
It is responsible for making PreAuth
by calling to the utility payment-service
.
Due to the execution should be reverted if the entire process is not successful as wished, the PreAuthExecutor
is implemented from the CommandExecutor
interface.
TO make the PreAuth request, we should provide the userId
, amount
and orderId
.
And to revert the process we have to provide the PreAuthRef
back.
(1)
@SagaExecutor(executeFor = "payment-service", liveCheck = false, value = "PreAuthExecutor")
@RequiredArgsConstructor
public class PreAuthExecutor implements CommandExecutor<PlaceOrderAggregator> { (2)
private final PaymentService paymentService;
@Override
public ProcessStepManager<PlaceOrderAggregator> doProcess(
PlaceOrderAggregator currentAggregator, (3)
ProcessStepManagerUtil<PlaceOrderAggregator> stepManager
) throws RetryableExecutorException, NonRetryableExecutorException {
currentAggregator.getExecutions().add(this.getClass().getSimpleName()+":"+ LocalDateTime.now());
(4)
final String preAuthRef = this.paymentService.makePreAuth(
PreAuthDto
.RequestBody
.builder()
.userId(currentAggregator.getUserId())
.amount(currentAggregator.getAmount())
.orderId(currentAggregator.getAggregatorTransactionId())
.build()
);
(5)
currentAggregator.setPreAuthRef(preAuthRef);
(6)
return stepManager.next(StockUpdateExecutor.class, OrderStatus.MADE_PRE_AUTH);
}
@Override
public SagaExecutionEventName doRevert(
NonRetryableExecutorException processException,
PlaceOrderAggregator finalAggregatorState,
RevertHintStore revertHintStore
)
throws RetryableExecutorException { (7)
this.paymentService.releasePreAuth(
PreAuthReleaseDto.RequestBody.builder()
(8)
.pareAuthRef(finalAggregatorState.getPreAuthRef())
(9)
.reason(processException.get("reason").orElse(""))
.build()
);
revertHintStore.put(
String.valueOf(System.currentTimeMillis()),
this.getClass().getSimpleName() + ":doRevert"
);(10)
return return OrderStatus.RELEASED_PRE_AUTH; (11)
}
}
1 | Annotate the PreAuthExecutor class as a SagaExecutor by using @SagaExecutor annotation.
|
2 | Implement the PreAuthExecutor from the CommandExecutor by passing the target Aggregator.
Read more about |
3 | you will have the currentAggregator object through the method parameter. due to this executor is the 3rd executor, the currentAggregator object has been updated by the OrderController (initialization), UserDetailExecutor ,and OrderInitializeExecutor . |
4 | make the pre-auth process by invoking the PaymentService’s makePreAuth() method. |
5 | Update the currentAggregator with preAuthRef data that received as the response. |
6 | Navigates the execution by using stepManager to the next executor called StockUpdateExecutor .
And also passed the status as MADE_PRE_AUTH . |
7 | RetryableExecutorException As per the StackSaga design pattern the revert process cannot have any NonRetryableExecutorException error.
It only can have RetryableExecutorException . |
8 | For Calling the PaymentService’s releasePreAuth() we have to provide the pareAuthRef data.
So we have the last aggregator data that was updated until the end of the process to forward.
Therefore, that contains the pareAuthRef and we can access it from the finalAggregatorState . |
9 | And As well as a reason should be given for the releasing PreAuth .
All the time the process can be failed (With NonRetryableExecutorException )we have added a data called reason with the NonRetryableExecutorException .
This is the time that can be used. |
10 | Update the hint-store object by adding current millisecond and the revert method name with the executor name while the revert process. In your real application, the hint-store can be used for adding some data regarding the revert process. It can be accessed from the next revert process. |
11 | Return the status as RELEASED_PRE_AUTH finally. |
Creating PaymentService
PaymentService
is a regular spring service that creates for calling utility payment-service
.
Due to the payment-service
is another utility service we have to call it through http or any other protocol that target service supports (Most probably API endpoints are exposed by http protocol).
This example’s payment-service
also support http endpoint to access the API.
Therefore, we will be using rest template as http client.
But you can use any Http client like Feign-client, Okhttp, Spring-webclient and so on.
The relationship between the executor and service should not be one-to-one.
A service can be used for many executors.
In this example, the PaymentService class is used for PreAuthExecutor both executors.
|
@Service
@RequiredArgsConstructor
public class PaymentService {
private final RestTemplate restTemplate;
public String makePreAuth(PreAuthDto.RequestBody requestBody)
throws RetryableExecutorException, NonRetryableExecutorException { (1)
try {
(2)
PreAuthDto.ResponseBody responseBody = this.restTemplate.postForObject(
"http://payment-service/pre-auth",
requestBody,
PreAuthDto.ResponseBody.class
);
assert responseBody != null;
(3)
return responseBody.getPreAuthRef();
} catch (HttpClientErrorException ex) { (4)
// This exception is thrown for HTTP 4xx errors (Client errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.FORBIDDEN)) {
(5)
throw NonRetryableExecutorException
.buildWith(
new InsufficientBalanceException("Balance not sufficient"),
""
)
.build();
} else {
(6)
throw NonRetryableExecutorException.buildWith(ex, "").build();
}
} catch (HttpServerErrorException ex) { (7)
// This exception is thrown for HTTP 5xx errors (Server errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR)) {
(8)
throw NonRetryableExecutorException.buildWith(ex, "").build();
} else {
//502 , 503, 504, 509 etc.
(9)
throw RetryableExecutorException.buildWith(ex).build();
}
} catch (RestClientException ex) { (10)
// This exception is a generic RestClientException
// Handle other types of exceptions here
(11)
throw ex;
} catch (IllegalArgumentException illegalArgumentException) {
throw RetryableExecutorException.buildWith(illegalArgumentException).build();
}
}
public void releasePreAuth(PreAuthReleaseDto.RequestBody requestBody) throws RetryableExecutorException {
try {
(12)
this.restTemplate.put(
"http://payment-service/pre-auth/release",
requestBody
);
} catch (HttpServerErrorException ex) {(13)
// This exception is thrown for HTTP 5xx errors (Server errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR)) {
(14)
throw NonRetryableExecutorException.buildWith(ex, "").build();
} else {
//502 , 503, 504, 509 etc.
(15)
throw RetryableExecutorException.buildWith(ex).build();
}
} catch (RestClientException ex) { (16)
// This exception is thrown for HTTP 4xx errors (Client errors)
// You can handle specific HTTP error codes here
(17)
throw ex;
} catch (IllegalArgumentException illegalArgumentException) {
throw RetryableExecutorException.buildWith(illegalArgumentException).build();
} catch (RuntimeException restOfExceptions) { (18)
log.error("Unhanded exception : {}", restOfExceptions.getMessage());
log.warn("Unhanded exception was occurred and ignored while releasing pre-auth: {}", restOfExceptions.getMessage());
}
}
}
1 | We have thrown both NonRetryableExecutorException ,and RetryableExecutorException that PreAuthExecutor’s doPrcess() method expects.
The handling exception part is done in the service layer.
[ Read the TIP ] |
2 | Call the http request to the utility payment-service. |
3 | Returns the authRef that received as the response to the PreAuthExecutor . |
4 | Catch the 4xx HTTP errors to determine if the exception is a NonRetryableExecutorException or RetryableExecutorException . |
5 | An error can be thrown by the payment-service when we try to make a pre-auth if the user has no enough balance for making the pre-auth.
Therefore, the FORBIDDEN error code is filtered and throws it as NonRetryableExecutorException wrapping with a new exception called InsufficientBalanceException . |
6 | Other 4xx errors are thrown as NonRetryableExecutorException . |
7 | Catch the 5xx HTTP errors to determine if the exception is a NonRetryableExecutorException or RetryableExecutorException .
Most probably 5xx errors can be retried, but there are some cases it can not. |
8 | Check the 5xx error is equal to INTERNAL_SERVER_ERROR .
Because if there is an internal server in this case, we know that we cannot go ahead and the process should be stopped going forward.
Therefore, NonRetryableExecutorException is thrown by wrapping the real letter. |
9 | If the 5xx is not equal to INTERNAL_SERVER_ERROR , then other errors like 502, 503, 504, 509 error codes are caught as RetryableExecutorException and therefore a RetryableExecutorException is thrown by wrapping the real exception. |
10 | Cathe the other exceptions. |
11 | In this example, other error codes are not considered because we assume that errors cannot be happened.
Therefore, that error just throws without wrapping with NonRetryableExecutorException .
IF you want to wrap, you can do as usual but is not required if you don’t consider those errors.
Because internally the framework wraps the all RuntimeExceptions with NonRetryableExecutorException by default. |
12 | Making the request to the utility payment-service to release the PreAuth that we made.
This method is the Compensating of the makePreAuth . |
13 | Catch the 5xx HTTP errors to determine if the exception is a NonRetryableExecutorException or RetryableExecutorException .
Most probably 5xx errors can be retried, but there are some cases it can not. |
14 | If the 5xx is not equal to INTERNAL_SERVER_ERROR , then other errors like 502, 503, 504, 509 error codes are caught as RetryableExecutorException and therefore a RetryableExecutorException is thrown by wrapping the real exception. |
15 | If the 5xx is not equal to INTERNAL_SERVER_ERROR , then other errors like 502, 503, 504, 509 error codes are caught as RetryableExecutorException and therefore a RetryableExecutorException is thrown by wrapping the real exception. |
16 | Cathe the other exceptions. |
17 | In this example, other error codes are not considered because we assume that errors cannot be happened.
Therefore, that error just throws without wrapping with NonRetryableExecutorException .
IF you want to wrap, you can do as usual but is not required if you don’t consider those errors.
Because internally the framework wraps the all RuntimeExceptions with NonRetryableExecutorException by default. |
18 | Ignore other all unknown (Unhandled) exceptions to avoid the transaction termination. |
If you think that, even if the revert execution is failed for some reason (due to an unhandled exception,) the rest of revert executions should be run without terminating the transaction, you can flow this. |
Creating StockUpdateExecutor (Command-Executor)
StockUpdateExecutor
is responsible for updating (reduce) the stock by connecting with the utility stock-service.
After reducing the stock to place the order, an error occurred that reduced stock should be restored because the order will be canceled.
That means that the execution has Compensating.
Therefore, the executor should be a Command-Executor
(1)
@SagaExecutor(executeFor = "stock-service", liveCheck = false, value = "StockUpdateExecutor")
@RequiredArgsConstructor
public class StockUpdateExecutor implements CommandExecutor<PlaceOrderAggregator> { (2)
private final StockService stockService;
@Override
public ProcessStepManager<PlaceOrderAggregator> doProcess(
PlaceOrderAggregator currentAggregator, (3)
ProcessStepManagerUtil<PlaceOrderAggregator> stepManager
)
throws RetryableExecutorException, NonRetryableExecutorException {
currentAggregator.getExecutions().add(this.getClass().getSimpleName()+":"+ LocalDateTime.now());
(4)
this.stockService.updateStock(
UpdateStockDto.RequestBody
.builder()
.orderId(currentAggregator.getAggregatorTransactionId())
.items(currentAggregator
.getItems()
.stream()
.collect(
Collectors.toMap(
OrderItem::getItemId,
OrderItem::getQty
)
)
)
.build()
);
(5)
return stepManager.next(MakePaymentExecutor.class, OrderStatus.UPDATED_STOCK);
}
@Override
public String doRevert(
NonRetryableExecutorException processException,
PlaceOrderAggregator finalAggregatorState,
RevertHintStore revertHintStore
) throws RetryableExecutorException { (6)
(7)
this.stockService.restoreStock(
RestoreStockDto.RequestBody
.builder()
.reason(processException
.get("reason")
.orElse("")
)
.build()
);
(8)
revertHintStore.put(
String.valueOf(System.currentTimeMillis()),
this.getClass().getSimpleName() + ":doRevert"
);
(9)
return OrderStatus.RESTORED_STOCK;
}
}
Highlights
1 | Annotate the StockUpdateExecutor class as a SagaExecutor by using @SagaExecutor annotation.
|
2 | Implement the StockUpdateExecutor from the CommandExecutor by passing the target Aggregator.
Read more about |
3 | you will have the currentAggregator object through the method parameter. due to this executor is the 4th executor, the currentAggregator object has been updated by the OrderController (initialization), UserDetailExecutor , OrderInitializeExecutor ,and PreAuthExecutor . |
4 | updating the stock by invoking the StockService’s updateStock() method. the orderId and items are passed accessing from the currentAggregator . |
5 | Navigates the execution by using stepManager to the next executor called MakePaymentExecutor .
And also passed the status as UPDATED_STOCK . |
6 | RetryableExecutorException As per the StackSaga design pattern the revert process cannot have any NonRetryableExecutorException error.
It only can have RetryableExecutorException . |
7 | Calling the StockService’s restoreStock() method to restoring the items that has been reduced in the doProcess() method.
To restore the Stock, we have to pass a reason.
Due to we have put a reason all the possible times when an NonRetryableExecutorException is occurred in doProcess executions.
If the value doesn’t exist in the exception object we have passed just empty String there. |
8 | Update the hint-store object by adding current millisecond and the revert method name with the executor name while the revert process. In your real application, the hint-store can be used for adding some data regarding the revert process. It can be accessed from the next revert process. |
9 | Returns the status as RESTORED_STOCK finally. |
Related classes
Creating StockService
We called the StockService
in the StockUpdateExecutor
.
The service StockService
class is responsible for updating the stock or if there is an error, restore the stock by connecting with the utility stock-service
.
So let’s create the service now.
@Service
@RequiredArgsConstructor
public class StockService {
private final RestTemplate restTemplate;
public void updateStock(UpdateStockDto.RequestBody requestBody)
throws
RetryableExecutorException, NonRetryableExecutorException {
try {
(1)
this.restTemplate.put(
"http://stock-service/stock",
requestBody
);
} catch (HttpServerErrorException ex) { (2)
// This exception is thrown for HTTP 5xx errors (Server errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR)) {
throw ex;
} else {
//502 , 503, 504, 509 etc.
throw RetryableExecutorException.buildWith(ex).build();
}
} catch (RestClientException ex) { (3)
// This exception is a generic RestClientException
// Handle other types of exceptions here
throw ex;
}
}
public void restoreStock(RestoreStockDto.RequestBody requestBody) throws RetryableExecutorException {
try {
(4)
this.restTemplate.put(
"http://stock-service/stock/restore",
requestBody
);
} catch (HttpServerErrorException ex) { (5)
// This exception is thrown for HTTP 5xx errors (Server errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR)) {
throw ex;
} else {
//502 , 503, 504, 509 etc.
throw RetryableExecutorException.buildWith(ex).build();
}
} catch (RestClientException ex) { (6)
// This exception is a generic RestClientException
// Handle other types of exceptions here
throw ex;
}
}
}
This service is also pretty much the same as the service that has been created so far.
Highlights
1 | Call the endpoint to update the stock. |
2 | Catch the 5xx HTTP errors to determine if the exception is a NonRetryableExecutorException or RetryableExecutorException .
Most probably 5xx errors can be retried, but there are some cases it can not. |
3 | Cathe the other exceptions (with 4xx errors as well). |
4 | Call the endpoint to restore the updated stock. |
5 | Catch the 5xx HTTP errors to determine if the exception is a NonRetryableExecutorException or RetryableExecutorException .
Most probably 5xx errors can be retried, but there are some cases it can not. |
6 | Cathe the other exceptions (with 4xx errors as well). |
Creating MakePaymentExecutor (Command-Executor)
MakePaymentExecutor
is the final executor as per the diagram. this executor interacts with the utility payment-service
.
in the PreAuthExecutor
also we accessed the utility payment-service
for making the pre-auth.
his executor is responsible for making the real payment for that pre-auth
.
(1)
@SagaExecutor(executeFor = "stock-service", liveCheck = false, value = "StockUpdateExecutor")
@RequiredArgsConstructor
public class MakePaymentExecutor implements CommandExecutor<PlaceOrderAggregator> { (2)
private final PaymentService paymentService; (3)
@Override
public ProcessStepManager<PlaceOrderAggregator> doProcess(
PlaceOrderAggregator currentAggregator,
ProcessStepManagerUtil<PlaceOrderAggregator> stepManager
) throws RetryableExecutorException, NonRetryableExecutorException {
currentAggregator.getExecutions().add(this.getClass().getSimpleName()+":"+ LocalDateTime.now());
(4)
String paymentId = this.paymentService.makePayment(
MakePaymentDto
.RequestBody
.builder()
.amount(currentAggregator.getAmount())
.pareAuthRef(currentAggregator.getPreAuthRef())
.userId(currentAggregator.getUserId())
.build()
);
(5)
currentAggregator.setPaymentId(paymentId);
(6)
return stepManager.complete(OrderStatus.MADE_PAYMENT);
}
@Override
public String doRevert(NonRetryableExecutorException processException, PlaceOrderAggregator finalAggregatorState, RevertHintStore revertHintStore) throws RetryableExecutorException {
throw new UnsupportedOperationException(); (7)
}
}
Highlights
1 | Annotate the MakePaymentExecutor class as a SagaExecutor by using @SagaExecutor annotation.
|
||
2 | Implement the MakePaymentExecutor from the CommandExecutor by passing the target Aggregator.
Read more about |
||
3 | Autowire the PaymentService .
The old payment service is used this as well.
(The new methods will be updated below.) |
||
4 | Calling the PaymentService’s makePayment() method for making the payment.
To make the payment, we have to pass the following data.
|
||
5 | Updates the payment in the currentAggregator . |
||
6 | Due to this step is the finale execution, mentions the compiled signal to the SEC by using stepManager.complete method with the status as MADE_PAYMENT .
By mentioning compiled signal, the SEC stop the execution and mark the transaction has been successfully completed. |
||
7 | All other command-executors had a Compensating but in this executor have no any execution for Compensating.
The reason is that this is the final execution of this entire process and there is no any next executor can be failed.
You know that all the revert executions are called as the descending order from where the NonRetryableExecutorException is thrown.
Even this execution is failed for some reason, no need to do a Compensating because that execution already failed.
If this execution failed for some reason, all other executed (successfully executed so far), executor’s Compensatings should be invoked.
|
Refer the reference documentation to see how the executions are invoked.
Related Classes
Updating the PaymentService.
To make the payment, we have to create a new method in the existing PaymentService
.
public String makePayment(MakePaymentDto.RequestBody requestBody) throws RetryableExecutorException, NonRetryableExecutorException {
try {
MakePaymentDto.ResponseBody responseBody = this.restTemplate.postForObject(
"http://payment-service/pay",
requestBody,
MakePaymentDto.ResponseBody.class
);
assert responseBody != null;
log.debug(responseBody.getMessage());
return responseBody.getPaymentRef();
} catch (HttpClientErrorException ex) {
// This exception is thrown for HTTP 4xx errors (Client errors)
// You can handle specific HTTP error codes here
throw ex;
} catch (HttpServerErrorException ex) {
// This exception is thrown for HTTP 5xx errors (Server errors)
// You can handle specific HTTP error codes here
if (ex.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR)) {
throw ex;
} else {
//502 , 503, 504, 509 etc.
throw RetryableExecutorException.buildWith(ex).build();
}
} catch (RestClientException ex) {
// This exception is a generic RestClientException
// Handle other types of exceptions here
throw ex;
} catch (IllegalArgumentException illegalArgumentException) {
throw RetryableExecutorException.buildWith(illegalArgumentException).build();
}
}
That method is also pretty much the same as the service methods we have been implementing so far. Just call the target utility service and handle the exception.
Related Classes' links