scufflecloud_ingest/
services.rs1use std::sync::Arc;
2
3use anyhow::Context;
4use ingest_traits::RtmpConfigInterface;
5use scuffle_context::ContextFutExt;
6
7mod rtmp;
8
9#[derive(Debug)]
10pub struct IngestSvc<G> {
11 _phantom: std::marker::PhantomData<G>,
12}
13
14impl<G> Default for IngestSvc<G> {
15 fn default() -> Self {
16 Self {
17 _phantom: std::marker::PhantomData,
18 }
19 }
20}
21
22impl<G: ingest_traits::Global> scuffle_bootstrap::Service<G> for IngestSvc<G> {
23 async fn run(self, global: Arc<G>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
24 let rtmp = async {
25 let addr = global.rtmp_bind();
26 tracing::info!(addr = %addr, "starting RTMP server");
27
28 let tcp_listener = tokio::net::TcpListener::bind(addr).await.context("bind TCP listener")?;
29
30 while let Some(connection) = tcp_listener.accept().with_context(&ctx).await {
31 match connection {
32 Ok((stream, _addr)) => {
33 let session = scuffle_rtmp::ServerSession::new(stream, rtmp::Handler).with_context(ctx.clone());
34
35 tokio::spawn(async move {
37 if let Err(err) = session.run().await {
38 tracing::error!(err = %err, "RTMP session error");
39 }
41 });
42 }
43 Err(err) => {
44 tracing::error!(err = %err, "failed to accept connection");
45 }
47 }
48 }
49
50 anyhow::Result::Ok(())
51 };
52
53 let rtmps = async {
54 if let Some(rtmps) = global.rtmps_config() {
55 let addr = rtmps.rtmps_bind();
56 tracing::info!(addr = %addr, "starting RTMPS server");
57
58 let tcp_listener = tokio::net::TcpListener::bind(addr).await.context("bind TCP listener")?;
59 let tls_acceptor = tokio_rustls::TlsAcceptor::from(rtmps.rtmps_rustls_server_config());
60
61 while let Some(connection) = tcp_listener.accept().with_context(&ctx).await {
62 match connection {
63 Ok((stream, _addr)) => {
64 let ctx = ctx.clone();
65 let tls_acceptor = tls_acceptor.clone();
66
67 tokio::spawn(async move {
68 match tls_acceptor.accept(stream).with_context(&ctx).await {
69 Some(Ok(stream)) => {
70 let session =
71 scuffle_rtmp::ServerSession::new(stream, rtmp::Handler).with_context(ctx);
72
73 if let Err(err) = session.run().await {
75 tracing::error!(err = %err, "RTMP session error");
76 }
78 }
79 Some(Err(err)) => {
80 tracing::error!(err = %err, "failed to accept TLS connection");
81 }
82 None => {} }
84 });
85 }
86 Err(err) => {
87 tracing::error!(err = %err, "failed to accept connection");
88 }
90 }
91 }
92 }
93
94 anyhow::Result::Ok(())
95 };
96
97 let (rtmp, rtmps): (anyhow::Result<()>, anyhow::Result<()>) = tokio::join!(rtmp, rtmps);
98 rtmp?;
99 rtmps?;
100
101 Ok(())
102 }
103}