Search code examples
kotlinkotlin-coroutinesterminologykotlin-flow

What are upstream/downstream flows in the Kotlin language


In the flow documentation they mentioned an upstream and a downstream flows:

Flows can be transformed with operators, just as you would with collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow.

Given code (from the same source):

(1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) 

Do upstream and downstream mean an input and an output (a return value)? For example when the programming language starts to execute the .map part, a value of (1..3).asFlow() is an upstream flow and a return value of the .map {...} is a downstream flow?


Solution

  • Kotlin's oficial definition:

    interface Flow<out T>

    An asynchronous data stream that sequentially emits values and completes normally or with an exception.

    Intermediate operators on the flow such as map, filter, take, zip, etc are functions that are applied to the upstream flow or flows and return a downstream flow where further operators can be applied to. Intermediate operations do not execute any code in the flow and are not suspending functions themselves. They only set up a chain of operations for future execution and quickly return. This is known as a cold flow property.

    Terminal operators on the flow are either suspending functions such as collect, single, reduce, toList, etc. or launchIn operator that starts collection of the flow in the given scope. They are applied to the upstream flow and trigger execution of all operations. Execution of the flow is also called collecting the flow and is always performed in a suspending manner without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed execution of all the flow operations in the upstream. The most basic terminal operator is collect, for example:

    try {
        flow.collect { value ->
            println("Received $value")
        }
    } catch (e: Exception) {
        println("The flow has thrown an exception: $e")
    }