1
0
Fork 0
mirror of https://github.com/alfg/mp4-rust.git synced 2024-06-02 13:39:54 +00:00

Add async reader/writer

This commit is contained in:
Ian Jun 2020-08-10 20:52:22 +09:00
parent d52eb674f9
commit 54caec98f6
9 changed files with 618 additions and 8 deletions

View file

@ -17,8 +17,18 @@ keywords = ["mp4", "isobmff"]
license = "MIT"
[features]
default = []
async = ["tokio"]
[dependencies]
thiserror = "^1.0"
byteorder = "1"
bytes = "0.5"
num-rational = "0.3"
num-rational = "0.3"
[dependencies.tokio]
version = "0.2"
default-features = false
optional = true
features = ["macros", "io-util", "fs"]

View file

@ -1,11 +1,22 @@
use std::env;
use std::fs::File;
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter};
use std::io;
use std::path::Path;
#[cfg(not(feature = "async"))]
use {
std::fs::File,
std::io::{BufReader, BufWriter},
};
#[cfg(feature = "async")]
use {
tokio::fs::File,
};
use mp4::{AacConfig, AvcConfig, MediaConfig, MediaType, Mp4Config, Result, TrackConfig};
#[cfg(not(feature = "async"))]
fn main() {
let args: Vec<String> = env::args().collect();
@ -19,6 +30,22 @@ fn main() {
}
}
#[cfg(feature = "async")]
#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 3 {
println!("Usage: mp4copy <source file> <target file>");
std::process::exit(1);
}
if let Err(err) = async_copy(&args[1], &args[2]).await {
let _ = writeln!(io::stderr(), "{}", err);
}
}
#[cfg(not(feature = "async"))]
fn copy<P: AsRef<Path>>(src_filename: &P, dst_filename: &P) -> Result<()> {
let src_file = File::open(src_filename)?;
let size = src_file.metadata()?.len();
@ -73,8 +100,8 @@ fn copy<P: AsRef<Path>>(src_filename: &P, dst_filename: &P) -> Result<()> {
for sample_idx in 0..sample_count {
let sample_id = sample_idx + 1;
let sample = mp4_reader.read_sample(track_id, sample_id)?.unwrap();
println!("{}:({})", sample_id, sample);
mp4_writer.write_sample(track_id, &sample)?;
// println!("copy {}:({})", sample_id, sample);
}
}
@ -82,3 +109,66 @@ fn copy<P: AsRef<Path>>(src_filename: &P, dst_filename: &P) -> Result<()> {
Ok(())
}
#[cfg(feature = "async")]
async fn async_copy<P: AsRef<Path>>(src_filename: &P, dst_filename: &P) -> Result<()> {
let src_file = File::open(src_filename).await?;
let size = src_file.metadata().await?.len();
let dst_file = File::create(dst_filename).await?;
let mut mp4_reader = mp4::Mp4AsyncReader::async_read_header(src_file, size).await?;
let mut mp4_writer = mp4::Mp4AsyncWriter::async_write_start(
dst_file,
&Mp4Config {
major_brand: mp4_reader.major_brand().clone(),
minor_version: mp4_reader.minor_version(),
compatible_brands: mp4_reader.compatible_brands().to_vec(),
timescale: mp4_reader.timescale(),
},
).await?;
// TODO interleaving
for track_idx in 0..mp4_reader.tracks().len() {
if let Some(ref track) = mp4_reader.tracks().get(track_idx) {
let media_conf = match track.media_type()? {
MediaType::H264 => MediaConfig::AvcConfig(AvcConfig {
width: track.width(),
height: track.height(),
seq_param_set: track.sequence_parameter_set()?.to_vec(),
pic_param_set: track.picture_parameter_set()?.to_vec(),
}),
MediaType::AAC => MediaConfig::AacConfig(AacConfig {
bitrate: track.bitrate(),
profile: track.audio_profile()?,
freq_index: track.sample_freq_index()?,
chan_conf: track.channel_config()?,
}),
};
let track_conf = TrackConfig {
track_type: track.track_type()?,
timescale: track.timescale(),
language: track.language().to_string(),
media_conf,
};
mp4_writer.add_track(&track_conf)?;
} else {
unreachable!()
}
let track_id = track_idx as u32 + 1;
let sample_count = mp4_reader.sample_count(track_id)?;
for sample_idx in 0..sample_count {
let sample_id = sample_idx + 1;
let sample = mp4_reader.async_read_sample(track_id, sample_id).await?.unwrap();
println!("{}:({})", sample_id, sample);
mp4_writer.async_write_sample(track_id, &sample).await?;
}
}
mp4_writer.async_write_end().await?;
Ok(())
}

View file

@ -1,11 +1,22 @@
use std::env;
use std::fs::File;
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::io;
use std::path::Path;
#[cfg(not(feature = "async"))]
use {
std::fs::File,
std::io::BufReader,
};
#[cfg(feature = "async")]
use {
tokio::fs::File,
};
use mp4::{Mp4Track, Result, TrackType};
#[cfg(not(feature = "async"))]
fn main() {
let args: Vec<String> = env::args().collect();
@ -19,6 +30,23 @@ fn main() {
}
}
#[cfg(feature = "async")]
#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
println!("Usage: mp4info <filename>");
std::process::exit(1);
}
if let Err(err) = async_info(&args[1]).await {
let _ = writeln!(io::stderr(), "{}", err);
}
}
#[cfg(not(feature = "async"))]
fn info<P: AsRef<Path>>(filename: &P) -> Result<()> {
let f = File::open(filename)?;
let size = f.metadata()?.len();
@ -54,6 +82,41 @@ fn info<P: AsRef<Path>>(filename: &P) -> Result<()> {
Ok(())
}
#[cfg(feature = "async")]
async fn async_info<P: AsRef<Path>>(filename: &P) -> Result<()> {
let file = File::open(filename).await?;
let size = file.metadata().await?.len();
let mp4 = mp4::Mp4AsyncReader::async_read_header(file, size).await?;
println!("Metadata:");
println!(" size : {}", mp4.size());
println!(" major_brand : {}", mp4.major_brand());
let mut compatible_brands = String::new();
for brand in mp4.compatible_brands().iter() {
compatible_brands.push_str(&brand.to_string());
compatible_brands.push_str(",");
}
println!(" compatible_brands: {}", compatible_brands);
println!("Duration: {:?}", mp4.duration());
for track in mp4.tracks().iter() {
let media_info = match track.track_type()? {
TrackType::Video => video_info(track)?,
TrackType::Audio => audio_info(track)?,
};
println!(
" Track: #{}({}) {}: {}",
track.track_id(),
track.language(),
track.track_type()?,
media_info
);
}
Ok(())
}
fn video_info(track: &Mp4Track) -> Result<String> {
Ok(format!(
"{} ({}) ({:?}), {}x{}, {} kb/s, {:.2} fps",

View file

@ -13,6 +13,10 @@ pub use track::{Mp4Track, TrackConfig};
mod reader;
pub use reader::Mp4Reader;
#[cfg(feature = "async")]
pub use reader::Mp4AsyncReader;
mod writer;
pub use writer::{Mp4Config, Mp4Writer};
#[cfg(feature = "async")]
pub use writer::Mp4AsyncWriter;

View file

@ -2,6 +2,13 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom, Write};
#[cfg(feature = "async")]
use {
std::marker::Unpin,
std::io::Cursor,
tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt}
};
use crate::*;
pub(crate) mod avc1;
@ -154,6 +161,41 @@ impl BoxHeader {
}
}
#[cfg(feature = "async")]
pub async fn async_read<R>(reader: &mut R) -> Result<Self>
where
R: AsyncReadExt + Unpin
{
// Create and read to buf.
let mut buf = [0u8; 8]; // 8 bytes for box header.
reader.read(&mut buf).await?;
// Get size.
let s = buf[0..4].try_into().unwrap();
let size = u32::from_be_bytes(s);
// Get box type string.
let t = buf[4..8].try_into().unwrap();
let typ = u32::from_be_bytes(t);
// Get largesize if size is 1
if size == 1 {
reader.read(&mut buf).await?;
let s = buf.try_into().unwrap();
let largesize = u64::from_be_bytes(s);
Ok(BoxHeader {
name: BoxType::from(typ),
size: largesize,
})
} else {
Ok(BoxHeader {
name: BoxType::from(typ),
size: size as u64,
})
}
}
pub fn write<W: Write>(&self, writer: &mut W) -> Result<u64> {
if self.size > u32::MAX as u64 {
writer.write_u32::<BigEndian>(1)?;
@ -166,6 +208,22 @@ impl BoxHeader {
Ok(8)
}
}
#[cfg(feature = "async")]
pub async fn async_write<W>(&self, writer: &mut W) -> Result<u64>
where
W: AsyncWriteExt + Unpin
{
let size = if self.size > u32::MAX as u64 {
16
} else {
8
};
let mut buffer = vec![0u8; size];
let hdr_size = self.write(&mut Cursor::new(&mut buffer))?;
writer.write_all(&buffer).await?;
Ok(hdr_size)
}
}
pub fn read_box_header_ext<R: Read>(reader: &mut R) -> Result<(u8, u32)> {
@ -180,10 +238,15 @@ pub fn write_box_header_ext<W: Write>(w: &mut W, v: u8, f: u32) -> Result<u64> {
Ok(4)
}
pub fn box_start<R: Seek>(seeker: &mut R) -> Result<u64> {
pub fn box_start<S: Seek>(seeker: &mut S) -> Result<u64> {
Ok(seeker.seek(SeekFrom::Current(0))? - HEADER_SIZE)
}
#[cfg(feature = "async")]
pub async fn async_box_start<S: AsyncSeekExt + Unpin>(seeker: &mut S) -> Result<u64> {
Ok(seeker.seek(SeekFrom::Current(0)).await? - HEADER_SIZE)
}
pub fn skip_bytes<S: Seek>(seeker: &mut S, size: u64) -> Result<()> {
seeker.seek(SeekFrom::Current(size as i64))?;
Ok(())
@ -194,12 +257,31 @@ pub fn skip_bytes_to<S: Seek>(seeker: &mut S, pos: u64) -> Result<()> {
Ok(())
}
#[cfg(feature = "async")]
pub async fn async_skip_bytes_to<S>(seeker: &mut S, pos: u64) -> Result<()>
where
S: AsyncSeekExt + Unpin
{
seeker.seek(SeekFrom::Start(pos)).await?;
Ok(())
}
pub fn skip_box<S: Seek>(seeker: &mut S, size: u64) -> Result<()> {
let start = box_start(seeker)?;
skip_bytes_to(seeker, start + size)?;
Ok(())
}
#[cfg(feature = "async")]
pub async fn async_skip_box<S>(seeker: &mut S, size: u64) -> Result<()>
where
S: AsyncSeekExt + Unpin
{
let start = async_box_start(seeker).await?;
async_skip_bytes_to(seeker, start + size).await?;
Ok(())
}
pub fn write_zeros<W: Write>(writer: &mut W, size: u64) -> Result<()> {
for _ in 0..size {
writer.write_u8(0)?;

View file

@ -80,6 +80,6 @@ impl<W: Write> WriteBox<&mut W> for MoovBox {
for trak in self.traks.iter() {
trak.write_box(writer)?;
}
Ok(0)
Ok(size)
}
}

View file

@ -1,6 +1,13 @@
use std::io::{Read, Seek, SeekFrom};
use std::time::Duration;
#[cfg(feature = "async")]
use {
std::marker::Unpin,
std::io::Cursor,
tokio::io::{AsyncReadExt, AsyncSeekExt},
};
use crate::mp4box::*;
use crate::*;
@ -132,3 +139,145 @@ impl<R: Read + Seek> Mp4Reader<R> {
}
}
}
#[cfg(feature = "async")]
#[derive(Debug)]
pub struct Mp4AsyncReader<R> {
reader: R,
ftyp: FtypBox,
moov: MoovBox,
tracks: Vec<Mp4Track>,
size: u64,
}
#[cfg(feature = "async")]
impl<R> Mp4AsyncReader<R>
where
R: AsyncReadExt + AsyncSeekExt + Unpin
{
pub async fn async_read_header(mut reader: R, size: u64) -> Result<Self> {
let start = reader.seek(SeekFrom::Current(0)).await?;
let mut ftyp = None;
let mut moov = None;
let mut current = start;
while current < size {
// Get box header.
let header = BoxHeader::async_read(&mut reader).await?;
let BoxHeader { name, size: s } = header;
// Match and parse the atom boxes.
match name {
BoxType::FtypBox => {
let mut buffer = vec![0u8; s as usize];
reader.read_exact(&mut buffer[HEADER_SIZE as usize..]).await?;
let mut cursor = Cursor::new(&mut buffer);
skip_bytes(&mut cursor, HEADER_SIZE)?;
ftyp = Some(FtypBox::read_box(&mut cursor, s)?);
}
BoxType::FreeBox => {
async_skip_box(&mut reader, s).await?;
}
BoxType::MdatBox => {
async_skip_box(&mut reader, s).await?;
}
BoxType::MoovBox => {
let mut buffer = vec![0u8; s as usize];
reader.read_exact(&mut buffer[HEADER_SIZE as usize..]).await?;
let mut cursor = Cursor::new(&mut buffer);
skip_bytes(&mut cursor, HEADER_SIZE)?;
moov = Some(MoovBox::read_box(&mut cursor, s)?);
}
BoxType::MoofBox => {
async_skip_box(&mut reader, s).await?;
}
_ => {
// XXX warn!()
async_skip_box(&mut reader, s).await?;
}
}
current = reader.seek(SeekFrom::Current(0)).await?;
}
if ftyp.is_none() {
return Err(Error::BoxNotFound(BoxType::FtypBox));
}
if moov.is_none() {
return Err(Error::BoxNotFound(BoxType::MoovBox));
}
let size = current - start;
let tracks = if let Some(ref moov) = moov {
let mut tracks = Vec::with_capacity(moov.traks.len());
for (i, trak) in moov.traks.iter().enumerate() {
assert_eq!(trak.tkhd.track_id, i as u32 + 1);
tracks.push(Mp4Track::from(trak));
}
tracks
} else {
Vec::new()
};
Ok(Mp4AsyncReader {
reader,
ftyp: ftyp.unwrap(),
moov: moov.unwrap(),
size,
tracks,
})
}
pub fn size(&self) -> u64 {
self.size
}
pub fn major_brand(&self) -> &FourCC {
&self.ftyp.major_brand
}
pub fn minor_version(&self) -> u32 {
self.ftyp.minor_version
}
pub fn compatible_brands(&self) -> &[FourCC] {
&self.ftyp.compatible_brands
}
pub fn duration(&self) -> Duration {
Duration::from_millis(self.moov.mvhd.duration * 1000 / self.moov.mvhd.timescale as u64)
}
pub fn timescale(&self) -> u32 {
self.moov.mvhd.timescale
}
pub fn tracks(&self) -> &[Mp4Track] {
&self.tracks
}
pub fn sample_count(&self, track_id: u32) -> Result<u32> {
if track_id == 0 {
return Err(Error::TrakNotFound(track_id));
}
if let Some(track) = self.tracks.get(track_id as usize - 1) {
Ok(track.sample_count())
} else {
Err(Error::TrakNotFound(track_id))
}
}
pub async fn async_read_sample(&mut self, track_id: u32, sample_id: u32) -> Result<Option<Mp4Sample>> {
if track_id == 0 {
return Err(Error::TrakNotFound(track_id));
}
if let Some(ref track) = self.tracks.get(track_id as usize - 1) {
track.async_read_sample(&mut self.reader, sample_id).await
} else {
Err(Error::TrakNotFound(track_id))
}
}
}

View file

@ -4,6 +4,12 @@ use std::convert::TryFrom;
use std::io::{Read, Seek, SeekFrom, Write};
use std::time::Duration;
#[cfg(feature = "async")]
use {
std::marker::Unpin,
tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt},
};
use crate::mp4box::trak::TrakBox;
use crate::mp4box::*;
use crate::mp4box::{
@ -405,6 +411,39 @@ impl Mp4Track {
bytes: Bytes::from(buffer),
}))
}
#[cfg(feature = "async")]
pub(crate) async fn async_read_sample<R>(
&self,
reader: &mut R,
sample_id: u32,
) -> Result<Option<Mp4Sample>>
where
R: AsyncReadExt + AsyncSeekExt + Unpin
{
let sample_offset = match self.sample_offset(sample_id) {
Ok(offset) => offset,
Err(Error::EntryInStblNotFound(_, _, _)) => return Ok(None),
Err(err) => return Err(err),
};
let sample_size = self.sample_size(sample_id).unwrap();
let mut buffer = vec![0x0u8; sample_size as usize];
reader.seek(SeekFrom::Start(sample_offset)).await?;
reader.read_exact(&mut buffer).await?;
let (start_time, duration) = self.sample_time(sample_id).unwrap(); // XXX
let rendering_offset = self.sample_rendering_offset(sample_id);
let is_sync = self.is_sync_sample(sample_id);
Ok(Some(Mp4Sample {
start_time,
duration,
rendering_offset,
is_sync,
bytes: Bytes::from(buffer),
}))
}
}
// TODO creation_time, modification_time
@ -598,6 +637,33 @@ impl Mp4TrackWriter {
Ok(self.trak.tkhd.duration)
}
#[cfg(feature = "async")]
pub(crate) async fn async_write_sample<W>(
&mut self,
writer: &mut W,
sample: &Mp4Sample,
movie_timescale: u32,
) -> Result<u64>
where
W: AsyncWriteExt + AsyncSeekExt + Unpin
{
self.chunk_buffer.extend_from_slice(&sample.bytes);
self.chunk_samples += 1;
self.chunk_duration += sample.duration;
self.update_sample_sizes(sample.bytes.len() as u32);
self.update_sample_times(sample.duration);
self.update_rendering_offsets(sample.rendering_offset);
self.update_sync_samples(sample.is_sync);
if self.is_chunk_full() {
self.async_write_chunk(writer).await?;
}
self.update_durations(sample.duration, movie_timescale);
self.sample_id += 1;
Ok(self.trak.tkhd.duration)
}
// XXX largesize
fn chunk_count(&self) -> u32 {
let stco = self.trak.mdia.minf.stbl.stco.as_ref().unwrap();
@ -643,6 +709,28 @@ impl Mp4TrackWriter {
Ok(())
}
#[cfg(feature = "async")]
async fn async_write_chunk<W>(&mut self, writer: &mut W) -> Result<()>
where
W: AsyncWriteExt + AsyncSeekExt + Unpin
{
if self.chunk_buffer.is_empty() {
return Ok(());
}
let chunk_offset = writer.seek(SeekFrom::Current(0)).await?;
writer.write_all(&self.chunk_buffer).await?;
self.update_sample_to_chunk(self.chunk_count() + 1);
self.update_chunk_offsets(chunk_offset);
self.chunk_buffer.clear();
self.chunk_samples = 0;
self.chunk_duration = 0;
Ok(())
}
fn max_sample_size(&self) -> u32 {
if self.trak.mdia.minf.stbl.stsz.sample_size > 0 {
self.trak.mdia.minf.stbl.stsz.sample_size
@ -668,4 +756,22 @@ impl Mp4TrackWriter {
Ok(self.trak.clone())
}
#[cfg(feature = "async")]
pub(crate) async fn async_write_end<W>(&mut self, writer: &mut W) -> Result<TrakBox>
where
W: AsyncWriteExt + AsyncSeekExt + Unpin
{
self.async_write_chunk(writer).await?;
let max_sample_size = self.max_sample_size();
if let Some(ref mut mp4a) = self.trak.mdia.minf.stbl.stsd.mp4a {
mp4a.esds.es_desc.dec_config.buffer_size_db = max_sample_size;
// TODO
// mp4a.esds.es_desc.dec_config.max_bitrate
// mp4a.esds.es_desc.dec_config.avg_bitrate
}
Ok(self.trak.clone())
}
}

View file

@ -1,6 +1,13 @@
use byteorder::{BigEndian, WriteBytesExt};
use std::io::{Seek, SeekFrom, Write};
#[cfg(feature = "async")]
use {
std::marker::Unpin,
std::io::Cursor,
tokio::io::{AsyncSeekExt, AsyncWriteExt},
};
use crate::mp4box::*;
use crate::track::Mp4TrackWriter;
use crate::*;
@ -100,3 +107,102 @@ impl<W: Write + Seek> Mp4Writer<W> {
Ok(())
}
}
#[cfg(feature = "async")]
#[derive(Debug)]
pub struct Mp4AsyncWriter<W> {
writer: W,
tracks: Vec<Mp4TrackWriter>,
mdat_pos: u64,
timescale: u32,
duration: u64,
}
#[cfg(feature = "async")]
impl<W> Mp4AsyncWriter<W>
where
W: AsyncWriteExt + AsyncSeekExt + Unpin
{
pub async fn async_write_start(mut writer: W, config: &Mp4Config) -> Result<Self> {
let ftyp = FtypBox {
major_brand: config.major_brand.clone(),
minor_version: config.minor_version.clone(),
compatible_brands: config.compatible_brands.clone(),
};
let mut buffer = vec![0u8; ftyp.box_size() as usize];
ftyp.write_box(&mut Cursor::new(&mut buffer))?;
writer.write_all(&buffer).await?;
// TODO largesize
let mdat_pos = writer.seek(SeekFrom::Current(0)).await?;
BoxHeader::new(BoxType::MdatBox, HEADER_SIZE).async_write(&mut writer).await?;
let tracks = Vec::new();
let timescale = config.timescale;
let duration = 0;
Ok(Self {
writer,
tracks,
mdat_pos,
timescale,
duration,
})
}
pub fn add_track(&mut self, config: &TrackConfig) -> Result<()> {
let track_id = self.tracks.len() as u32 + 1;
let track = Mp4TrackWriter::new(track_id, config)?;
self.tracks.push(track);
Ok(())
}
fn update_durations(&mut self, track_dur: u64) {
if track_dur > self.duration {
self.duration = track_dur;
}
}
pub async fn async_write_sample(&mut self, track_id: u32, sample: &Mp4Sample) -> Result<()> {
if track_id == 0 {
return Err(Error::TrakNotFound(track_id));
}
let track_dur = if let Some(ref mut track) = self.tracks.get_mut(track_id as usize - 1) {
track.async_write_sample(&mut self.writer, sample, self.timescale).await?
} else {
return Err(Error::TrakNotFound(track_id));
};
self.update_durations(track_dur);
Ok(())
}
async fn async_update_mdat_size(&mut self) -> Result<()> {
let mdat_end = self.writer.seek(SeekFrom::Current(0)).await?;
let mdat_size = mdat_end - self.mdat_pos;
assert!(mdat_size < std::u32::MAX as u64);
self.writer.seek(SeekFrom::Start(self.mdat_pos)).await?;
self.writer.write_all(&(mdat_size as u32).to_be_bytes()).await?;
self.writer.seek(SeekFrom::Start(mdat_end)).await?;
Ok(())
}
pub async fn async_write_end(&mut self) -> Result<()> {
let mut moov = MoovBox::default();
for track in self.tracks.iter_mut() {
moov.traks.push(track.async_write_end(&mut self.writer).await?);
}
self.async_update_mdat_size().await?;
moov.mvhd.timescale = self.timescale;
moov.mvhd.duration = self.duration;
let mut buffer = vec![0u8; moov.box_size() as usize];
moov.write_box(&mut Cursor::new(&mut buffer))?;
self.writer.write_all(&buffer).await?;
Ok(())
}
}