Update code in slides

This commit is contained in:
JasterV 2025-06-15 23:43:34 +02:00
parent fc7326bc4e
commit 8557563bee

View file

@ -489,17 +489,17 @@ Highlight the fact that at the end of the day the gRPC server will be listening
```rust ```rust
// Auto-generated client stub // Auto-generated client stub
use es_policy_grpc::policy_service::v1::PolicyManagementServiceClient as PolicyManagementClientStub; use es_policy_grpc::policy_service::v1::PolicyManagementServiceClient as PolicyManagementClientStub;
use tonic::{metadata::MetadataValue, Request};
use es_policy_grpc::messages::decline_renewal::request::v1::{ use es_policy_grpc::messages::decline_renewal::request::v1::{
DeclineRenewalRequest, DeclineRenewalRequest,
DeclineRenewalReason, DeclineRenewalReason,
CustomerDeclineRenewalReason CustomerDeclineRenewalReason
}; };
use tonic::{metadata::MetadataValue, Request};
let mut client = PolicyManagementClientStub::connect("http://[::1]:50051").await?; let mut client = PolicyManagementClientStub::connect("http://[::1]:50051").await?;
let mut request = Request::new(DeclineRenewalRequest { let mut request = Request::new(DeclineRenewalRequest {
policy_id: Uuid::new_v4(), policy_id: uuid::Uuid::new_v4(),
requested_at: DateTime::now(), requested_at: DateTime::now(),
description: Some("dummy".into()), description: Some("dummy".into()),
reason: DeclineRenewalReason::Customer( reason: DeclineRenewalReason::Customer(
@ -567,9 +567,26 @@ The server has full control over the access of the health checking service.
## Health service definition ## Health service definition
```protobuf ```protobuf
syntax = "proto3";
package grpc.health.v1;
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
service Health { service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse); rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
} }
``` ```
@ -581,13 +598,17 @@ This definition is provided by the official gRPC docs, each language runtime mig
## Enabling the health service ## Enabling the health service
```rust ```rust
use es_policy_grpc::policy_service::v1::PolicyManagementServiceServer as PolicyManagementServerStub;
use tonic_health::server::health_reporter;
use tonic::Server as GrpcServer;
let (health_reporter, health_service) = health_reporter(); let (health_reporter, health_service) = health_reporter();
health_reporter health_reporter
.set_serving::<PolicyManagementServiceServer<PolicyManagementServiceImpl>>() .set_serving::<PolicyManagementServerStub<PolicyManagementServiceImpl>>()
.await; .await;
Server::builder() GrpcServer::builder()
// Add other layers // Add other layers
.layer(..) .layer(..)
.add_service(health_service) .add_service(health_service)
@ -628,11 +649,8 @@ pub trait Service<Request> {
type Error; type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>; type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready( fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future; fn call(&mut self, req: Request) -> Self::Future;
} }
``` ```
@ -654,7 +672,7 @@ The processing may depend on calling other services. At some point in the future
```rust ```rust
pub trait Layer<S> { pub trait Layer<S> {
type Service; // This can be a middleware type Service;
fn layer(&self, inner: S) -> Self::Service; fn layer(&self, inner: S) -> Self::Service;
} }
@ -696,10 +714,9 @@ Auth0 M2M authorization
## Authentication service ## Authentication service
```rust ```rust
// Given a Json Web Key Set client and an audience, // Tower Service used as a JWT Auth middleware.
// it will look for an AUTHENTICATION header and try to validate it against the key set. pub struct JwtAuth<T: JwtDecoder, S> {
pub struct JwtAuth<S> { jwt_decoder: Arc<T>,
jwks_client: JwksClient<WebSource>,
audience: String, audience: String,
inner: S, inner: S,
} }
@ -717,20 +734,19 @@ The JwksClient contains the public keys to verify the signature of incoming toke
## Authentication service ## Authentication service
```rust ```rust
impl<S> JwtAuth<S> { impl<T: JwtDecoder, S> JwtAuth<T, S> {
async fn authorize<Req, Res>(&self, req: Request<Req>) -> Result<Request<Req>, Response<Res>> async fn authorize<Req, Res>(&self, req: http::Request<Req>) -> Result<http::Request<Req>, http::Response<Res>>
where where
Res: Default, Res: Default,
{ {
// Error handling has been omitted for simplicity purposes let token = req.headers()
let token = req.headers().get(AUTHORIZATION).strip_prefix("Bearer "); .get(http::AUTHORIZATION)
.ok_or_else(make_unauthorized_response)?
.strip_prefix("Bearer ")
.ok_or_else(make_unauthorized_response)?
if let Err(_err) = self if let Err(_err) = self.jwt_decoder.decode::<serde_json::Value>(token, &self.audience).await {
.jwks_client
.decode(token, &[self.audience.clone()])
.await
{
return Err(make_unauthorized_response()); return Err(make_unauthorized_response());
} }
@ -752,14 +768,24 @@ We assume that `make_unauthorized_response` will build a gRPC unauthorized respo
## Authentication service ## Authentication service
```rust ```rust
impl<Req, Res, S> Service<http::Request<Req>> for JwtAuth<S> use std::task::{Context, Poll};
where use http::{Request, Response};
S: Service<http::Request<Req>, Response = http::Response<Res>>,
{
// poll_ready is mandatory but we omit it here
// as well as type definitions.
fn call(&mut self, req: http::Request<Req>) -> Self::Future { impl<Req, Res, S, T> Service<Request<Req>> for JwtAuth<T, S>
where
S: Service<Request<Req>, Response = Response<Res>>,
T: JwtDecoder,
// .. Skipping other constraints
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<Req>) -> Self::Future {
let mut this = self.clone(); let mut this = self.clone();
async move { async move {
@ -775,7 +801,7 @@ where
note: note:
Finally implementing the service, pretty straight-forward Note the use of `async move` inside `call` given that `call` is not defined as an async function on the trait definition
--- ---
@ -784,15 +810,15 @@ Finally implementing the service, pretty straight-forward
```rust ```rust
// Reusable Tower Layer meant to wrap // Reusable Tower Layer meant to wrap
// a JWT Auth middleware Service around a generic service // a JWT Auth middleware Service around a generic service
pub struct JwtAuthLayer { pub struct JwtAuthLayer<T: JwtDecoder> {
jwks_client: JwksClient<WebSource>, jwt_decoder: Arc<T>,
audience: String, audience: String,
} }
impl JwtAuthLayer { impl<T: JwtDecoder> JwtAuthLayer<T> {
pub fn new(jwks_client: JwksClient<WebSource>, audience: &str) -> Self { pub fn new(jwt_decoder: T, audience: impl Into<String>) -> Self {
Self { Self {
jwks_client, jwt_decoder: Arc::new(jwt_decoder),
audience: audience.into(), audience: audience.into(),
} }
} }
@ -808,12 +834,15 @@ Although confusing, the purpose of the layer is to make the usage of the middlew
## Authentication layer ## Authentication layer
```rust ```rust
impl<S> Layer<S> for JwtAuthLayer { impl<T, S> Layer<S> for JwtAuthLayer<T>
type Service = JwtAuth<S>; where
T: JwtDecoder,
{
type Service = JwtAuth<T, S>;
fn layer(&self, inner: S) -> Self::Service { fn layer(&self, inner: S) -> Self::Service {
JwtAuth { JwtAuth {
jwks_client: self.jwks_client.clone(), jwt_decoder: self.jwt_decoder.clone(),
audience: self.audience.clone(), audience: self.audience.clone(),
inner, inner,
} }
@ -829,13 +858,19 @@ What is done inside of the `layer` function could just be done manually, but it
## Attaching it to our gRPC server ## Attaching it to our gRPC server
```rust ```rust
use es_policy_grpc::policy_service::v1::PolicyManagementServiceServer as PolicyManagementServerStub;
use tonic::Server as GrpcServer;
use tower::ServiceBuilder;
// ...
let authenticated_apis = ServiceBuilder::new() let authenticated_apis = ServiceBuilder::new()
.layer(JwtAuthLayer::new(jwks_client, AUD_POLICY_MANAGEMENT)) .layer(JwtAuthLayer::new(jwks_client, AUDIENCE))
.service(PolicyManagementServiceServer::new( .service(PolicyManagementServerStub::new(
PolicyManagementServiceImpl::new(application), PolicyManagementServiceImpl::new(application),
)); ));
let server = Server::builder().add_service(authenticated_apis); let server = GrpcServer::builder().add_service(authenticated_apis);
``` ```
note: note:
@ -895,6 +930,7 @@ Explain how this is a simplified version of the real implementation in `prima_to
```rust ```rust
fn on_response<B>(response: &http::Response<B>, span: &tracing::Span) { fn on_response<B>(response: &http::Response<B>, span: &tracing::Span) {
let mut headers = response.headers().clone(); let mut headers = response.headers().clone();
redact_sensitive_headers(&mut headers);
let code = tonic::Status::from_header_map(&headers) let code = tonic::Status::from_header_map(&headers)
.map(|status| status.code()) .map(|status| status.code())
@ -903,11 +939,18 @@ fn on_response<B>(response: &http::Response<B>, span: &tracing::Span) {
span.record("rpc.grpc.status_code", code as i32); span.record("rpc.grpc.status_code", code as i32);
span.record("grpc.response.header", format!("{:?}", headers)); span.record("grpc.response.header", format!("{:?}", headers));
// The match has been simplified for the slides purpose if matches!(
if matches!(code, tonic::Code::Unknown) { code,
tonic::Code::Unknown
| tonic::Code::DeadlineExceeded
| tonic::Code::Unimplemented
| tonic::Code::Internal
| tonic::Code::Unavailable
| tonic::Code::DataLoss
) {
span.record("otel.status_code", "ERROR"); span.record("otel.status_code", "ERROR");
} }
} }
``` ```
note: note:
@ -919,9 +962,11 @@ We will see in a second how the span we receive by parameters is the same span w
## Tracing service ## Tracing service
```rust ```rust
pub struct OpenTelemetryServerTracing<S> { // Tower Service acting as a Tracing middleware
// for gRPC requests and responses
pub struct OpenTelemetryTracer<S> {
inner: S, inner: S,
} }
``` ```
note: note:
@ -933,30 +978,36 @@ We need to implement the service that will act as the tracing middleware
## Tracing service ## Tracing service
```rust ```rust
impl<Req, Res, S> Service<Request<Req>> for OpenTelemetryServerTracing<S> use std::task::{Context, Poll};
use http::{Request, Response};
use opentelemetry_http::HeaderExtractor;
use opentelemetry_sdk::propagation::TraceContextPropagator;
impl<Req, Res, S> Service<Request<Req>> for OpenTelemetryTracer<S>
where where
S: Service<Request<Req>, Response = Response<Res>>, S: Service<Request<Req>, Response = Response<Res>>,
S::Future: Send + 'static,
{ {
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
type Future = Future<Result<Self::Response, Self::Error>>; type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx) self.inner.poll_ready(cx)
} }
fn call(&mut self, req: ::http::Request<ReqB>) -> Self::Future { fn call(&mut self, req: Request<Req>) -> Self::Future {
let parent_context = TraceContextPropagator::new().extract(&HeaderExtractor(req.headers())); let parent_context = TraceContextPropagator::new().extract(&HeaderExtractor(req.headers()));
let span = make_span(&req); let span = make_span(&req);
span.set_parent(parent_context); span.set_parent(parent_context);
self.inner self.inner.call(req).instrument(span.clone()).inspect_ok(move |response| {
.call(req) on_response(response, &span);
.instrument(span.clone()) })
.inspect_ok(move |response| on_response(response, &span)) .boxed()
} }
} }
``` ```
note: note:
@ -971,9 +1022,9 @@ Then that span is used as the parent span for the inner service call.
## Tracing layer ## Tracing layer
```rust ```rust
pub struct OpenTelemetryServerTracingLayer {} pub struct OpenTelemetryTracingLayer {}
impl OpenTelemetryServerTracingLayer { impl OpenTelemetryTracingLayer {
pub fn new() -> Self { pub fn new() -> Self {
Self {} Self {}
} }
@ -990,8 +1041,8 @@ In this case we don't need any data to be added to the layer.
## Tracing layer ## Tracing layer
```rust ```rust
impl<S> Layer<S> for OpenTelemetryServerTracingLayer { impl<S> Layer<S> for OpenTelemetryTracingLayer {
type Service = OpenTelemetryServerTracing<S>; type Service = OpenTelemetryTracer<S>;
fn layer(&self, inner: S) -> Self::Service { fn layer(&self, inner: S) -> Self::Service {
OpenTelemetryServerTracing { inner } OpenTelemetryServerTracing { inner }
@ -1003,8 +1054,10 @@ impl<S> Layer<S> for OpenTelemetryServerTracingLayer {
## Attaching it to our gRPC server ## Attaching it to our gRPC server
```rust ```rust
Server::builder() use tonic::Server as GrpcServer;
.layer(OpenTelemetryServerTracingLayer::new())
GrpcServer::builder()
.layer(OpenTelemetryTracingLayer::new())
// layer other services to benefit from tracing // layer other services to benefit from tracing
.serve(addr) .serve(addr)
.await?; .await?;