Last September, I wrote how to migrate from an Imperative Programming codebase to a Reactive Programming one in a step-by-step process. Because of its popularity, I illustrated the post with a Spring Boot demo. The final result uses Mono
and Flux
from Project Reactor. I also made sure that no step in the processing pipeline is blocking thanks to Blockhound.
I wrote the code in Java so that it could be accessible to the largest audience. But I'm still a Kotlin fan at heart. Hence, I ported the codebase to Kotlin.
But Kotlin brings more benefits to the table. In the context of reactive programming, it offers coroutines. I wanted to showcase them, but I found it harder than expected migrating from Project Reactor to coroutines. This is what I want to explain in this post.
The starting point
The following code is our starting point:
class Person // 1
interface PersonRepository : ReactiveSortingRepository<Person, Long> // 2
@Configuration
class PersonRoutes {
@Bean
fun router(repository: PersonRepository) = router {
val handler = PersonHandler(repository)
GET("/person", handler::getAll)
}
}
class PersonHandler(private val repository: PersonRepository) {
fun getAll(req: ServerRequest): Mono<ServerResponse> {
val flux = repository.findAll(Sort.by("lastName", "firstName")) // 3
return ok().body<Person>(flux) // 4
}
}
- The entity. The exact properties are not relevant to this post
- Spring Data R2DBC repository. It offers generic functions that return reactive types e.g.
Mono<Person>
andFlux<Person>
- At runtime, Spring provides an implementation
- Wraps the
Flux<Person>
into aMono<ServerResponse
Coroutines and reactive types are not compatible. At compile-time, the return type of suspended functions is changed from T
to Defered<T>
. Hence, migrating to coroutines requires to change the signature of functions in two ways:
- Add the
suspend
keyword - Change the return type from a reactive type
Flux<T>
(orMono<T>
) to a standard typeList<T>
(orT
)
Migrating the repository
The Coroutines Reactor module allows bridging from deferred types to reactive types with the Deferred.asMono()
extension function. But I found no way to bridge the other way around, from Reactor to coroutines. Because the repository returns reactive types, the whole processing chain needs to use them. The issue becomes how to make the repository coroutine-compatible. For that, Spring Data offers the CoroutineSortingRepository<T, ID>
interface.
We need another dependency:
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>
The code becomes:
interface PersonRepository : CoroutineCrudRepository<Person, Long>
Migrating the handler
This change makes all functions from the repository suspending. We need to propagate this upwards to the handler:
class PersonHandler(private val repository: PersonRepository) {
suspend fun getAll(req: ServerRequest): ServerResponse { // 1-2
val flow = repository.findAll(Sort.by("lastName", "firstName"))
return ok().bodyAndAwait(flow) // 3
}
}
- Add the
suspend
keyword - Return
ServerResponse
instead ofMono<ServerResponse.
- Replace the
body()
function by thebodyAndAwait()
extension function. This is necessary because all standard functions return reactive types.
Migrating the routes
Finally, we need to update the routes accordingly:
@Configuration
class PersonRoutes {
@Bean
fun router(repository: PersonRepository) = coRouter { // 1
val handler = PersonHandler(repository)
GET("/person", handler::getAll)
}
}
- Change from
router
tocoRouter
Bonus: migrating JDK types
I simplified the starting point above. The original code introduces caching between the repository and the handler via Hazelcast.
class CachingService(
private val cache: IMap<Long, Person>,
private val repository: PersonRepository
) {
fun findById(id: Long) = Mono.fromCompletionStage { cache.getAsync(id) } // 1-2
.switchIfEmpty( // 3
repository.findById(id)
.doOnNext { cache.putAsync(it.id, it) } // 4
)
}
- Hazelcast offers an asynchronous API that returns a
CompletionStage
-
Mono
allows to bridge fromCompletionStage
toMono
- If the cache doesn't contain the entity, switch to another branch
- When found, put the entity in the cache for later use
To bridge from CompletionStage<T>
to Deferred<T>
, we need one more dependency:
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>
With it comes the await()
extension function. The code becomes:
class CachingService(
private val cache: IMap<Long, Person>,
private val repository: PersonRepository
) {
suspend fun findById(id: Long) = cache.getAsync(id).await() // 1
?: repository.findById(id)?.also { cache.putAsync(it.id, it)
}
- Transform the
CompletionStage<Person>
type to theDeferred<Person>
type
As an added benefit, we can discard the specific Reactive Programming model to a more traditional Imperative one while keeping the code reactive.
Conclusion
Reactor types and coroutines aim to achieve the same goal but use different paths for that. For that reason, their types are not compatible. Migrating to suspending functions is a bottom-up process. For that, Spring Data R2DBC offers the CoroutineCrudRepository
. Once done, the rest is pretty straightforward.
The complete source code for this post can be found on Github.
Go further:
- Migrating from Imperative to Reactive
- Non-Blocking Spring Boot with Kotlin Coroutines
- How Reactive translates to Coroutines?
Originally published at A Java Geek on November 1st 2020