scufflecloud_ingest/
services.rs

1use std::sync::Arc;
2
3use anyhow::Context;
4use ingest_traits::RtmpConfigInterface;
5use scuffle_context::ContextFutExt;
6
7mod rtmp;
8
9#[derive(Debug)]
10pub struct IngestSvc<G> {
11    _phantom: std::marker::PhantomData<G>,
12}
13
14impl<G> Default for IngestSvc<G> {
15    fn default() -> Self {
16        Self {
17            _phantom: std::marker::PhantomData,
18        }
19    }
20}
21
22impl<G: ingest_traits::Global> scuffle_bootstrap::Service<G> for IngestSvc<G> {
23    async fn run(self, global: Arc<G>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
24        let rtmp = async {
25            let addr = global.rtmp_bind();
26            tracing::info!(addr = %addr, "starting RTMP server");
27
28            let tcp_listener = tokio::net::TcpListener::bind(addr).await.context("bind TCP listener")?;
29
30            while let Some(connection) = tcp_listener.accept().with_context(&ctx).await {
31                match connection {
32                    Ok((stream, _addr)) => {
33                        let session = scuffle_rtmp::ServerSession::new(stream, rtmp::Handler).with_context(ctx.clone());
34
35                        // This is bound by the context because we pass it to the session.
36                        tokio::spawn(async move {
37                            if let Err(err) = session.run().await {
38                                tracing::error!(err = %err, "RTMP session error");
39                                // TODO: what do we do here?
40                            }
41                        });
42                    }
43                    Err(err) => {
44                        tracing::error!(err = %err, "failed to accept connection");
45                        // TODO: what do we do here? can this be fatal?
46                    }
47                }
48            }
49
50            anyhow::Result::Ok(())
51        };
52
53        let rtmps = async {
54            if let Some(rtmps) = global.rtmps_config() {
55                let addr = rtmps.rtmps_bind();
56                tracing::info!(addr = %addr, "starting RTMPS server");
57
58                let tcp_listener = tokio::net::TcpListener::bind(addr).await.context("bind TCP listener")?;
59                let tls_acceptor = tokio_rustls::TlsAcceptor::from(rtmps.rtmps_rustls_server_config());
60
61                while let Some(connection) = tcp_listener.accept().with_context(&ctx).await {
62                    match connection {
63                        Ok((stream, _addr)) => {
64                            let ctx = ctx.clone();
65                            let tls_acceptor = tls_acceptor.clone();
66
67                            tokio::spawn(async move {
68                                match tls_acceptor.accept(stream).with_context(&ctx).await {
69                                    Some(Ok(stream)) => {
70                                        let session =
71                                            scuffle_rtmp::ServerSession::new(stream, rtmp::Handler).with_context(ctx);
72
73                                        // run is bound by the context because we pass it to the session.
74                                        if let Err(err) = session.run().await {
75                                            tracing::error!(err = %err, "RTMP session error");
76                                            // TODO: what do we do here?
77                                        }
78                                    }
79                                    Some(Err(err)) => {
80                                        tracing::error!(err = %err, "failed to accept TLS connection");
81                                    }
82                                    None => {} // context cancelled
83                                }
84                            });
85                        }
86                        Err(err) => {
87                            tracing::error!(err = %err, "failed to accept connection");
88                            // TODO: what do we do here? can this be fatal?
89                        }
90                    }
91                }
92            }
93
94            anyhow::Result::Ok(())
95        };
96
97        let (rtmp, rtmps): (anyhow::Result<()>, anyhow::Result<()>) = tokio::join!(rtmp, rtmps);
98        rtmp?;
99        rtmps?;
100
101        Ok(())
102    }
103}