Асинхронные потоки

Введение в асинхронные потоки

Последнее обновление: 22.01.2022

Корутины позволяют возвращать одиночные значения. Для этого мы можем, к примеру, создавать корутину с помощью построителя async. Но Kotlin также позволяет создавать асинхронные потоки (Asynchronous Flow), которые возвращают набор объектов.

В принципе для получения набора объектов мы могли бы в корутине возвращать коллекцию элементов, например, список List, наподобие следующего:

import kotlinx.coroutines.*

suspend fun main() = coroutineScope<Unit>{
    launch {
        getUsers().forEach { user -> println(user) }
    }
}

suspend fun getUsers(): List<String> {
    delay(1000L)  // имитация продолжительной работы
    return listOf("Tom", "Bob", "Sam")
}

Однако проблема таких коллекций в том, что они одномоментно возвращают все объекты. Например, если в списке ожидается 1000 объектов, то соответственно пока функция getUsers() не возвратит список из 1000 объектов (например, получая их из базы данных или из внешнего интернет-ресурса), мы не сможем манипулировать объектами из этого списка.

Эту проблему в Kotlin как раз и позволяют решить асинхронные потоки. Изменим пример выше с применением асинхронных потоков:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

suspend fun main(){
    getUsers().collect { user -> println(user) }
}

fun getUsers(): Flow<String> = flow {
    val database = listOf("Tom", "Bob", "Sam")  // условная база данных
    var i = 1;
    for (item in database){
        delay(400L) // имитация продолжительной работы
        println("Emit $i item")
        emit(item) // емитируем значение
        i++
    }
}

Для создания асинхронного потока данных применяется интерфейс Flow. То есть по сути асинхронный поток - это объект Flow. Он типизируется типом тех данных, которые должны передаваться в потоке. В данном случае передаем строки, поэтому Flow типизируется типом String.

fun getUsers(): Flow<String>

При этом при определении функции-потока (в данном случае функции getUsers) необязательно использовать модификатор suspend.

Для создания объекта Flow применяется специальная функция flow()

fun getUsers(): Flow<String> = flow {
    
	// создание асинхронного потока в функции flow
}

В самой функции в данном случае имитируется получение объектов из условной базы данных, коей здесь для простоты служит список List. В цикле пробегаемся по этому списку и отправляем в поток текущий объект с помощью функции emit():

emit(item) // передаем значение в поток

Это ключевой момент. Благодаря этому внешний код сможет получит переданное через emit() в поток значение и использовать его.

Для индикации номера отправляемого объекта я добавил переменную-счетчик i, которая увеличивается при переходе к другому объекту списка. Вывод номера отправляемого объекта позволяет увидеть, что получение внешним кодом объектов из списка происходит по мере его передачи в поток с помощью функции emit(), а не когда будут отправлены все объекты из списка.

Во внешнем коде в функции main вызываем функцию-поток getUsers(). Для управления объектами из потока для интерфейса Flow определен ряд функций, одной из которых является функция collect(). В качестве параметра она принимает функцию, в которую передает эмитируемый объект из потока. Так, в данном случае это просто функция вывода на консоль:

getUsers().collect { user -> println(user) }

В итоге мы получим следующий консольный вывод:

Emit 1 item
Tom
Emit 2 item
Bob
Emit 3 item
Sam

Таким образом, программа не ждет, когда функция getUsers возвратит все строки. А получает строки по мере их отправки в поток через функцию emit().

Другой пример - создадим и используем асинхронный поток чисел:

import kotlinx.coroutines.flow.*

suspend fun main(){
    getNumbers().collect { number -> println(number) }
}

fun getNumbers(): Flow<Int> = flow{
    for(item in 1..5){
        emit(item * item)
    }
}

Здесь в принципе все то же самое. Функция getNumbers() представляет асинхронный поток объектов Int. В качестве объектов в поток добавляются квадраты чисел от 1 до 5. Консольный вывод программы:

1
4
9
16
25

Запуск Flow

Стоит отметить, что асинхронный поток не запускается, пока не будет применена терминальная операция над получаемыми даными, например, функция collect():

import kotlinx.coroutines.flow.*

suspend fun main(){

    val numberFlow = getNumbers()		// поток создан, но не запущен
    println("numberFlow has created")
    println("launch collect function")
    numberFlow.collect { number -> println(number) }	  // запуск потока
}

fun getNumbers() = flow{
    println("numberFlow has started")
    for(item in 1..5){
        emit(item * item)
    }
}

Консольный вывод программы:

numberFlow has created
launch collect function
numberFlow has started
1
4
9
16
25
Помощь сайту
Юмани:
410011174743222
Перевод на карту
Номер карты:
4048415020898850