事务回滚在被动应用程序中

我使用RxJava 1.1在Spring应用程序中编写了一个可观察的序列,如下所示:

@Transaction
public Observable<Event> create(Event event) {
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}

事件从某个控制器类到达我的应用程序的服务层,并通过使用RxJava的flatMap()函数构建的操作链传播。 事件首先被存储在数据库(Spring Data)中,接下来的两个异步HTTP请求在后台使用Spring的AsyncRestTemplate库一个接一个地执行。

如果在管道中的任何地方引发错误/异常,我希望能够回滚数据库事务,以便事件不会存储在数据库中。 我发现这并不容易,因为在Spring中,事务上下文与特定的执行线程相关联。 因此,如果代码在另一个线程上达到doOnError回调(AsyncRestTemplate使用自己的AsyncTaskExecutor),则不可能回滚最初创建的事务。

您能否建议任何机制实现跨多线程应用程序的事务,这些应用程序由以这种方式编写的多个异步操作组成?

我也尝试通过编程方式创建一个事务:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());

然后发送transactionStatus对象以及管道上的事件,但是当错误发生时我再次调用“platformTransactionManager.rollback(status);”,因为它在不同的线程上运行,所以我得到“事务同步不活动”我猜。

ps sendEventToServiceA / sendEventToServiceB方法与此类似:

public Observable<Event> sendEventToServiceA(event) {
    ..........
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
              "/serviceA/create?event_id=" + event.id,
              HttpMethod.POST, requestEntity, String.class);

    return ObservableUtil.toRxObservable(listenableFuture);
}

这样做的一种方法是确保在与db保存相同的线程上观察到错误:

@Transaction
public Observable<Event> create(Event event) {

     Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            .subscribeOn(scheduler)
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .observeOn(scheduler)
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}
链接地址: http://www.djcxy.com/p/32783.html

上一篇: Transaction rollback in a reactive application

下一篇: How to read cookie from org.eclipse.swt.browser.Browser?