threadshare/udpsink: Minor cleanup / optimizations for client addition

This commit is contained in:
Sebastian Dröge 2020-03-16 12:30:53 +02:00
parent 9ddcae4ed2
commit ac574cd112

View file

@ -364,25 +364,20 @@ impl UdpSinkPadHandler {
})))
}
async fn configure_client(&self, client: &SocketAddr) -> Result<(), gst::ErrorMessage> {
let (auto_multicast, multicast_loop, ttl_mc, ttl, socket, socket_v6) = {
let state = self.0.read().unwrap();
let settings = state.settings.lock().unwrap();
(
settings.auto_multicast,
settings.multicast_loop,
settings.ttl_mc,
settings.ttl,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
)
};
fn configure_client(
&self,
auto_multicast: bool,
multicast_loop: bool,
ttl_mc: u32,
ttl: u32,
socket: &mut Option<tokio::net::UdpSocket>,
socket_v6: &mut Option<tokio::net::UdpSocket>,
client: &SocketAddr,
) -> Result<(), gst::ErrorMessage> {
if client.ip().is_multicast() {
match client.ip() {
IpAddr::V4(addr) => {
if let Some(socket) = socket.lock().await.as_mut() {
if let Some(socket) = socket.as_mut() {
if auto_multicast {
socket
.join_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0))
@ -410,7 +405,7 @@ impl UdpSinkPadHandler {
}
}
IpAddr::V6(addr) => {
if let Some(socket) = socket_v6.lock().await.as_mut() {
if let Some(socket) = socket_v6.as_mut() {
if auto_multicast {
socket.join_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
@ -434,7 +429,7 @@ impl UdpSinkPadHandler {
} else {
match client.ip() {
IpAddr::V4(_) => {
if let Some(socket) = socket.lock().await.as_mut() {
if let Some(socket) = socket.as_mut() {
socket.set_ttl(ttl).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
@ -444,7 +439,7 @@ impl UdpSinkPadHandler {
}
}
IpAddr::V6(_) => {
if let Some(socket) = socket_v6.lock().await.as_mut() {
if let Some(socket) = socket_v6.as_mut() {
socket.set_ttl(ttl).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
@ -490,16 +485,44 @@ impl UdpSinkPadHandler {
)
};
for client in &clients_to_configure {
self.configure_client(&client).await.map_err(|err| {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to configure client {:?}: {}", client, err]
);
if !clients_to_configure.is_empty() {
let (auto_multicast, multicast_loop, ttl_mc, ttl, socket, socket_v6) = {
let state = self.0.read().unwrap();
let settings = state.settings.lock().unwrap();
gst::FlowError::Error
})?;
(
settings.auto_multicast,
settings.multicast_loop,
settings.ttl_mc,
settings.ttl,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
)
};
let mut socket = socket.lock().await;
let mut socket_v6 = socket_v6.lock().await;
for client in &clients_to_configure {
self.configure_client(
auto_multicast,
multicast_loop,
ttl_mc,
ttl,
&mut socket,
&mut socket_v6,
&client,
)
.map_err(|err| {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to configure client {:?}: {}", client, err]
);
gst::FlowError::Error
})?;
}
}
if do_sync {
@ -917,7 +940,10 @@ impl UdpSink {
state: &mut RwLockWriteGuard<'_, UdpSinkPadHandlerState>,
settings: &StdMutexGuard<'_, Settings>,
) {
state.clients = Arc::new(vec![]);
let clients = Arc::make_mut(&mut state.clients);
clients.clear();
state.clients_to_configure = vec![];
if let Some(host) = &settings.host {
self.add_client(&element, state, &host, settings.port as u16);
@ -940,12 +966,16 @@ impl UdpSink {
};
let addr = SocketAddr::new(addr, port);
if !state.clients.contains(&addr) {
gst_warning!(CAT, obj: element, "Not removing unknown client {:?}", &addr);
return;
}
gst_info!(CAT, obj: element, "Removing client {:?}", addr);
let clients = Arc::make_mut(&mut state.clients);
clients.retain(|addr2| addr != *addr2);
state.clients = Arc::new(clients.to_vec());
state.clients_to_configure.retain(|addr2| addr != *addr2);
}
fn add_client(
@ -964,15 +994,16 @@ impl UdpSink {
};
let addr = SocketAddr::new(addr, port);
let clients = Arc::make_mut(&mut state.clients);
if !clients.contains(&addr) {
gst_info!(CAT, obj: element, "Adding client {:?}", addr);
clients.push(addr);
state.clients_to_configure.push(addr.clone());
} else {
if state.clients.contains(&addr) {
gst_warning!(CAT, obj: element, "Not adding client {:?} again", &addr);
return;
}
gst_info!(CAT, obj: element, "Adding client {:?}", addr);
let clients = Arc::make_mut(&mut state.clients);
clients.push(addr);
state.clients_to_configure.push(addr);
}
}