From 2642410702ef05fa3981a9e5a9b9817f0fbfa6ea Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Sat, 12 Nov 2022 15:57:25 +0100 Subject: [PATCH] spotify: fix "start a runtime from within a runtime" with static link Part-of: --- audio/spotify/src/spotifyaudiosrc/imp.rs | 312 +++++++++++++++-------- 1 file changed, 200 insertions(+), 112 deletions(-) diff --git a/audio/spotify/src/spotifyaudiosrc/imp.rs b/audio/spotify/src/spotifyaudiosrc/imp.rs index c168fc3d..54e92a4e 100644 --- a/audio/spotify/src/spotifyaudiosrc/imp.rs +++ b/audio/spotify/src/spotifyaudiosrc/imp.rs @@ -6,9 +6,10 @@ // // SPDX-License-Identifier: MPL-2.0 -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex, MutexGuard}; use anyhow::bail; +use futures::future::{AbortHandle, Abortable, Aborted}; use once_cell::sync::Lazy; use tokio::{runtime, task::JoinHandle}; @@ -74,10 +75,16 @@ struct Settings { #[derive(Default)] pub struct SpotifyAudioSrc { + setup_thread: Mutex>, state: Arc>>, settings: Mutex, } +struct SetupThread { + thread_handle: std::thread::JoinHandle, Aborted>>, + abort_handle: AbortHandle, +} + #[glib::object_subclass] impl ObjectSubclass for SpotifyAudioSrc { const NAME: &'static str = "GstSpotifyAudioSrc"; @@ -237,17 +244,22 @@ impl BaseSrcImpl for SpotifyAudioSrc { } } - if let Err(err) = RUNTIME.block_on(async move { self.setup().await }) { - let details = format!("{:?}", err); - gst::error!(CAT, imp: self, "failed to start: {}", details); - gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]); - return Err(gst::error_msg!(gst::ResourceError::Settings, [&details])); + { + let setup_thread = self.setup_thread.lock().unwrap(); + if setup_thread.is_some() { + // already starting + return Ok(()); + } + self.start_setup(setup_thread); } Ok(()) } fn stop(&self) -> Result<(), gst::ErrorMessage> { + // stop the setup if it's not completed yet + self.cancel_setup(); + if let Some(state) = self.state.lock().unwrap().take() { gst::debug!(CAT, imp: self, "stopping"); state.player.stop(); @@ -258,6 +270,12 @@ impl BaseSrcImpl for SpotifyAudioSrc { Ok(()) } + + fn unlock(&self) -> Result<(), gst::ErrorMessage> { + self.cancel_setup(); + + self.parent_unlock() + } } impl PushSrcImpl for SpotifyAudioSrc { @@ -265,6 +283,41 @@ impl PushSrcImpl for SpotifyAudioSrc { &self, _buffer: Option<&mut gst::BufferRef>, ) -> Result { + let state_set = { + let state = self.state.lock().unwrap(); + state.is_some() + }; + + if !state_set { + let setup_thread = self.setup_thread.lock().unwrap(); + if setup_thread.is_none() { + // unlock() could potentially cancel the setup, and create() can be called after unlock() without going through start() again. + self.start_setup(setup_thread); + } + } + + { + // wait for the setup to be completed + let mut setup_thread = self.setup_thread.lock().unwrap(); + if let Some(setup) = setup_thread.take() { + let res = setup.thread_handle.join().unwrap(); + + match res { + Err(_aborted) => { + gst::debug!(CAT, imp: self, "setup has been cancelled"); + return Err(gst::FlowError::Flushing); + } + Ok(Err(err)) => { + let details = format!("{:?}", err); + gst::error!(CAT, imp: self, "failed to start: {}", details); + gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]); + return Err(gst::FlowError::Error); + } + Ok(Ok(_)) => {} + } + } + } + let state = self.state.lock().unwrap(); let state = state.as_ref().unwrap(); @@ -290,112 +343,6 @@ impl PushSrcImpl for SpotifyAudioSrc { } } -impl SpotifyAudioSrc { - async fn setup(&self) -> anyhow::Result<()> { - let (credentials, cache, track) = { - let settings = self.settings.lock().unwrap(); - - let credentials_cache = if settings.cache_credentials.is_empty() { - None - } else { - Some(&settings.cache_credentials) - }; - - let files_cache = if settings.cache_files.is_empty() { - None - } else { - Some(&settings.cache_files) - }; - - let max_size = if settings.cache_max_size != 0 { - Some(settings.cache_max_size) - } else { - None - }; - - let cache = Cache::new(credentials_cache, None, files_cache, max_size)?; - - let credentials = match cache.credentials() { - Some(cached_cred) => { - gst::debug!(CAT, imp: self, "reuse credentials from cache",); - cached_cred - } - None => { - gst::debug!(CAT, imp: self, "credentials not in cache",); - - if settings.username.is_empty() { - bail!("username is not set and credentials are not in cache"); - } - if settings.password.is_empty() { - bail!("password is not set and credentials are not in cache"); - } - - let cred = Credentials::with_password(&settings.username, &settings.password); - cache.save_credentials(&cred); - cred - } - }; - - if settings.track.is_empty() { - bail!("track is not set") - } - - (credentials, cache, settings.track.clone()) - }; - - let state = self.state.clone(); - - let (session, _credentials) = - Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?; - - let player_config = PlayerConfig { - passthrough: true, - ..Default::default() - }; - - // use a sync channel to prevent buffering the whole track inside the channel - let (sender, receiver) = mpsc::sync_channel(2); - let sender_clone = sender.clone(); - - let (mut player, mut player_event_channel) = - Player::new(player_config, session, Box::new(NoOpVolume), || { - Box::new(BufferSink { sender }) - }); - - let track = match SpotifyId::from_uri(&track) { - Ok(track) => track, - Err(_) => bail!("Failed to create Spotify URI from track"), - }; - - player.load(track, true, 0); - - let player_channel_handle = RUNTIME.spawn(async move { - let sender = sender_clone; - - while let Some(event) = player_event_channel.recv().await { - match event { - PlayerEvent::EndOfTrack { .. } => { - let _ = sender.send(Message::Eos); - } - PlayerEvent::Unavailable { .. } => { - let _ = sender.send(Message::Unavailable); - } - _ => {} - } - } - }); - - let mut state = state.lock().unwrap(); - state.replace(State { - player, - receiver, - player_channel_handle, - }); - - Ok(()) - } -} - struct BufferSink { sender: mpsc::SyncSender, } @@ -456,3 +403,144 @@ impl URIHandlerImpl for SpotifyAudioSrc { Ok(()) } } + +impl SpotifyAudioSrc { + fn start_setup(&self, mut setup_thread: MutexGuard>) { + let self_ = self.to_owned(); + + // run the runtime from another thread to prevent the "start a runtime from within a runtime" panic + // when the plugin is statically linked. + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let thread_handle = std::thread::spawn(move || { + RUNTIME.block_on(async move { + let future = Abortable::new(self_.setup(), abort_registration); + future.await + }) + }); + + setup_thread.replace(SetupThread { + thread_handle, + abort_handle, + }); + } + + async fn setup(&self) -> anyhow::Result<()> { + { + let state = self.state.lock().unwrap(); + + if state.is_some() { + // already setup + return Ok(()); + } + } + + let (credentials, cache, track) = { + let settings = self.settings.lock().unwrap(); + + let credentials_cache = if settings.cache_credentials.is_empty() { + None + } else { + Some(&settings.cache_credentials) + }; + + let files_cache = if settings.cache_files.is_empty() { + None + } else { + Some(&settings.cache_files) + }; + + let max_size = if settings.cache_max_size != 0 { + Some(settings.cache_max_size) + } else { + None + }; + + let cache = Cache::new(credentials_cache, None, files_cache, max_size)?; + + let credentials = match cache.credentials() { + Some(cached_cred) => { + gst::debug!(CAT, imp: self, "reuse credentials from cache",); + cached_cred + } + None => { + gst::debug!(CAT, imp: self, "credentials not in cache",); + + if settings.username.is_empty() { + bail!("username is not set and credentials are not in cache"); + } + if settings.password.is_empty() { + bail!("password is not set and credentials are not in cache"); + } + + let cred = Credentials::with_password(&settings.username, &settings.password); + cache.save_credentials(&cred); + cred + } + }; + + if settings.track.is_empty() { + bail!("track is not set") + } + + (credentials, cache, settings.track.clone()) + }; + + let (session, _credentials) = + Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?; + + let player_config = PlayerConfig { + passthrough: true, + ..Default::default() + }; + + // use a sync channel to prevent buffering the whole track inside the channel + let (sender, receiver) = mpsc::sync_channel(2); + let sender_clone = sender.clone(); + + let (mut player, mut player_event_channel) = + Player::new(player_config, session, Box::new(NoOpVolume), || { + Box::new(BufferSink { sender }) + }); + + let track = match SpotifyId::from_uri(&track) { + Ok(track) => track, + Err(_) => bail!("Failed to create Spotify URI from track"), + }; + + player.load(track, true, 0); + + let player_channel_handle = RUNTIME.spawn(async move { + let sender = sender_clone; + + while let Some(event) = player_event_channel.recv().await { + match event { + PlayerEvent::EndOfTrack { .. } => { + let _ = sender.send(Message::Eos); + } + PlayerEvent::Unavailable { .. } => { + let _ = sender.send(Message::Unavailable); + } + _ => {} + } + } + }); + + let mut state = self.state.lock().unwrap(); + + state.replace(State { + player, + receiver, + player_channel_handle, + }); + + Ok(()) + } + + fn cancel_setup(&self) { + let mut setup_thread = self.setup_thread.lock().unwrap(); + + if let Some(setup) = setup_thread.take() { + setup.abort_handle.abort(); + } + } +}