diff --git a/docs/learning_grpc.md b/docs/learning_grpc.md index a98448b..94f716a 100644 --- a/docs/learning_grpc.md +++ b/docs/learning_grpc.md @@ -489,17 +489,17 @@ Highlight the fact that at the end of the day the gRPC server will be listening ```rust // Auto-generated client stub use es_policy_grpc::policy_service::v1::PolicyManagementServiceClient as PolicyManagementClientStub; +use tonic::{metadata::MetadataValue, Request}; use es_policy_grpc::messages::decline_renewal::request::v1::{ DeclineRenewalRequest, DeclineRenewalReason, CustomerDeclineRenewalReason }; -use tonic::{metadata::MetadataValue, Request}; let mut client = PolicyManagementClientStub::connect("http://[::1]:50051").await?; let mut request = Request::new(DeclineRenewalRequest { - policy_id: Uuid::new_v4(), + policy_id: uuid::Uuid::new_v4(), requested_at: DateTime::now(), description: Some("dummy".into()), reason: DeclineRenewalReason::Customer( @@ -567,9 +567,26 @@ The server has full control over the access of the health checking service. ## Health service definition ```protobuf +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + ServingStatus status = 1; +} + service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); - rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); } ``` @@ -581,13 +598,17 @@ This definition is provided by the official gRPC docs, each language runtime mig ## Enabling the health service ```rust +use es_policy_grpc::policy_service::v1::PolicyManagementServiceServer as PolicyManagementServerStub; +use tonic_health::server::health_reporter; +use tonic::Server as GrpcServer; + let (health_reporter, health_service) = health_reporter(); health_reporter - .set_serving::>() + .set_serving::>() .await; -Server::builder() +GrpcServer::builder() // Add other layers .layer(..) .add_service(health_service) @@ -628,11 +649,8 @@ pub trait Service { type Error; type Future: Future>; - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>; - + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; + fn call(&mut self, req: Request) -> Self::Future; } ``` @@ -654,7 +672,7 @@ The processing may depend on calling other services. At some point in the future ```rust pub trait Layer { - type Service; // This can be a middleware + type Service; fn layer(&self, inner: S) -> Self::Service; } @@ -696,10 +714,9 @@ Auth0 M2M authorization ## Authentication service ```rust -// Given a Json Web Key Set client and an audience, -// it will look for an AUTHENTICATION header and try to validate it against the key set. -pub struct JwtAuth { - jwks_client: JwksClient, +// Tower Service used as a JWT Auth middleware. +pub struct JwtAuth { + jwt_decoder: Arc, audience: String, inner: S, } @@ -717,20 +734,19 @@ The JwksClient contains the public keys to verify the signature of incoming toke ## Authentication service -```rust -impl JwtAuth { - async fn authorize(&self, req: Request) -> Result, Response> +```rust +impl JwtAuth { + async fn authorize(&self, req: http::Request) -> Result, http::Response> where Res: Default, { - // Error handling has been omitted for simplicity purposes - let token = req.headers().get(AUTHORIZATION).strip_prefix("Bearer "); + let token = req.headers() + .get(http::AUTHORIZATION) + .ok_or_else(make_unauthorized_response)? + .strip_prefix("Bearer ") + .ok_or_else(make_unauthorized_response)? - if let Err(_err) = self - .jwks_client - .decode(token, &[self.audience.clone()]) - .await - { + if let Err(_err) = self.jwt_decoder.decode::(token, &self.audience).await { return Err(make_unauthorized_response()); } @@ -752,14 +768,24 @@ We assume that `make_unauthorized_response` will build a gRPC unauthorized respo ## Authentication service ```rust -impl Service> for JwtAuth -where - S: Service, Response = http::Response>, -{ - // poll_ready is mandatory but we omit it here - // as well as type definitions. +use std::task::{Context, Poll}; +use http::{Request, Response}; - fn call(&mut self, req: http::Request) -> Self::Future { +impl Service> for JwtAuth +where + S: Service, Response = Response>, + T: JwtDecoder, + // .. Skipping other constraints +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { let mut this = self.clone(); async move { @@ -775,7 +801,7 @@ where note: -Finally implementing the service, pretty straight-forward +Note the use of `async move` inside `call` given that `call` is not defined as an async function on the trait definition --- @@ -784,15 +810,15 @@ Finally implementing the service, pretty straight-forward ```rust // Reusable Tower Layer meant to wrap // a JWT Auth middleware Service around a generic service -pub struct JwtAuthLayer { - jwks_client: JwksClient, +pub struct JwtAuthLayer { + jwt_decoder: Arc, audience: String, } -impl JwtAuthLayer { - pub fn new(jwks_client: JwksClient, audience: &str) -> Self { +impl JwtAuthLayer { + pub fn new(jwt_decoder: T, audience: impl Into) -> Self { Self { - jwks_client, + jwt_decoder: Arc::new(jwt_decoder), audience: audience.into(), } } @@ -808,12 +834,15 @@ Although confusing, the purpose of the layer is to make the usage of the middlew ## Authentication layer ```rust -impl Layer for JwtAuthLayer { - type Service = JwtAuth; +impl Layer for JwtAuthLayer +where + T: JwtDecoder, +{ + type Service = JwtAuth; fn layer(&self, inner: S) -> Self::Service { JwtAuth { - jwks_client: self.jwks_client.clone(), + jwt_decoder: self.jwt_decoder.clone(), audience: self.audience.clone(), inner, } @@ -829,13 +858,19 @@ What is done inside of the `layer` function could just be done manually, but it ## Attaching it to our gRPC server ```rust +use es_policy_grpc::policy_service::v1::PolicyManagementServiceServer as PolicyManagementServerStub; +use tonic::Server as GrpcServer; +use tower::ServiceBuilder; + +// ... + let authenticated_apis = ServiceBuilder::new() - .layer(JwtAuthLayer::new(jwks_client, AUD_POLICY_MANAGEMENT)) - .service(PolicyManagementServiceServer::new( + .layer(JwtAuthLayer::new(jwks_client, AUDIENCE)) + .service(PolicyManagementServerStub::new( PolicyManagementServiceImpl::new(application), )); -let server = Server::builder().add_service(authenticated_apis); +let server = GrpcServer::builder().add_service(authenticated_apis); ``` note: @@ -895,6 +930,7 @@ Explain how this is a simplified version of the real implementation in `prima_to ```rust fn on_response(response: &http::Response, span: &tracing::Span) { let mut headers = response.headers().clone(); + redact_sensitive_headers(&mut headers); let code = tonic::Status::from_header_map(&headers) .map(|status| status.code()) @@ -903,11 +939,18 @@ fn on_response(response: &http::Response, span: &tracing::Span) { span.record("rpc.grpc.status_code", code as i32); span.record("grpc.response.header", format!("{:?}", headers)); - // The match has been simplified for the slides purpose - if matches!(code, tonic::Code::Unknown) { + if matches!( + code, + tonic::Code::Unknown + | tonic::Code::DeadlineExceeded + | tonic::Code::Unimplemented + | tonic::Code::Internal + | tonic::Code::Unavailable + | tonic::Code::DataLoss + ) { span.record("otel.status_code", "ERROR"); } -} +} ``` note: @@ -919,9 +962,11 @@ We will see in a second how the span we receive by parameters is the same span w ## Tracing service ```rust -pub struct OpenTelemetryServerTracing { +// Tower Service acting as a Tracing middleware +// for gRPC requests and responses +pub struct OpenTelemetryTracer { inner: S, -} +} ``` note: @@ -933,30 +978,36 @@ We need to implement the service that will act as the tracing middleware ## Tracing service ```rust -impl Service> for OpenTelemetryServerTracing +use std::task::{Context, Poll}; +use http::{Request, Response}; +use opentelemetry_http::HeaderExtractor; +use opentelemetry_sdk::propagation::TraceContextPropagator; + +impl Service> for OpenTelemetryTracer where S: Service, Response = Response>, + S::Future: Send + 'static, { type Response = S::Response; type Error = S::Error; - type Future = Future>; + type Future = BoxFuture<'static, Result>; - #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, req: ::http::Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { let parent_context = TraceContextPropagator::new().extract(&HeaderExtractor(req.headers())); + let span = make_span(&req); span.set_parent(parent_context); - self.inner - .call(req) - .instrument(span.clone()) - .inspect_ok(move |response| on_response(response, &span)) + self.inner.call(req).instrument(span.clone()).inspect_ok(move |response| { + on_response(response, &span); + }) + .boxed() } -} +} ``` note: @@ -971,9 +1022,9 @@ Then that span is used as the parent span for the inner service call. ## Tracing layer ```rust -pub struct OpenTelemetryServerTracingLayer {} +pub struct OpenTelemetryTracingLayer {} -impl OpenTelemetryServerTracingLayer { +impl OpenTelemetryTracingLayer { pub fn new() -> Self { Self {} } @@ -990,8 +1041,8 @@ In this case we don't need any data to be added to the layer. ## Tracing layer ```rust -impl Layer for OpenTelemetryServerTracingLayer { - type Service = OpenTelemetryServerTracing; +impl Layer for OpenTelemetryTracingLayer { + type Service = OpenTelemetryTracer; fn layer(&self, inner: S) -> Self::Service { OpenTelemetryServerTracing { inner } @@ -1003,8 +1054,10 @@ impl Layer for OpenTelemetryServerTracingLayer { ## Attaching it to our gRPC server ```rust -Server::builder() - .layer(OpenTelemetryServerTracingLayer::new()) +use tonic::Server as GrpcServer; + +GrpcServer::builder() + .layer(OpenTelemetryTracingLayer::new()) // layer other services to benefit from tracing .serve(addr) .await?;