webrtcsrc: add capability to initiate offer and handle answer

add the handler for session-request signal from the signaller
to initiate an offer

in the session-description signal handler, if the sdp is an answer type
handle the answer from the remote peer and create the src pad using
the media caps of sendonly type media
This commit is contained in:
Taruntej Kanakamalla 2024-04-17 20:46:59 +05:30
parent ac9ef0a8d2
commit f2783fda93

View file

@ -292,6 +292,7 @@ struct SignallerSignals {
request_meta: glib::SignalHandlerId,
session_description: glib::SignalHandlerId,
handle_ice: glib::SignalHandlerId,
session_requested: glib::SignalHandlerId,
}
impl Session {
@ -572,6 +573,210 @@ impl Session {
}
}
fn generate_offer(&self, element: &super::BaseWebRTCSrc) {
let sess_id = self.id.clone();
let webrtcbin = self.webrtcbin();
let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly;
let settings = element.imp().settings.lock().unwrap();
let caps = settings
.video_codecs
.iter()
.chain(settings.audio_codecs.iter())
.map(|codec| {
let name = &codec.name;
let (media, clock_rate, pt) = if codec.stream_type == gst::StreamType::AUDIO {
("audio", 48000, 96)
} else {
//video stream type
("video", 90000, 101)
};
let mut caps = gst::Caps::new_empty();
{
let caps = caps.get_mut().unwrap();
let s = gst::Structure::builder("application/x-rtp")
.field("media", media)
.field("payload", pt)
.field("encoding-name", name.as_str())
.field("clock-rate", clock_rate)
.build();
caps.append_structure(s);
}
caps
});
for c in caps {
gst::info!(
CAT,
obj: element,
"Adding transceiver with caps: {c:#?}"
);
let transceiver = webrtcbin.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
"add-transceiver",
&[&direction, &c],
);
transceiver.set_property("do_nack", settings.do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
}
let webrtcbin_weak = webrtcbin.downgrade();
let promise = gst::Promise::with_change_func(
glib::clone!(@weak element as ele => move |reply| {
let Some(webrtcbin) = webrtcbin_weak.upgrade() else {
return;
};
let reply = match reply {
Ok(Some(reply)) => reply,
Ok(None) => {
gst::error!(CAT, obj: ele, "generate offer::Promise returned with no reply");
return;
}
Err(e) => {
gst::error!(CAT, obj: ele, "generate offer::Promise returned with error {:?}", e);
return;
}
};
if let Ok(offer_sdp) = reply
.value("offer")
.map(|offer| offer.get::<gst_webrtc::WebRTCSessionDescription>().unwrap())
{
gst::debug!(
CAT,
obj: ele,
"Setting local description: {}",
offer_sdp.sdp().to_string()
);
webrtcbin.emit_by_name::<()>(
"set-local-description",
&[&offer_sdp, &None::<gst::Promise>],
);
gst::log!(CAT, obj: ele, "Sending SDP, {}", offer_sdp.sdp().to_string());
let signaller = ele.imp().signaller();
signaller.send_sdp(sess_id.as_str(), &offer_sdp);
} else {
let error = reply
.value("error")
.expect("structure must have an error value")
.get::<glib::Error>()
.expect("value must be a GLib error");
gst::error!(CAT, obj: ele, "generate offer::Promise returned with error: {}", error);
}
}),
);
webrtcbin
.clone()
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
}
fn handle_answer(
&self,
answer: &gst_webrtc::WebRTCSessionDescription,
element: &super::BaseWebRTCSrc,
) {
//FIXME: refactor the common parts of this function and handle_offer()
gst::debug!(
CAT,
obj: element,
"Setting remote description: {}",
answer.sdp().to_string()
);
let webrtcbin = self.webrtcbin();
for (i, media) in answer.sdp().medias().enumerate() {
let codec_names = {
let settings = element.imp().settings.lock().unwrap();
settings
.video_codecs
.iter()
.chain(settings.audio_codecs.iter())
.map(|codec| codec.name.clone())
.collect::<HashSet<String>>()
};
let caps = media
.formats()
.filter_map(|format| {
format.parse::<i32>().ok().and_then(|pt| {
let mut mediacaps = media.caps_from_media(pt)?;
let s = mediacaps.structure(0).unwrap();
if !codec_names.contains(s.get::<&str>("encoding-name").ok()?) {
return None;
}
// filter the remote media whose direction is not sendonly
media.attribute_val("sendonly")?;
let mut filtered_s = gst::Structure::new_empty("application/x-rtp");
filtered_s.extend(s.iter().filter_map(|(key, value)| {
if key.starts_with("rtcp-") {
None
} else {
Some((key, value.to_owned()))
}
}));
if media
.attributes_to_caps(mediacaps.get_mut().unwrap())
.is_err()
{
gst::warning!(
CAT,
obj: element,
"Failed to retrieve attributes from media!"
);
return None;
}
let s = mediacaps.structure(0).unwrap();
filtered_s.extend(s.iter().filter_map(|(key, value)| {
if key.starts_with("extmap-") {
return Some((key, value.to_owned()));
}
None
}));
Some(filtered_s)
})
})
.collect::<gst::Caps>();
if !caps.is_empty() {
let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap();
if !element
.imp()
.create_and_probe_src_pad(&caps, &stream_id, self)
{
gst::error!(
CAT,
obj: element,
"Failed to create src pad with caps {:?}",
caps
);
}
} else {
gst::info!(
CAT,
obj: element,
"Not using media: {media:#?} as it doesn't match our codec restrictions"
);
}
}
webrtcbin.emit_by_name::<()>("set-remote-description", &[&answer, &None::<gst::Promise>]);
}
fn handle_offer(
&self,
offer: &gst_webrtc::WebRTCSessionDescription,
@ -887,18 +1092,49 @@ impl BaseWebRTCSrc {
_signaller: glib::Object,
session_id: &str,
desc: &gst_webrtc::WebRTCSessionDescription| {
assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer);
let this = instance.imp();
gst::info!(CAT, imp: this, "got sdp offer");
let state = this.state.lock().unwrap();
let Some(session) = state.sessions.get(session_id) else {
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return
};
match desc.type_() {
gst_webrtc::WebRTCSDPType::Offer =>{
let this = instance.imp();
gst::info!(CAT, imp: this, "got sdp offer");
let state = this.state.lock().unwrap();
let Some(session) = state.sessions.get(session_id) else {
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return
};
let (promise, webrtcbin) = session.handle_offer(desc, &this.obj());
drop (state);
webrtcbin.emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
let (promise, webrtcbin) = session.handle_offer(desc, &this.obj());
drop (state);
webrtcbin.emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
},
gst_webrtc::WebRTCSDPType::Answer => {
let this = instance.imp();
gst::info!(CAT, imp: this, "got sdp answer");
let state = this.state.lock().unwrap();
let Some(session) = state.sessions.get(session_id) else {
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return
};
session.handle_answer(desc, &this.obj());
},
_ => {},
}
}),
),
session_requested: signaller.connect_closure(
"session-requested",
false,
glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str, _peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{
if offer.is_none() {
let this = instance.imp();
let state = this.state.lock().unwrap();
let Some(session) = state.sessions.get(session_id) else {
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return
};
session.generate_offer(&this.obj());
}
}),
),
@ -995,13 +1231,15 @@ impl BaseWebRTCSrc {
fn maybe_start_signaller(&self) {
let obj = self.obj();
let mut state = self.state.lock().unwrap();
let state = self.state.lock().unwrap();
if state.signaller_state == SignallerState::Stopped
&& obj.current_state() >= gst::State::Paused
{
drop(state);
self.signaller().start();
gst::info!(CAT, imp: self, "Started signaller");
let mut state = self.state.lock().unwrap();
state.signaller_state = SignallerState::Started;
}
}