Потоковая передача сервера

Последнее обновление: 27.11.2022

Рассмотрим потоковую передачу сообщений сервером и их получение клиентом.

Проект сервера

Сначала в проекте сервера определим следующий файл metanit.proto:

syntax = "proto3";

package metanit;

message Request{ }

message Response{
	string content = 1;
}

service Messenger{
  // серверная потоковая передача
  rpc ServerDataStream (Request) returns (stream Response);
}

Здесь сервис Messenger определяет метод ServerDataStream, который для простоты получает от клиента пустое сообщение Request. А в ответ клиенту возвращает сообщение Response. Чтобы указать, что сервер посылает данные в потоке, перед типом ответного сообщения указывается ключевое слово stream.

Далее в проекте сервера определим следующий класс MessengerService:

using Grpc.Core;
using Metanit;  // пространство имен сервиса MessengerService

public class MessengerService : Messenger.MessengerBase
{
    string[] messages = { "Привет", "Как дела?", "Че молчишь?", "Ты че, спишь?", "Ну пока" };
    public override async Task ServerDataStream(Request request,
        IServerStreamWriter<Response> responseStream,
        ServerCallContext context)
    {
        foreach (var message in messages)
        {
            await responseStream.WriteAsync(new Response { Content = message });
            // для имитации работы делаем задержку в 1 секунду
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
}

Если метод сервера отправляет поток, то такой метод в C# принимает три параметра: сообщение от клиента (здесь объект Request), контекст ServerCallContext и объект записи потока ответа IServerStreamWriter. Объект потока ответа IServerStreamWriter типизируется типом ответа, то есть в примере выше это объект IServerStreamWriter<Response>

Для непосредственной отправки ответа у IServerStreamWriter вызывает метод WriteAsync, в который передается отправляемое сообщение. В данном же случае отправляем в цикле все строки из массива messages:

foreach (var message in messages)
{
    await responseStream.WriteAsync(new Response { Content=message});
    // для имитации работы делаем задержку в 1 секунду
    await Task.Delay(TimeSpan.FromSeconds(1));
}

Потоковая передача сервера завершается, когда происходит выход из метода. То есть в данном случае это произойдет, когда будет отправлена последняя строка из массива messages.

В файле Program.cs подключим сервис MessengerService в приложение:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddGrpc();

var app = builder.Build();

// встраиваем MessengerService в обработку запроса
app.MapGrpcService<MessengerService>();
app.MapGet("/", () => "Hello World!");

app.Run();

И запустим проект сервера.

Проект клиента

Для тестирования сервиса определим проект консольного приложения. Добавим в него из проекта сервера файл metanit.proto. И в проекте консольного клиента в файле Program.cs определим следующий код:

using Grpc.Net.Client;
using Metanit;  // пространство имен класса Messenger.MessengerClient

// создаем канал для обмена сообщениями с сервером
// параметр - адрес сервера gRPC
using var channel = GrpcChannel.ForAddress("https://localhost:7229");

// создаем клиент
var client = new Messenger.MessengerClient(channel);

// посылаем пустое сообщение и получаем набор сообщений
var serverData = client.ServerDataStream(new Request());

// получаем поток сервера
var responseStream = serverData.ResponseStream;
// с помощью итераторов извлекаем каждое сообщение из потока
while (await responseStream.MoveNext(new CancellationToken()))
{

    Response response = responseStream.Current;
    Console.WriteLine(response.Content);
}

После создания клиента посылаем серверу пустое сообщение:

var serverData = client.ServerDataStream(new Request());

Этот метод возвращает объект Grpc.Core.AsyncServerStreamingCall<Response>, из которого с помощью свойства ResponseStream можно получить непосредственно поток сервера (объект Grpc.Core.IAsyncStreamReader<Response>):

var responseStream = serverData.ResponseStream;

Для считывания данных из потока можно использовать разные стратегии. В данном случае применяем итераторы: вызываем метод MoveNext() для получаения следующего сообщения в потоке. И пока он возвращает true, с помощью свойства Current получаем текущее сообщение из потока

while (await responseStream.MoveNext(new CancellationToken()))
{

    Response response = responseStream.Current;
    Console.WriteLine(response.Content);
}

Запустим клиент. И после отправки запроса к серверу, консольный клиент получит все отправленные сообщения:

Привет
Как дела?
Че молчишь?
Ты че, спишь?
Ну пока

Для упрощения получения данных на строне клиента можно использовать метод ReadAllAsync()

using Grpc.Core;
using Grpc.Net.Client;
using Metanit;  // пространство имен класса Messenger.MessengerClient

// параметр - адрес сервера gRPC
using var channel = GrpcChannel.ForAddress("https://localhost:7229");

var client = new Messenger.MessengerClient(channel);

// посылаем пустое сообщение и получаем набор сообщений
var serverData = client.ServerDataStream(new Request());

// получаем поток сервера
var responseStream = serverData.ResponseStream;

await foreach(var response in responseStream.ReadAllAsync())
{
    Console.WriteLine(response.Content);
}
Помощь сайту
Юмани:
410011174743222
Перевод на карту
Номер карты:
4048415020898850