From e9cf54d43b4de0366cc544d0a4742023fe875566 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 6 Jan 2017 00:46:59 +0200 Subject: [PATCH] Add support for AAC and AVC/H264 in FLV --- gst-plugin-flv/src/flvdemux.rs | 214 +++++++++++++++++++++++++++++---- gst-plugin/src/buffer.rs | 22 ++++ gst-plugin/src/caps.rs | 40 +++--- 3 files changed, 238 insertions(+), 38 deletions(-) diff --git a/gst-plugin-flv/src/flvdemux.rs b/gst-plugin-flv/src/flvdemux.rs index 98ba8ca6..8063411c 100644 --- a/gst-plugin-flv/src/flvdemux.rs +++ b/gst-plugin-flv/src/flvdemux.rs @@ -59,6 +59,9 @@ struct StreamingState { last_position: Option, metadata: Option, + + aac_sequence_header: Option, + avc_sequence_header: Option, } impl StreamingState { @@ -71,6 +74,8 @@ impl StreamingState { got_all_streams: false, last_position: None, metadata: None, + aac_sequence_header: None, + avc_sequence_header: None, } } } @@ -82,18 +87,23 @@ struct AudioFormat { width: u8, channels: u8, bitrate: Option, + aac_sequence_header: Option, } // Ignores bitrate impl PartialEq for AudioFormat { fn eq(&self, other: &Self) -> bool { - self.format.eq(&other.format) && self.rate.eq(&other.rate) && self.width.eq(&other.width) && - self.channels.eq(&other.channels) + self.format.eq(&other.format) && self.rate.eq(&other.rate) && + self.width.eq(&other.width) && self.channels.eq(&other.channels) && + self.aac_sequence_header.eq(&other.aac_sequence_header) } } impl AudioFormat { - fn new(data_header: &flavors::AudioDataHeader, metadata: &Option) -> AudioFormat { + fn new(data_header: &flavors::AudioDataHeader, + metadata: &Option, + aac_sequence_header: &Option) + -> AudioFormat { let numeric_rate = match (data_header.sound_format, data_header.sound_rate) { (flavors::SoundFormat::NELLYMOSER_16KHZ_MONO, _) => 16000, (flavors::SoundFormat::NELLYMOSER_8KHZ_MONO, _) => 8000, @@ -121,6 +131,7 @@ impl AudioFormat { width: numeric_width, channels: numeric_channels, bitrate: metadata.as_ref().and_then(|m| m.audio_bitrate), + aac_sequence_header: aac_sequence_header.clone(), } } @@ -147,7 +158,7 @@ impl AudioFormat { vec![("mpegversion", &caps::Value::Int(1)), ("layer", &caps::Value::Int(3))])) } - flavors::SoundFormat::PCM_BE | + flavors::SoundFormat::PCM_NE | flavors::SoundFormat::PCM_LE => { if self.rate != 0 && self.channels != 0 { // Assume little-endian for "PCM_NE", it's probably more common and we have no @@ -177,8 +188,13 @@ impl AudioFormat { flavors::SoundFormat::PCM_ALAW => Some(Caps::new_simple("audio/x-alaw", vec![])), flavors::SoundFormat::PCM_ULAW => Some(Caps::new_simple("audio/x-mulaw", vec![])), flavors::SoundFormat::AAC => { - // TODO: This requires getting the codec config from the stream - None + self.aac_sequence_header.as_ref().map(|header| { + Caps::new_simple("audio/mpeg", + vec![("mpegversion", &caps::Value::Int(4)), + ("framed", &caps::Value::Bool(true)), + ("stream-format", &caps::Value::String("raw".into())), + ("codec_data", &caps::Value::Buffer(header.clone()))]) + }) } flavors::SoundFormat::SPEEX => { // TODO: This requires creating a Speex streamheader... @@ -211,10 +227,14 @@ struct VideoFormat { pixel_aspect_ratio: Option<(u32, u32)>, framerate: Option<(u32, u32)>, bitrate: Option, + avc_sequence_header: Option, } impl VideoFormat { - fn new(data_header: &flavors::VideoDataHeader, metadata: &Option) -> VideoFormat { + fn new(data_header: &flavors::VideoDataHeader, + metadata: &Option, + avc_sequence_header: &Option) + -> VideoFormat { VideoFormat { format: data_header.codec_id, width: metadata.as_ref().and_then(|m| m.video_width), @@ -222,6 +242,7 @@ impl VideoFormat { pixel_aspect_ratio: metadata.as_ref().and_then(|m| m.video_pixel_aspect_ratio), framerate: metadata.as_ref().and_then(|m| m.video_framerate), bitrate: metadata.as_ref().and_then(|m| m.video_bitrate), + avc_sequence_header: avc_sequence_header.clone(), } } @@ -262,7 +283,7 @@ impl VideoFormat { fn to_caps(&self) -> Option { let mut caps = match self.format { - flavors::CodecId::H263 => { + flavors::CodecId::SORENSON_H263 => { Some(Caps::new_simple("video/x-flash-video", vec![("flvversion", &caps::Value::Int(1))])) } @@ -271,8 +292,17 @@ impl VideoFormat { flavors::CodecId::VP6A => Some(Caps::new_simple("video/x-vp6-flash-alpha", vec![])), flavors::CodecId::SCREEN2 => Some(Caps::new_simple("video/x-flash-screen2", vec![])), flavors::CodecId::H264 => { - // TODO: Need codec_data from the stream - None + self.avc_sequence_header.as_ref().map(|header| { + Caps::new_simple("video/x-h264", + vec![("stream-format", &caps::Value::String("avc".into())), + ("codec_data", &caps::Value::Buffer(header.clone()))]) + }) + } + flavors::CodecId::H263 => Some(Caps::new_simple("video/x-h263", vec![])), + flavors::CodecId::MPEG4Part2 => { + Some(Caps::new_simple("video/x-h263", + vec![("mpegversion", &caps::Value::Int(4)), + ("systemstream", &caps::Value::Bool(false))])) } flavors::CodecId::JPEG => { // Unused according to spec @@ -315,7 +345,8 @@ impl PartialEq for VideoFormat { self.format.eq(&other.format) && self.width.eq(&other.width) && self.height.eq(&other.height) && self.pixel_aspect_ratio.eq(&other.pixel_aspect_ratio) && - self.framerate.eq(&other.framerate) + self.framerate.eq(&other.framerate) && + self.avc_sequence_header.eq(&other.avc_sequence_header) } } @@ -518,7 +549,9 @@ impl FlvDemux { let streaming_state = self.streaming_state.as_mut().unwrap(); - let new_audio_format = AudioFormat::new(data_header, &streaming_state.metadata); + let new_audio_format = AudioFormat::new(data_header, + &streaming_state.metadata, + &streaming_state.aac_sequence_header); if streaming_state.audio.as_ref() != Some(&new_audio_format) { debug!(logger, "Got new audio format: {:?}", new_audio_format); @@ -534,11 +567,11 @@ impl FlvDemux { return Ok(HandleBufferResult::StreamChanged(stream)); } } else { - unimplemented!(); + streaming_state.audio = None; } } - if !streaming_state.got_all_streams && + if !streaming_state.got_all_streams && streaming_state.audio != None && (streaming_state.expect_video && streaming_state.video != None || !streaming_state.expect_video) { streaming_state.got_all_streams = true; @@ -562,12 +595,81 @@ impl FlvDemux { return Ok(HandleBufferResult::NeedMoreData); } + // AAC special case + if data_header.sound_format == flavors::SoundFormat::AAC { + // Not big enough for the AAC packet header, ship! + if tag_header.data_size < 1 + 1 { + self.adapter.flush(15 + tag_header.data_size as usize).unwrap(); + warn!(self.logger, + "Too small packet for AAC packet header {}", + 15 + tag_header.data_size); + return Ok(HandleBufferResult::Again); + } + + let mut data = [0u8; 17]; + self.adapter.peek_into(&mut data).unwrap(); + match flavors::aac_audio_packet_header(&data[16..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, header) => { + trace!(self.logger, "Got AAC packet header {:?}", header); + match header.packet_type { + flavors::AACPacketType::SequenceHeader => { + self.adapter.flush(15 + 1 + 1).unwrap(); + let buffer = self.adapter + .get_buffer((tag_header.data_size - 1 - 1) as usize) + .unwrap(); + debug!(self.logger, + "Got AAC sequence header {:?} of size {}", + buffer, + tag_header.data_size - 1 - 1); + + let streaming_state = self.streaming_state.as_mut().unwrap(); + streaming_state.aac_sequence_header = Some(buffer); + return Ok(HandleBufferResult::Again); + } + flavors::AACPacketType::Raw => { + // fall through + } + } + } + } + } + + let streaming_state = self.streaming_state.as_ref().unwrap(); + + if streaming_state.audio == None { + self.adapter.flush((tag_header.data_size + 15) as usize).unwrap(); + return Ok(HandleBufferResult::Again); + } + + let audio = streaming_state.audio.as_ref().unwrap(); self.adapter.flush(16).unwrap(); + + let offset = match audio.format { + flavors::SoundFormat::AAC => 1, + _ => 0, + }; + if tag_header.data_size == 0 { return Ok(HandleBufferResult::Again); } - let mut buffer = self.adapter.get_buffer((tag_header.data_size - 1) as usize).unwrap(); + if tag_header.data_size < offset { + self.adapter.flush((tag_header.data_size - 1) as usize).unwrap(); + return Ok(HandleBufferResult::Again); + } + + if offset > 0 { + self.adapter.flush(offset as usize).unwrap(); + } + + let mut buffer = self.adapter + .get_buffer((tag_header.data_size - 1 - offset) as usize) + .unwrap(); + buffer.set_pts(Some((tag_header.timestamp as u64) * 1000 * 1000)).unwrap(); trace!(self.logger, "Outputting audio buffer {:?} for tag {:?} of size {}", @@ -586,7 +688,9 @@ impl FlvDemux { let streaming_state = self.streaming_state.as_mut().unwrap(); - let new_video_format = VideoFormat::new(data_header, &streaming_state.metadata); + let new_video_format = VideoFormat::new(data_header, + &streaming_state.metadata, + &streaming_state.avc_sequence_header); if streaming_state.video.as_ref() != Some(&new_video_format) { debug!(logger, "Got new video format: {:?}", new_video_format); @@ -603,11 +707,11 @@ impl FlvDemux { return Ok(HandleBufferResult::StreamChanged(stream)); } } else { - unimplemented!(); + streaming_state.video = None; } } - if !streaming_state.got_all_streams && + if !streaming_state.got_all_streams && streaming_state.video != None && (streaming_state.expect_audio && streaming_state.audio != None || !streaming_state.expect_audio) { streaming_state.got_all_streams = true; @@ -631,17 +735,73 @@ impl FlvDemux { return Ok(HandleBufferResult::NeedMoreData); } + let mut cts = 0; + + // AVC/H264 special case + if data_header.codec_id == flavors::CodecId::H264 { + // Not big enough for the AVC packet header, ship! + if tag_header.data_size < 1 + 4 { + self.adapter.flush(15 + tag_header.data_size as usize).unwrap(); + warn!(self.logger, + "Too small packet for AVC packet header {}", + 15 + tag_header.data_size); + return Ok(HandleBufferResult::Again); + } + + let mut data = [0u8; 20]; + self.adapter.peek_into(&mut data).unwrap(); + match flavors::avc_video_packet_header(&data[16..]) { + IResult::Error(_) | + IResult::Incomplete(_) => { + unimplemented!(); + } + IResult::Done(_, header) => { + trace!(self.logger, "Got AVC packet header {:?}", header); + match header.packet_type { + flavors::AVCPacketType::SequenceHeader => { + self.adapter.flush(15 + 1 + 4).unwrap(); + let buffer = self.adapter + .get_buffer((tag_header.data_size - 1 - 4) as usize) + .unwrap(); + debug!(self.logger, + "Got AVC sequence header {:?} of size {}", + buffer, + tag_header.data_size - 1 - 4); + + let streaming_state = self.streaming_state.as_mut().unwrap(); + streaming_state.avc_sequence_header = Some(buffer); + return Ok(HandleBufferResult::Again); + } + flavors::AVCPacketType::NALU => { + cts = header.composition_time; + } + flavors::AVCPacketType::EndOfSequence => { + // Skip + self.adapter.flush(15 + tag_header.data_size as usize).unwrap(); + return Ok(HandleBufferResult::Again); + } + } + } + } + } + let streaming_state = self.streaming_state.as_ref().unwrap(); + + if streaming_state.video == None { + self.adapter.flush((tag_header.data_size + 15) as usize).unwrap(); + return Ok(HandleBufferResult::Again); + } + let video = streaming_state.video.as_ref().unwrap(); let is_keyframe = data_header.frame_type == flavors::FrameType::Key; self.adapter.flush(16).unwrap(); - let offset = if video.format == flavors::CodecId::VP6 || - video.format == flavors::CodecId::VP6A { - 1 - } else { - 0 + let offset = match video.format { + flavors::CodecId::VP6 | + flavors::CodecId::VP6A => 1, + flavors::CodecId::H264 => 4, + _ => 0, }; if tag_header.data_size == 0 { @@ -666,6 +826,14 @@ impl FlvDemux { buffer.set_dts(Some((tag_header.timestamp as u64) * 1000 * 1000)) .unwrap(); + // Prevent negative numbers + let pts = if cts < 0 && tag_header.timestamp < (-cts) as u32 { + 0 + } else { + ((tag_header.timestamp as i64) + (cts as i64)) as u64 + }; + buffer.set_pts(Some(pts * 1000 * 1000)).unwrap(); + trace!(self.logger, "Outputting video buffer {:?} for tag {:?} of size {}, keyframe: {}", buffer, diff --git a/gst-plugin/src/buffer.rs b/gst-plugin/src/buffer.rs index 965b0699..9ac811e0 100644 --- a/gst-plugin/src/buffer.rs +++ b/gst-plugin/src/buffer.rs @@ -254,6 +254,10 @@ impl Buffer { self.raw } + pub unsafe fn as_ptr(&self) -> *mut c_void { + self.raw + } + pub fn is_writable(&self) -> bool { extern "C" { fn gst_mini_object_is_writable(obj: *const c_void) -> GBoolean; @@ -605,6 +609,24 @@ impl Clone for Buffer { } } +impl PartialEq for Buffer { + fn eq(&self, other: &Buffer) -> bool { + if self.get_size() != other.get_size() { + return false; + } + + let self_map = self.map_read(); + let other_map = other.map_read(); + + match (self_map, other_map) { + (Some(self_map), Some(other_map)) => self_map.as_slice().eq(other_map.as_slice()), + _ => false, + } + } +} + +impl Eq for Buffer {} + impl<'a> ReadBufferMap<'a> { pub fn as_slice(&self) -> &[u8] { unsafe { slice::from_raw_parts(self.map_info.data as *const u8, self.map_info.size) } diff --git a/gst-plugin/src/caps.rs b/gst-plugin/src/caps.rs index 12a4f35f..a0041e21 100644 --- a/gst-plugin/src/caps.rs +++ b/gst-plugin/src/caps.rs @@ -23,12 +23,15 @@ use std::mem; use std::borrow::Cow; use std::fmt; +use buffer::*; + #[derive(Debug, PartialEq, Eq, Clone)] pub enum Value<'a> { Bool(bool), Int(i32), String(Cow<'a, str>), Fraction(i32, i32), + Buffer(Buffer), } pub struct Caps(*mut c_void); @@ -50,9 +53,7 @@ impl Caps { fn gst_caps_new_empty() -> *mut c_void; } - let caps = Caps(unsafe { gst_caps_new_empty() }); - - caps + Caps(unsafe { gst_caps_new_empty() }) } pub fn new_any() -> Self { @@ -60,19 +61,18 @@ impl Caps { fn gst_caps_new_any() -> *mut c_void; } - let caps = Caps(unsafe { gst_caps_new_any() }); - - caps + Caps(unsafe { gst_caps_new_any() }) } - pub fn new_simple(name: &str, values: Vec<(&str, &Value)>) -> Self { + pub fn new_simple<'a, I>(name: &str, values: I) -> Self + where I: IntoIterator)> + { extern "C" { - fn gst_caps_new_empty() -> *mut c_void; fn gst_caps_append_structure(caps: *mut c_void, structure: *mut c_void); fn gst_structure_new_empty(name: *const c_char) -> *mut c_void; } - let mut caps = Caps(unsafe { gst_caps_new_empty() }); + let mut caps = Caps::new_empty(); let name_cstr = CString::new(name).unwrap(); let structure = unsafe { gst_structure_new_empty(name_cstr.as_ptr()) }; @@ -102,7 +102,9 @@ impl Caps { } } - pub fn set_simple(&mut self, values: Vec<(&str, &Value)>) { + pub fn set_simple<'a, I>(&mut self, values: I) + where I: IntoIterator)> + { extern "C" { fn gst_caps_set_value(caps: *mut c_void, name: *const c_char, value: *const GValue); fn g_value_init(value: *mut GValue, gtype: usize); @@ -112,26 +114,28 @@ impl Caps { fn g_value_set_string(value: *mut GValue, value: *const c_char); fn gst_value_set_fraction(value: *mut GValue, value_n: i32, value_d: i32); fn gst_fraction_get_type() -> usize; + fn g_value_set_boxed(value: *mut GValue, boxed: *const c_void); + fn gst_buffer_get_type() -> usize; } for value in values { let name_cstr = CString::new(value.0).unwrap(); let mut gvalue: GValue = unsafe { mem::zeroed() }; - match value.1 { - &Value::Bool(v) => unsafe { + match *value.1 { + Value::Bool(v) => unsafe { g_value_init(&mut gvalue as *mut GValue, TYPE_BOOLEAN); g_value_set_boolean(&mut gvalue as *mut GValue, if v { 1 } else { 0 }); gst_caps_set_value(self.0, name_cstr.as_ptr(), &mut gvalue as *mut GValue); g_value_unset(&mut gvalue as *mut GValue); }, - &Value::Int(v) => unsafe { + Value::Int(v) => unsafe { g_value_init(&mut gvalue as *mut GValue, TYPE_INT); g_value_set_int(&mut gvalue as *mut GValue, v); gst_caps_set_value(self.0, name_cstr.as_ptr(), &mut gvalue as *mut GValue); g_value_unset(&mut gvalue as *mut GValue); }, - &Value::String(ref v) => unsafe { + Value::String(ref v) => unsafe { let v_cstr = CString::new(String::from((*v).clone())).unwrap(); g_value_init(&mut gvalue as *mut GValue, TYPE_STRING); @@ -139,12 +143,18 @@ impl Caps { gst_caps_set_value(self.0, name_cstr.as_ptr(), &mut gvalue as *mut GValue); g_value_unset(&mut gvalue as *mut GValue); }, - &Value::Fraction(v_n, v_d) => unsafe { + Value::Fraction(v_n, v_d) => unsafe { g_value_init(&mut gvalue as *mut GValue, gst_fraction_get_type()); gst_value_set_fraction(&mut gvalue as *mut GValue, v_n, v_d); gst_caps_set_value(self.0, name_cstr.as_ptr(), &mut gvalue as *mut GValue); g_value_unset(&mut gvalue as *mut GValue); }, + Value::Buffer(ref buffer) => unsafe { + g_value_init(&mut gvalue as *mut GValue, gst_buffer_get_type()); + g_value_set_boxed(&mut gvalue as *mut GValue, buffer.as_ptr()); + gst_caps_set_value(self.0, name_cstr.as_ptr(), &mut gvalue as *mut GValue); + g_value_unset(&mut gvalue as *mut GValue); + }, } } }