При двунаправленной потоковой передачи клиент и сервер могут одновременно посылать и получать поток сообщений. Рассмотрим подобное взаимодействие на простом примере
В проекте сервера определим следующий файл metanit.proto:
syntax = "proto3"; package metanit; message Request{ string content = 1; } message Response{ string content=1; } service Messenger{ rpc DataStream (stream Request) returns (stream Response); }
Сервис Messenger определяет метод DataStream
, в котором сервис получает в потоке от клиента сообщения Request и отправляет в потоке клиенту сообщения Response.
Затем также в проекте сервера определим следующий класс MessengerService:
using Grpc.Core; using Metanit; // пространство имен сервиса MessengerService public class MessengerService : Messenger.MessengerBase { // сообшения для отправки string[] messages = { "Привет", "Норм", "...", "Нет", "пока" }; public override async Task DataStream(IAsyncStreamReader<Request> requestStream, IServerStreamWriter<Response> responseStream, ServerCallContext context) { // считываем входящие сообщения в фоновой задаче var readTask = Task.Run(async () => { await foreach (Request message in requestStream.ReadAllAsync()) { Console.WriteLine($"Client: {message.Content}"); } }); foreach (var message in messages) { // Посылаем ответ, пока клиент не закроет поток if (!readTask.IsCompleted) { await responseStream.WriteAsync(new Response { Content = message }); Console.WriteLine(message); await Task.Delay(2000); } } await readTask; // ожидаем завершения задачи чтения } }
Метод сервиса в C# принимает три параметра. С помощью первого параметра
IAsyncStreamReader<Request> requestStream
сервис считывает поток сообщений от клиента
С помощью второго параметра:
IServerStreamWriter<Response> responseStream
сервис отправляет поток сообщений клиенту
Когда мы имеем дело одновременно с считыванием и отправкой данных, то типовое решение состоит в том, чтобы выделить как минимум одно из этих действий в отдельный поток/задачу. И в данном случае считывание данных выделяется в отдельную задачу:
var readTask = Task.Run(async () => { await foreach (Request message in requestStream.ReadAllAsync()) { Console.WriteLine($"Client: {message.Content}"); } });
Здесь получаем данные от клиента с помощью метода ReadAllAsync()
, который возвращает асинхронный стрим (объект IAsyncEnumerable). Используя цикл await foreach
считываем из стрима все сообшения Request.
После запуска задачи на получение данных также запускаем в цикле отправку сообщений клиенту из массива messages:
foreach (var message in messages) { // Посылаем ответ, пока клиент не закроет поток if (!readTask.IsCompleted) { await responseStream.WriteAsync(new Response { Content = message }); Console.WriteLine(message); await Task.Delay(2000); } }
Причем отправка идет, пока не завершится задача по приему сообщений из клиентского потока, то есть пока клиент не завершит отправку своих сообщений.
Далее в файле Program.cs подключим сервис MessengerService в приложение:
var builder = WebApplication.CreateBuilder(args); builder.Services.AddGrpc(); var app = builder.Build(); // встраиваем MessengerService в обработку запроса app.MapGrpcService<MessengerService>(); app.MapGet("/", () => "Hello World!"); app.Run();
В конце запустим проект сервера.
Для тестирования сервиса определим проект консольного приложения. Добавим в него из проекта сервера файл metanit.proto. И в проекте консольного клиента в файле Program.cs определим следующий код:
using Grpc.Core; using Grpc.Net.Client; using Metanit; // пространство имен класса Messenger.MessengerClient // данные для отправки string[] messages = { "Привет", "Как дела?", "Че молчишь?", "Ты че, спишь?", "Ну пока" }; // создаем канал для обмена сообщениями с сервером // параметр - адрес сервера gRPC using var channel = GrpcChannel.ForAddress("https://localhost:7229"); // создаем клиент var client = new Messenger.MessengerClient(channel); // получаем объект AsyncDuplexStreamingCall var call = client.DataStream(); var readTask = Task.Run(async () => { await foreach (var response in call.ResponseStream.ReadAllAsync()) { Console.WriteLine($"Server: {response.Content}"); } }); foreach(var message in messages) { await call.RequestStream.WriteAsync(new Request { Content = message }); Console.WriteLine(message); await Task.Delay(2000); } // завершаем отправку сообщений на сервер await call.RequestStream.CompleteAsync(); await readTask;
После создания клиента начинаем взаимодействие с сервером:
var call = client.DataStream();
Этот метод возвращает объект Grpc.Core.AsyncDuplexStreamingCall
. Его свойство RequestStream представляет поток отправки и
с помощью метода WriteAsync() позволяет отправить на сервер в потоке сообщения, а его свойство ResponseStream
представляет поток для получения сообщений и позволяет получить сообщения с помощью методов ResponseStream.MoveNext()
и ResponseStream.ReadAllAsync()
.
И здесь мы сталкиваемся с той же задачей, что и в случае с сервером - если мы хотим в проекте клиента одновременно получать и отправлять данные, то как минимум одно из действий выносится в фоновый поток/задачу. В данном случае вначале запускаем поток на считывание сообщений:
var readTask = Task.Run(async () => { await foreach (var response in call.ResponseStream.ReadAllAsync()) { Console.WriteLine($"Server: {response.Content}"); } });
Затем последовательно отправляем все строки из массива messages:
foreach(var message in messages) { await call.RequestStream.WriteAsync(new Request { Content = message }); Console.WriteLine(message); await Task.Delay(2000); }
Завершаем потоковую отправку сообщений методом CompleteAsync()
await call.RequestStream.CompleteAsync();
И ждем завершения фоновой задачи чтения сообщений:
await readTask;
Двунаправленная потокавая передача завершается, когда ResponseStream больше имеет сообщений.
Запустим сначала сервер, а потом клиент. и они обменяются между собой набором сообщений:
Обратите внимание, что в данном примере нет обмена сообщений: сервер получает поток клиента, а клиент - поток сервер независимо друг от друга, поэтому вывод сообщений не детерминирован. Но поскольку на клиенте и сервере действует задержка в 2 секунды, может создаться квазивпечатление, что происходит обмен сообщениями.