refactor: return a true stream instead of a vector in the dynamic response

This commit is contained in:
JasterV 2026-03-01 19:52:48 +01:00
parent 91ca259f71
commit 8d63209b5f
5 changed files with 24 additions and 18 deletions

View file

@ -4,7 +4,7 @@
//! but uses a local, in-memory `DescriptorPool` (Static schema) to resolve messages.
use super::{DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection};
use crate::{BoxError, client::OfflineReflectionState, grpc::client::GrpcRequestError};
use futures_util::{Stream, StreamExt};
use futures_util::{Stream, StreamExt, stream};
use http_body::Body as HttpBody;
use std::fmt::Debug;
@ -76,8 +76,10 @@ where
.server_streaming(method, request.body, request.headers)
.await?
{
Ok(stream) => Ok(DynamicResponse::Streaming(Ok(stream.collect().await))),
Err(status) => Ok(DynamicResponse::Streaming(Err(status))),
Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicResponse::Streaming(
stream::once(async { Err(status) }).boxed(),
)),
},
(true, false) => {
let input_stream =
@ -98,8 +100,10 @@ where
.bidirectional_streaming(method, input_stream, request.headers)
.await?
{
Ok(stream) => Ok(DynamicResponse::Streaming(Ok(stream.collect().await))),
Err(status) => Ok(DynamicResponse::Streaming(Err(status))),
Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicResponse::Streaming(
stream::once(async { Err(status) }).boxed(),
)),
}
}
}

View file

@ -1,3 +1,4 @@
use futures_util::stream::BoxStream;
use prost_reflect::{EnumDescriptor, MessageDescriptor, ServiceDescriptor};
use std::fmt::Debug;
@ -17,12 +18,11 @@ pub struct DynamicRequest {
}
/// The result of a dynamic gRPC call.
#[derive(Debug, Clone)]
pub enum DynamicResponse {
/// A single response message (for Unary and Client Streaming calls).
Unary(Result<serde_json::Value, tonic::Status>),
/// A stream of response messages (for Server Streaming and Bidirectional calls).
Streaming(Result<Vec<Result<serde_json::Value, tonic::Status>>, tonic::Status>),
Streaming(BoxStream<'static, Result<serde_json::Value, tonic::Status>>),
}
/// A generic wrapper for different types of Protobuf descriptors.

View file

@ -104,10 +104,7 @@ where
method: MethodDescriptor,
payload: serde_json::Value,
headers: Vec<(String, String)>,
) -> Result<
Result<impl Stream<Item = Result<serde_json::Value, tonic::Status>>, tonic::Status>,
GrpcRequestError,
> {
) -> Result<Result<tonic::Streaming<serde_json::Value>, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await
@ -163,10 +160,7 @@ where
method: MethodDescriptor,
payload_stream: impl Stream<Item = serde_json::Value> + Send + 'static,
headers: Vec<(String, String)>,
) -> Result<
Result<impl Stream<Item = Result<serde_json::Value, tonic::Status>>, tonic::Status>,
GrpcRequestError,
> {
) -> Result<Result<tonic::Streaming<serde_json::Value>, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await

View file

@ -2,6 +2,7 @@ use echo_service_impl::EchoServiceImpl;
use granc_core::client::{DynamicRequest, DynamicResponse, GrancClient, Online, online};
use granc_core::reflection::client::ReflectionResolveError;
use granc_test_support::echo_service::{EchoServiceServer, FILE_DESCRIPTOR_SET};
use tokio_stream::StreamExt;
use tonic::Code;
use tonic::service::Routes;
@ -62,7 +63,9 @@ async fn test_reflection_server_streaming_success() {
let res = client.dynamic(req).await.unwrap();
match res {
DynamicResponse::Streaming(Ok(stream)) => {
DynamicResponse::Streaming(stream) => {
let stream: Vec<_> = stream.collect().await;
assert_eq!(stream.len(), 3);
assert_eq!(stream[0].as_ref().unwrap()["message"], "stream - seq 0");
assert_eq!(stream[1].as_ref().unwrap()["message"], "stream - seq 1");

View file

@ -1,4 +1,5 @@
use echo_service_impl::EchoServiceImpl;
use futures_util::StreamExt;
use granc_core::client::{
DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection,
online_without_reflection,
@ -50,7 +51,9 @@ async fn test_dynamic_server_streaming_success() {
let res = client.dynamic(req).await.unwrap();
match res {
DynamicResponse::Streaming(Ok(stream)) => {
DynamicResponse::Streaming(stream) => {
let stream: Vec<_> = stream.collect().await;
assert_eq!(stream.len(), 3);
assert_eq!(stream[0].as_ref().unwrap()["message"], "stream - seq 0");
assert_eq!(stream[1].as_ref().unwrap()["message"], "stream - seq 1");
@ -101,7 +104,9 @@ async fn test_dynamic_bidirectional_streaming_success() {
let res = client.dynamic(req).await.unwrap();
match res {
DynamicResponse::Streaming(Ok(stream)) => {
DynamicResponse::Streaming(stream) => {
let stream: Vec<_> = stream.collect().await;
assert_eq!(stream.len(), 2);
assert_eq!(stream[0].as_ref().unwrap()["message"], "echo: Ping");
assert_eq!(stream[1].as_ref().unwrap()["message"], "echo: Pong");