事务回滚在被动应用程序中
我使用RxJava 1.1在Spring应用程序中编写了一个可观察的序列,如下所示:
@Transaction
public Observable<Event> create(Event event) {
return Observable.just(event)
.flatMap(event -> {
//save event to db (blocking JPA operation)
Event event = eventRepository.save(event);
return Observable.just(event);
})
//async REST call to service A
.flatMap(this::sendEventToServiceA) <---- may execute on different thread
//async REST call to service B
.flatMap(this::sendEventToServiceB) <---- may execute on different thread
.doOnError( throwable -> {
// ? rollback initally created transaction?
})
}
事件从某个控制器类到达我的应用程序的服务层,并通过使用RxJava的flatMap()函数构建的操作链传播。 事件首先被存储在数据库(Spring Data)中,接下来的两个异步HTTP请求在后台使用Spring的AsyncRestTemplate库一个接一个地执行。
如果在管道中的任何地方引发错误/异常,我希望能够回滚数据库事务,以便事件不会存储在数据库中。 我发现这并不容易,因为在Spring中,事务上下文与特定的执行线程相关联。 因此,如果代码在另一个线程上达到doOnError回调(AsyncRestTemplate使用自己的AsyncTaskExecutor),则不可能回滚最初创建的事务。
您能否建议任何机制实现跨多线程应用程序的事务,这些应用程序由以这种方式编写的多个异步操作组成?
我也尝试通过编程方式创建一个事务:
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
然后发送transactionStatus对象以及管道上的事件,但是当错误发生时我再次调用“platformTransactionManager.rollback(status);”,因为它在不同的线程上运行,所以我得到“事务同步不活动”我猜。
ps sendEventToServiceA / sendEventToServiceB方法与此类似:
public Observable<Event> sendEventToServiceA(event) {
..........
ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
"/serviceA/create?event_id=" + event.id,
HttpMethod.POST, requestEntity, String.class);
return ObservableUtil.toRxObservable(listenableFuture);
}
这样做的一种方法是确保在与db保存相同的线程上观察到错误:
@Transaction
public Observable<Event> create(Event event) {
Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
return Observable.just(event)
.flatMap(event -> {
//save event to db (blocking JPA operation)
Event event = eventRepository.save(event);
return Observable.just(event);
})
.subscribeOn(scheduler)
//async REST call to service A
.flatMap(this::sendEventToServiceA) <---- may execute on different thread
//async REST call to service B
.flatMap(this::sendEventToServiceB) <---- may execute on different thread
.observeOn(scheduler)
.doOnError( throwable -> {
// ? rollback initally created transaction?
})
}
链接地址: http://www.djcxy.com/p/32783.html
上一篇: Transaction rollback in a reactive application
下一篇: How to read cookie from org.eclipse.swt.browser.Browser?