Двунаправленная потоковая передача

Последнее обновление: 27.11.2022

При двунаправленной потоковой передачи клиент и сервер могут одновременно посылать и получать поток сообщений. Рассмотрим подобное взаимодействие на простом примере

Определение сервера

В проекте сервера определим следующий файл metanit.proto:

syntax = "proto3";

package metanit;

message Request{ 
	string content = 1;
}

message Response{ 
	string content=1;
}

service Messenger{
  rpc DataStream (stream Request) returns (stream Response);
}

Сервис Messenger определяет метод DataStream, в котором сервис получает в потоке от клиента сообщения Request и отправляет в потоке клиенту сообщения Response.

Затем также в проекте сервера определим следующий класс MessengerService:

using Grpc.Core;
using Metanit;  // пространство имен сервиса MessengerService

public class MessengerService : Messenger.MessengerBase
{
    // сообшения для отправки
    string[] messages = { "Привет", "Норм", "...", "Нет", "пока" };

    public override async Task DataStream(IAsyncStreamReader<Request> requestStream, 
        IServerStreamWriter<Response> responseStream, 
        ServerCallContext context)
    {
        // считываем входящие сообщения в фоновой задаче
        var readTask = Task.Run(async () =>
        {
            await foreach (Request message in requestStream.ReadAllAsync())
            {
                Console.WriteLine($"Client: {message.Content}");
            }
        });

        foreach (var message in messages)
        {
            // Посылаем ответ, пока клиент не закроет поток
            if (!readTask.IsCompleted)
            {
                await responseStream.WriteAsync(new Response { Content = message });
                Console.WriteLine(message);
                await Task.Delay(2000);
            }
        }
        await readTask; // ожидаем завершения задачи чтения
    }
}

Метод сервиса в C# принимает три параметра. С помощью первого параметра

IAsyncStreamReader<Request> requestStream

сервис считывает поток сообщений от клиента

С помощью второго параметра:

IServerStreamWriter<Response> responseStream

сервис отправляет поток сообщений клиенту

Когда мы имеем дело одновременно с считыванием и отправкой данных, то типовое решение состоит в том, чтобы выделить как минимум одно из этих действий в отдельный поток/задачу. И в данном случае считывание данных выделяется в отдельную задачу:

var readTask = Task.Run(async () =>
{
    await foreach (Request message in requestStream.ReadAllAsync())
    {
        Console.WriteLine($"Client: {message.Content}");
    }
});

Здесь получаем данные от клиента с помощью метода ReadAllAsync(), который возвращает асинхронный стрим (объект IAsyncEnumerable). Используя цикл await foreach считываем из стрима все сообшения Request.

После запуска задачи на получение данных также запускаем в цикле отправку сообщений клиенту из массива messages:

foreach (var message in messages)
{
    // Посылаем ответ, пока клиент не закроет поток
    if (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new Response { Content = message });
        Console.WriteLine(message);
        await Task.Delay(2000);
    }
}

Причем отправка идет, пока не завершится задача по приему сообщений из клиентского потока, то есть пока клиент не завершит отправку своих сообщений.

Далее в файле Program.cs подключим сервис MessengerService в приложение:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddGrpc();

var app = builder.Build();

// встраиваем MessengerService в обработку запроса
app.MapGrpcService<MessengerService>();
app.MapGet("/", () => "Hello World!");

app.Run();

В конце запустим проект сервера.

Определение проекта клиента

Для тестирования сервиса определим проект консольного приложения. Добавим в него из проекта сервера файл metanit.proto. И в проекте консольного клиента в файле Program.cs определим следующий код:

using Grpc.Core;
using Grpc.Net.Client;
using Metanit;  // пространство имен класса Messenger.MessengerClient

// данные для отправки
string[] messages = { "Привет", "Как дела?", "Че молчишь?", "Ты че, спишь?", "Ну пока" };

// создаем канал для обмена сообщениями с сервером
// параметр - адрес сервера gRPC
using var channel = GrpcChannel.ForAddress("https://localhost:7229");

// создаем клиент
var client = new Messenger.MessengerClient(channel);

// получаем объект AsyncDuplexStreamingCall
var call = client.DataStream();

var readTask = Task.Run(async () =>
{
    await foreach (var response in call.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine($"Server: {response.Content}");
    }
});
foreach(var message in messages)
{
    await call.RequestStream.WriteAsync(new Request { Content = message });
    Console.WriteLine(message);
    await Task.Delay(2000);
}

// завершаем отправку сообщений на сервер
await call.RequestStream.CompleteAsync();
await readTask;

После создания клиента начинаем взаимодействие с сервером:

var call = client.DataStream();

Этот метод возвращает объект Grpc.Core.AsyncDuplexStreamingCall. Его свойство RequestStream представляет поток отправки и с помощью метода WriteAsync() позволяет отправить на сервер в потоке сообщения, а его свойство ResponseStream представляет поток для получения сообщений и позволяет получить сообщения с помощью методов ResponseStream.MoveNext() и ResponseStream.ReadAllAsync().

И здесь мы сталкиваемся с той же задачей, что и в случае с сервером - если мы хотим в проекте клиента одновременно получать и отправлять данные, то как минимум одно из действий выносится в фоновый поток/задачу. В данном случае вначале запускаем поток на считывание сообщений:

var readTask = Task.Run(async () =>
{
    await foreach (var response in call.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine($"Server: {response.Content}");
    }
});

Затем последовательно отправляем все строки из массива messages:

foreach(var message in messages)
{
    await call.RequestStream.WriteAsync(new Request { Content = message });
    Console.WriteLine(message);
    await Task.Delay(2000);
}

Завершаем потоковую отправку сообщений методом CompleteAsync()

await call.RequestStream.CompleteAsync();

И ждем завершения фоновой задачи чтения сообщений:

await readTask;

Двунаправленная потокавая передача завершается, когда ResponseStream больше имеет сообщений.

Запустим сначала сервер, а потом клиент. и они обменяются между собой набором сообщений:

Двунаправленная потоковая передача в grpc в C#

Обратите внимание, что в данном примере нет обмена сообщений: сервер получает поток клиента, а клиент - поток сервер независимо друг от друга, поэтому вывод сообщений не детерминирован. Но поскольку на клиенте и сервере действует задержка в 2 секунды, может создаться квазивпечатление, что происходит обмен сообщениями.

Помощь сайту
Юмани:
410011174743222
Перевод на карту
Номер карты:
4048415020898850