scuffle_http/backend/hyper/
handler.rs

1use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
2use hyper_util::server::conn::auto;
3use scuffle_context::ContextFutExt;
4use tokio::io::{AsyncRead, AsyncWrite};
5
6use crate::error::HttpError;
7use crate::service::{HttpService, HttpServiceFactory};
8
9/// Helper function used by hyper server to handle incoming connections.
10pub(crate) async fn handle_connection<F, S, I>(
11    ctx: scuffle_context::Context,
12    service: S,
13    extra_extensions: http::Extensions,
14    io: I,
15    http1: bool,
16    http2: bool,
17) -> Result<(), HttpError<F>>
18where
19    F: HttpServiceFactory<Service = S>,
20    F::Error: std::error::Error,
21    S: HttpService + Clone + Send + 'static,
22    S::Error: std::error::Error + Send + Sync,
23    S::ResBody: Send,
24    <S::ResBody as http_body::Body>::Data: Send,
25    <S::ResBody as http_body::Body>::Error: std::error::Error + Send + Sync,
26    I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
27{
28    let io = TokioIo::new(io);
29
30    let hyper_proxy_service = hyper::service::service_fn(move |req: http::Request<hyper::body::Incoming>| {
31        let mut service = service.clone();
32        let extra_extensions = extra_extensions.clone();
33
34        async move {
35            let (mut parts, body) = req.into_parts();
36            parts.extensions.extend(extra_extensions);
37
38            let body = crate::body::IncomingBody::from(body);
39            let req = http::Request::from_parts(parts, body);
40            service.call(req).await
41        }
42    });
43
44    let mut builder = auto::Builder::new(TokioExecutor::new());
45
46    if http1 && http2 {
47        #[cfg(feature = "http1")]
48        builder.http1().timer(TokioTimer::new());
49
50        #[cfg(feature = "http2")]
51        builder.http2().timer(TokioTimer::new());
52
53        builder
54            .serve_connection_with_upgrades(io, hyper_proxy_service)
55            .with_context(ctx)
56            .await
57            .transpose()
58            .map_err(HttpError::HyperConnection)?;
59    } else if http1 {
60        #[cfg(not(feature = "http1"))]
61        unreachable!("http1 enabled but http1 feature disabled");
62
63        #[cfg(feature = "http1")]
64        builder
65            .http1_only()
66            .serve_connection_with_upgrades(io, hyper_proxy_service)
67            .with_context(ctx)
68            .await
69            .transpose()
70            .map_err(HttpError::HyperConnection)?;
71    } else if http2 {
72        #[cfg(not(feature = "http2"))]
73        unreachable!("http2 enabled but http2 feature disabled");
74
75        #[cfg(feature = "http2")]
76        builder
77            .http2_only()
78            .serve_connection_with_upgrades(io, hyper_proxy_service)
79            .with_context(ctx)
80            .await
81            .transpose()
82            .map_err(HttpError::HyperConnection)?;
83    } else {
84        #[cfg(feature = "tracing")]
85        tracing::warn!("both http1 and http2 are disabled, closing connection");
86    }
87
88    Ok(())
89}