Корутины позволяют возвращать одиночные значения. Для этого мы можем, к примеру, создавать корутину с помощью построителя async. Но Kotlin также позволяет создавать асинхронные потоки (Asynchronous Flow), которые возвращают набор объектов.
В принципе для получения набора объектов мы могли бы в корутине возвращать коллекцию элементов, например, список List, наподобие следующего:
import kotlinx.coroutines.* suspend fun main() = coroutineScope<Unit>{ launch { getUsers().forEach { user -> println(user) } } } suspend fun getUsers(): List<String> { delay(1000L) // имитация продолжительной работы return listOf("Tom", "Bob", "Sam") }
Однако проблема таких коллекций в том, что они одномоментно возвращают все объекты. Например, если в списке ожидается 1000 объектов, то соответственно пока функция getUsers()
не возвратит список из 1000 объектов (например, получая их из базы данных или из внешнего интернет-ресурса), мы не сможем манипулировать объектами из этого списка.
Эту проблему в Kotlin как раз и позволяют решить асинхронные потоки. Изменим пример выше с применением асинхронных потоков:
import kotlinx.coroutines.flow.* import kotlinx.coroutines.* suspend fun main(){ getUsers().collect { user -> println(user) } } fun getUsers(): Flow<String> = flow { val database = listOf("Tom", "Bob", "Sam") // условная база данных var i = 1; for (item in database){ delay(400L) // имитация продолжительной работы println("Emit $i item") emit(item) // емитируем значение i++ } }
Для создания асинхронного потока данных применяется интерфейс Flow. То есть по сути асинхронный поток - это объект Flow. Он типизируется
типом тех данных, которые должны передаваться в потоке. В данном случае передаем строки, поэтому Flow
типизируется типом String
.
fun getUsers(): Flow<String>
При этом при определении функции-потока (в данном случае функции getUsers
) необязательно использовать модификатор suspend.
Для создания объекта Flow применяется специальная функция flow()
fun getUsers(): Flow<String> = flow { // создание асинхронного потока в функции flow }
В самой функции в данном случае имитируется получение объектов из условной базы данных, коей здесь для простоты служит список List. В цикле пробегаемся по этому списку и отправляем в поток текущий объект с помощью функции emit():
emit(item) // передаем значение в поток
Это ключевой момент. Благодаря этому внешний код сможет получит переданное через emit()
в поток значение и использовать его.
Для индикации номера отправляемого объекта я добавил переменную-счетчик i
, которая увеличивается при переходе к другому объекту списка.
Вывод номера отправляемого объекта позволяет увидеть, что получение внешним кодом объектов из списка происходит по мере его передачи в поток с помощью функции emit()
, а не когда будут отправлены
все объекты из списка.
Во внешнем коде в функции main вызываем функцию-поток getUsers()
. Для управления объектами из потока для интерфейса Flow определен ряд
функций, одной из которых является функция collect()
. В качестве параметра она принимает функцию, в которую передает эмитируемый объект из потока. Так, в данном случае
это просто функция вывода на консоль:
getUsers().collect { user -> println(user) }
В итоге мы получим следующий консольный вывод:
Emit 1 item Tom Emit 2 item Bob Emit 3 item Sam
Таким образом, программа не ждет, когда функция getUsers возвратит все строки. А получает строки по мере их отправки в поток через функцию emit()
.
Другой пример - создадим и используем асинхронный поток чисел:
import kotlinx.coroutines.flow.* suspend fun main(){ getNumbers().collect { number -> println(number) } } fun getNumbers(): Flow<Int> = flow{ for(item in 1..5){ emit(item * item) } }
Здесь в принципе все то же самое. Функция getNumbers()
представляет асинхронный поток объектов Int. В качестве объектов в поток добавляются
квадраты чисел от 1 до 5. Консольный вывод программы:
1 4 9 16 25
Стоит отметить, что асинхронный поток не запускается, пока не будет применена терминальная операция над получаемыми даными, например, функция collect()
:
import kotlinx.coroutines.flow.* suspend fun main(){ val numberFlow = getNumbers() // поток создан, но не запущен println("numberFlow has created") println("launch collect function") numberFlow.collect { number -> println(number) } // запуск потока } fun getNumbers() = flow{ println("numberFlow has started") for(item in 1..5){ emit(item * item) } }
Консольный вывод программы:
numberFlow has created launch collect function numberFlow has started 1 4 9 16 25