Search code examples
springreactive-programmingrx-javarx-androidnetflix

What is the correct way of managing transactionality in RxJava Services?


I have recently started experimenting with RxJava and came across a presentation by a Netflix engineer that suggested moving our business APIs to Observable APIs, for instance:

public interface VideoService {
    Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic);
    Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId);
    Observable<VideoRating> getVideoRating(Integer videoId);
}

However I haven't found any place that explained how transactionality should be managed in this services. At first I just annotated my service implementation with @Transactional

@Service
@Transactional
public class VideoServiceImpl implements VideoService{

    @Autowired
    private VideoBasicInfoRepository basicInfoRepo;
    @Autowired
    private VideoRatingRepository ratingRepo;

    public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic){
        return Observable.create( s -> {
            s.onNext(basicInfoRepo.save(videBasic));
        });
    }

What we would want is that the execution of all the code inside the Object.create lambda (s -> { // This code }) happened in a transaction. HOWEVER, what actually happens is that:

  1. The call to createVideoBasicInfo() executes in a transactional way, returning the cold observable.
  2. The save() executes as an atomic transaction.

Obviously it makes sense since the Spring proxy applies to the serviceImpl methods. I have thought of ways to do what I really expect such as starting a programmatic transaction:

return Observable.create( s -> {
    VideoBasicInfo savedBasic = transactionTemplate.execute( status -> {
        VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasicInfo);
        return basicInfo;
    });
    s.onNext(savedBasic);
});

Is this the recommended way of managing transactions when working with reactive APIs?


Solution

  • Spring Data JpaRepository method signatures are already marked @Transactional, so if you are using just one then you don't need to do anything special:

    public interface PersonRepository extends JpaRepository<Person, Integer> {
    }
    

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
    public class PersonRepositoryTest {
        private PersonRepository personRepository;
    
        @Autowired
        public void setPersonRepository(PersonRepository PersonRepository) {
            this.personRepository = PersonRepository;
        }
    
        @Test
        public void testReactiveSavePerson() {
            Person person = new Person("Jane", "Doe");
            assertNull(person.getId()); //null before save
    
            //save person
            Observable.create(s -> {
                s.onNext(personRepository.save(person));
            }).subscribe();
    
            //fetch from DB
            Person fetchedPerson = personRepository.findOne(person.getId());
    
            //should not be null
            assertNotNull(fetchedPerson);
    
            //should equal
            assertEquals(person.getId(), fetchedPerson.getId());
            assertEquals(person.getFirstName(), fetchedPerson.getFirstName());
        }
    }
    

    If you need to combine multiple repositories into one transaction, you could use something like the class below:

    @Component()
    public class ObservableTxFactory {
        public final <T> Observable<T> create(Observable.OnSubscribe<T> f) {
            return new ObservableTx<>(this, f);
        }
    
        @Transactional
        public void call(Observable.OnSubscribe onSubscribe, Subscriber subscriber) {
            onSubscribe.call(subscriber);
        }
    
        private static class ObservableTx<T> extends Observable<T> {
    
            public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) {
                super(new OnSubscribeDecorator<>(observableTxFactory, f));
            }
        }
    
        private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {
    
            private final ObservableTxFactory observableTxFactory;
            private final Observable.OnSubscribe<T> onSubscribe;
    
            OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe<T> s) {
                this.onSubscribe = s;
                this.observableTxFactory = observableTxFactory;
            }
    
            @Override
            public void call(Subscriber<? super T> subscriber) {
                observableTxFactory.call(onSubscribe, subscriber);
            }
        }
    }
    

    The factory bean needs to be defined as well:

    @Bean
    ObservableTxFactory observableTxFactory() {
        return new ObservableTxFactory();
    }
    

    Service:

    @Service
    public class PersonService {
        @Autowired
        PersonRepository personRepository;
        @Autowired
        ObservableTxFactory observableTxFactory;
    
        public Observable<Person> createPerson(String firstName, String lastName) {
            return observableTxFactory.create(s -> {
                Person p = new Person(firstName, lastName);
                s.onNext(personRepository.save(p));
            });
        }
    }
    

    Test:

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
    public class PersonServiceTest {
        @Autowired
        PersonRepository personRepository;
        @Autowired
        ObservableTxFactory observableTxFactory;
    
        @Test
        public void testPersonService() {
            final PersonService service = new PersonService();
            service.personRepository = personRepository;
            service.observableTxFactory = observableTxFactory;
    
            final Observable<Person> personObservable = service.createPerson("John", "Doe");
            personObservable.subscribe();
    
            //fetch from DB
            final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false)
                    .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe"))
                    .findFirst()
                    .get();
    
            //should not be null
            assertNotNull(fetchedPerson);
        }
    
    }
    

    Screenshot showing proxy: enter image description here