Transaction rollback in a reactive application

I am using RxJava 1.1 to compose an observable sequence from inside a Spring application that looks like the following:

@Transaction
public Observable<Event> create(Event event) {
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}

An event reaches the service layer of my application from some controller class and this propagates through a chain of operations built with RxJava's flatMap() function. The event is first stored in database (Spring Data) and next two asynchronous HTTP requests are executed one after the other using Spring's AsyncRestTemplate library behind the scenes.

In case an error/exception is thrown anywhere in the pipeline, I would like to be able to rollback the database transaction so that the event is NOT stored in database. I found this is not easy to do since in Spring the transaction context is associated with the particular thread of execution. So if the code reaches the doOnError callback on a different thread (AsyncRestTemplate uses its own AsyncTaskExecutor), it is not possible to rollback the initially created transaction.

Can you please advise any mechanism to achieve transactions across a multi-threaded application composed of several asynchronous operations written in this way?

I have also tried to create a transaction programmatically with:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());

and then send the transactionStatus object along with the event across the pipeline, but again when a error occurs and I invoke "platformTransactionManager.rollback(status);", I get "transaction synchronization is not active" since this is running on a different thread I guess.

ps The sendEventToServiceA / sendEventToServiceB methods look similar to this:

public Observable<Event> sendEventToServiceA(event) {
    ..........
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
              "/serviceA/create?event_id=" + event.id,
              HttpMethod.POST, requestEntity, String.class);

    return ObservableUtil.toRxObservable(listenableFuture);
}

这样做的一种方法是确保在与db保存相同的线程上观察到错误:

@Transaction
public Observable<Event> create(Event event) {

     Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            .subscribeOn(scheduler)
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .observeOn(scheduler)
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}
链接地址: http://www.djcxy.com/p/32784.html

上一篇: 在TypeScript中创建一个Swagger Web服务的正确方法是什么?

下一篇: 事务回滚在被动应用程序中