spotify: fix "start a runtime from within a runtime" with static link

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/976>
This commit is contained in:
Guillaume Desmottes 2022-11-12 15:57:25 +01:00
parent 3abd13e57b
commit 2642410702

View file

@ -6,9 +6,10 @@
//
// SPDX-License-Identifier: MPL-2.0
use std::sync::{mpsc, Arc, Mutex};
use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use anyhow::bail;
use futures::future::{AbortHandle, Abortable, Aborted};
use once_cell::sync::Lazy;
use tokio::{runtime, task::JoinHandle};
@ -74,10 +75,16 @@ struct Settings {
#[derive(Default)]
pub struct SpotifyAudioSrc {
setup_thread: Mutex<Option<SetupThread>>,
state: Arc<Mutex<Option<State>>>,
settings: Mutex<Settings>,
}
struct SetupThread {
thread_handle: std::thread::JoinHandle<Result<anyhow::Result<()>, Aborted>>,
abort_handle: AbortHandle,
}
#[glib::object_subclass]
impl ObjectSubclass for SpotifyAudioSrc {
const NAME: &'static str = "GstSpotifyAudioSrc";
@ -237,17 +244,22 @@ impl BaseSrcImpl for SpotifyAudioSrc {
}
}
if let Err(err) = RUNTIME.block_on(async move { self.setup().await }) {
let details = format!("{:?}", err);
gst::error!(CAT, imp: self, "failed to start: {}", details);
gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]);
return Err(gst::error_msg!(gst::ResourceError::Settings, [&details]));
{
let setup_thread = self.setup_thread.lock().unwrap();
if setup_thread.is_some() {
// already starting
return Ok(());
}
self.start_setup(setup_thread);
}
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
// stop the setup if it's not completed yet
self.cancel_setup();
if let Some(state) = self.state.lock().unwrap().take() {
gst::debug!(CAT, imp: self, "stopping");
state.player.stop();
@ -258,6 +270,12 @@ impl BaseSrcImpl for SpotifyAudioSrc {
Ok(())
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
self.cancel_setup();
self.parent_unlock()
}
}
impl PushSrcImpl for SpotifyAudioSrc {
@ -265,6 +283,41 @@ impl PushSrcImpl for SpotifyAudioSrc {
&self,
_buffer: Option<&mut gst::BufferRef>,
) -> Result<CreateSuccess, gst::FlowError> {
let state_set = {
let state = self.state.lock().unwrap();
state.is_some()
};
if !state_set {
let setup_thread = self.setup_thread.lock().unwrap();
if setup_thread.is_none() {
// unlock() could potentially cancel the setup, and create() can be called after unlock() without going through start() again.
self.start_setup(setup_thread);
}
}
{
// wait for the setup to be completed
let mut setup_thread = self.setup_thread.lock().unwrap();
if let Some(setup) = setup_thread.take() {
let res = setup.thread_handle.join().unwrap();
match res {
Err(_aborted) => {
gst::debug!(CAT, imp: self, "setup has been cancelled");
return Err(gst::FlowError::Flushing);
}
Ok(Err(err)) => {
let details = format!("{:?}", err);
gst::error!(CAT, imp: self, "failed to start: {}", details);
gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]);
return Err(gst::FlowError::Error);
}
Ok(Ok(_)) => {}
}
}
}
let state = self.state.lock().unwrap();
let state = state.as_ref().unwrap();
@ -290,112 +343,6 @@ impl PushSrcImpl for SpotifyAudioSrc {
}
}
impl SpotifyAudioSrc {
async fn setup(&self) -> anyhow::Result<()> {
let (credentials, cache, track) = {
let settings = self.settings.lock().unwrap();
let credentials_cache = if settings.cache_credentials.is_empty() {
None
} else {
Some(&settings.cache_credentials)
};
let files_cache = if settings.cache_files.is_empty() {
None
} else {
Some(&settings.cache_files)
};
let max_size = if settings.cache_max_size != 0 {
Some(settings.cache_max_size)
} else {
None
};
let cache = Cache::new(credentials_cache, None, files_cache, max_size)?;
let credentials = match cache.credentials() {
Some(cached_cred) => {
gst::debug!(CAT, imp: self, "reuse credentials from cache",);
cached_cred
}
None => {
gst::debug!(CAT, imp: self, "credentials not in cache",);
if settings.username.is_empty() {
bail!("username is not set and credentials are not in cache");
}
if settings.password.is_empty() {
bail!("password is not set and credentials are not in cache");
}
let cred = Credentials::with_password(&settings.username, &settings.password);
cache.save_credentials(&cred);
cred
}
};
if settings.track.is_empty() {
bail!("track is not set")
}
(credentials, cache, settings.track.clone())
};
let state = self.state.clone();
let (session, _credentials) =
Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?;
let player_config = PlayerConfig {
passthrough: true,
..Default::default()
};
// use a sync channel to prevent buffering the whole track inside the channel
let (sender, receiver) = mpsc::sync_channel(2);
let sender_clone = sender.clone();
let (mut player, mut player_event_channel) =
Player::new(player_config, session, Box::new(NoOpVolume), || {
Box::new(BufferSink { sender })
});
let track = match SpotifyId::from_uri(&track) {
Ok(track) => track,
Err(_) => bail!("Failed to create Spotify URI from track"),
};
player.load(track, true, 0);
let player_channel_handle = RUNTIME.spawn(async move {
let sender = sender_clone;
while let Some(event) = player_event_channel.recv().await {
match event {
PlayerEvent::EndOfTrack { .. } => {
let _ = sender.send(Message::Eos);
}
PlayerEvent::Unavailable { .. } => {
let _ = sender.send(Message::Unavailable);
}
_ => {}
}
}
});
let mut state = state.lock().unwrap();
state.replace(State {
player,
receiver,
player_channel_handle,
});
Ok(())
}
}
struct BufferSink {
sender: mpsc::SyncSender<Message>,
}
@ -456,3 +403,144 @@ impl URIHandlerImpl for SpotifyAudioSrc {
Ok(())
}
}
impl SpotifyAudioSrc {
fn start_setup(&self, mut setup_thread: MutexGuard<Option<SetupThread>>) {
let self_ = self.to_owned();
// run the runtime from another thread to prevent the "start a runtime from within a runtime" panic
// when the plugin is statically linked.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let thread_handle = std::thread::spawn(move || {
RUNTIME.block_on(async move {
let future = Abortable::new(self_.setup(), abort_registration);
future.await
})
});
setup_thread.replace(SetupThread {
thread_handle,
abort_handle,
});
}
async fn setup(&self) -> anyhow::Result<()> {
{
let state = self.state.lock().unwrap();
if state.is_some() {
// already setup
return Ok(());
}
}
let (credentials, cache, track) = {
let settings = self.settings.lock().unwrap();
let credentials_cache = if settings.cache_credentials.is_empty() {
None
} else {
Some(&settings.cache_credentials)
};
let files_cache = if settings.cache_files.is_empty() {
None
} else {
Some(&settings.cache_files)
};
let max_size = if settings.cache_max_size != 0 {
Some(settings.cache_max_size)
} else {
None
};
let cache = Cache::new(credentials_cache, None, files_cache, max_size)?;
let credentials = match cache.credentials() {
Some(cached_cred) => {
gst::debug!(CAT, imp: self, "reuse credentials from cache",);
cached_cred
}
None => {
gst::debug!(CAT, imp: self, "credentials not in cache",);
if settings.username.is_empty() {
bail!("username is not set and credentials are not in cache");
}
if settings.password.is_empty() {
bail!("password is not set and credentials are not in cache");
}
let cred = Credentials::with_password(&settings.username, &settings.password);
cache.save_credentials(&cred);
cred
}
};
if settings.track.is_empty() {
bail!("track is not set")
}
(credentials, cache, settings.track.clone())
};
let (session, _credentials) =
Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?;
let player_config = PlayerConfig {
passthrough: true,
..Default::default()
};
// use a sync channel to prevent buffering the whole track inside the channel
let (sender, receiver) = mpsc::sync_channel(2);
let sender_clone = sender.clone();
let (mut player, mut player_event_channel) =
Player::new(player_config, session, Box::new(NoOpVolume), || {
Box::new(BufferSink { sender })
});
let track = match SpotifyId::from_uri(&track) {
Ok(track) => track,
Err(_) => bail!("Failed to create Spotify URI from track"),
};
player.load(track, true, 0);
let player_channel_handle = RUNTIME.spawn(async move {
let sender = sender_clone;
while let Some(event) = player_event_channel.recv().await {
match event {
PlayerEvent::EndOfTrack { .. } => {
let _ = sender.send(Message::Eos);
}
PlayerEvent::Unavailable { .. } => {
let _ = sender.send(Message::Unavailable);
}
_ => {}
}
}
});
let mut state = self.state.lock().unwrap();
state.replace(State {
player,
receiver,
player_channel_handle,
});
Ok(())
}
fn cancel_setup(&self) {
let mut setup_thread = self.setup_thread.lock().unwrap();
if let Some(setup) = setup_thread.take() {
setup.abort_handle.abort();
}
}
}