diff --git a/Cargo.toml b/Cargo.toml index c5a41d6..c6e8e57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,6 @@ license = "MIT" readme = "README.md" rust-version = "1.67" -[lib] -doctest = false - [dependencies] chrono = "0.4" log = "0.4" diff --git a/README.md b/README.md index da7bfc6..9d960b5 100644 --- a/README.md +++ b/README.md @@ -17,13 +17,14 @@ Backie started as a fork of Here are some of the Backie's key features: -- Async workers: Workers are started as [Tokio](https://tokio.rs/) tasks -- Application context: Tasks can access an shared user-provided application context -- Single-purpose workers: Tasks are stored together but workers are configured to execute only tasks of a specific queue -- Retries: Tasks are retried with a custom backoff mode -- Graceful shutdown: provide a future to gracefully shutdown the workers, on-the-fly tasks are not interrupted -- Recovery of unfinished tasks: Tasks that were not finished are retried on the next worker start -- Unique tasks: Tasks are not duplicated in the queue if they provide a unique hash +- **Guaranteed execution**: at least one execution of a task +- **Async workers**: Workers are started as [Tokio](https://tokio.rs/) tasks +- **Application context**: Tasks can access a shared user-provided application context +- **Single-purpose workers**: Tasks are stored together but workers are configured to execute only tasks of a specific queue +- **Retries**: Tasks are retried with a custom backoff mode +- **Graceful shutdown**: provide a future to gracefully shut down the workers, on-the-fly tasks are not interrupted +- **Recovery of unfinished tasks**: Tasks that were not finished are retried on the next worker start +- **Unique tasks**: Tasks are not duplicated in the queue if they provide a unique hash ## Other planned features @@ -99,50 +100,19 @@ First, we need to create a [`TaskStore`] trait instance. This is the object resp tasks from a database. 
Backie currently only supports Postgres as a storage backend via the provided [`PgTaskStore`]. You can implement other storage backends by implementing the [`TaskStore`] trait. -```rust -let connection_url = "postgres://postgres:password@localhost/backie"; - -let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(connection_url); -let pool = Pool::builder() - .max_size(3) - .build(manager) - .await - .unwrap(); - -let task_store = PgTaskStore::new(pool); -``` - Then, we can use the `task_store` to start a worker pool using the [`WorkerPool`]. The [`WorkerPool`] is responsible for starting the workers and managing their lifecycle. -```rust -// Register the task types I want to use and start the worker pool -let (_, queue) = WorkerPool::new(task_store, |_|()) - .register_task_type::<MyTask>() - .configure_queue("default", 1, RetentionMode::default()) - .start(futures::future::pending::<()>()) - .await - .unwrap(); -``` - -With that, we are defining that we want to execute instances of `MyTask` and that the `default` queue should -have 1 worker running using the default [`RetentionMode`] (remove from the database only successfully finished tasks). -We also defined in the `start` method that the worker pool should run forever. +A full example of starting a worker pool can be found in the [examples directory](https://github.com/rafaelcaricio/backie/blob/main/examples/simple_worker/src/main.rs). ### Queueing tasks -After stating the workers we get an instance of [`Queue`] which we can use to enqueue tasks. It is also possible +After starting the workers, we get an instance of [`Queue`] which we can use to enqueue tasks. It is also possible to directly create a [`Queue`] instance from with a [`TaskStore`] instance. -```rust -let queue = Queue::new(task_store); -let task = MyTask { info: "Hello world!".to_string() }; -queue.enqueue(task).await.unwrap(); -``` - -This will enqueue the task and whenever a worker is available it will start processing it. 
Workers don't need to be +This will enqueue the task and whenever a worker is available it will start processing. Workers don't need to be started before enqueuing tasks. Workers don't need to be in the same process as the queue as long as the workers have -access to the same underlying storage system. +access to the same underlying storage system. This enables horizontal scaling of the workers. ## Contributing diff --git a/examples/simple_worker/Cargo.toml b/examples/simple_worker/Cargo.toml index 5b3a9c1..aafc79f 100644 --- a/examples/simple_worker/Cargo.toml +++ b/examples/simple_worker/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] backie = { path = "../../" } anyhow = "1" -env_logger = "0.9.0" +env_logger = "0.10" log = "0.4.0" tokio = { version = "1", features = ["full"] } diesel-async = { version = "0.2", features = ["postgres", "bb8"] } diff --git a/examples/simple_worker/src/lib.rs b/examples/simple_worker/src/lib.rs deleted file mode 100644 index 4776108..0000000 --- a/examples/simple_worker/src/lib.rs +++ /dev/null @@ -1,97 +0,0 @@ -use async_trait::async_trait; -use backie::{BackgroundTask, CurrentTask}; -use serde::{Deserialize, Serialize}; -use std::time::Duration; - -#[derive(Clone, Debug)] -pub struct MyApplicationContext { - app_name: String, -} - -impl MyApplicationContext { - pub fn new(app_name: &str) -> Self { - Self { - app_name: app_name.to_string(), - } - } -} - -#[derive(Serialize, Deserialize)] -pub struct MyTask { - pub number: u16, -} - -impl MyTask { - pub fn new(number: u16) -> Self { - Self { number } - } -} - -#[derive(Serialize, Deserialize)] -pub struct MyFailingTask { - pub number: u16, -} - -impl MyFailingTask { - pub fn new(number: u16) -> Self { - Self { number } - } -} - -#[async_trait] -impl BackgroundTask for MyTask { - const TASK_NAME: &'static str = "my_task"; - type AppData = MyApplicationContext; - - async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), anyhow::Error> { - // let new_task = 
MyTask::new(self.number + 1); - // queue - // .insert_task(&new_task) - // .await - // .unwrap(); - - log::info!( - "[{}] Hello from {}! the current number is {}", - task.id(), - ctx.app_name, - self.number - ); - tokio::time::sleep(Duration::from_secs(3)).await; - - log::info!("[{}] done..", task.id()); - Ok(()) - } -} - -#[async_trait] -impl BackgroundTask for MyFailingTask { - const TASK_NAME: &'static str = "my_failing_task"; - type AppData = MyApplicationContext; - - async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), anyhow::Error> { - // let new_task = MyFailingTask::new(self.number + 1); - // queue - // .insert_task(&new_task) - // .await - // .unwrap(); - - // task.id(); - // task.keep_alive().await?; - // task.previous_error(); - // task.retry_count(); - - log::info!("[{}] the current number is {}", task.id(), self.number); - tokio::time::sleep(Duration::from_secs(3)).await; - - log::info!("[{}] done..", task.id()); - // - // let b = true; - // - // if b { - // panic!("Hello!"); - // } else { - // Ok(()) - // } - Ok(()) - } -} diff --git a/examples/simple_worker/src/main.rs b/examples/simple_worker/src/main.rs index c828313..0b322cc 100644 --- a/examples/simple_worker/src/main.rs +++ b/examples/simple_worker/src/main.rs @@ -1,9 +1,96 @@ -use backie::{PgTaskStore, RetentionMode, WorkerPool}; +use async_trait::async_trait; +use backie::{BackgroundTask, CurrentTask}; +use backie::{PgTaskStore, Queue, WorkerPool}; use diesel_async::pg::AsyncPgConnection; use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; -use simple_worker::MyApplicationContext; -use simple_worker::MyFailingTask; -use simple_worker::MyTask; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Clone, Debug)] +pub struct MyApplicationContext { + app_name: String, +} + +impl MyApplicationContext { + pub fn new(app_name: &str) -> Self { + Self { + app_name: app_name.to_string(), + } + } +} + 
+#[derive(Serialize, Deserialize)] +pub struct MyTask { + pub number: u16, +} + +impl MyTask { + pub fn new(number: u16) -> Self { + Self { number } + } +} + +#[async_trait] +impl BackgroundTask for MyTask { + const TASK_NAME: &'static str = "my_task"; + type AppData = MyApplicationContext; + + async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), anyhow::Error> { + // let new_task = MyTask::new(self.number + 1); + // queue + // .insert_task(&new_task) + // .await + // .unwrap(); + + log::info!( + "[{}] Hello from {}! the current number is {}", + task.id(), + ctx.app_name, + self.number + ); + tokio::time::sleep(Duration::from_secs(3)).await; + + log::info!("[{}] done..", task.id()); + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +pub struct MyFailingTask { + pub number: u16, +} + +impl MyFailingTask { + pub fn new(number: u16) -> Self { + Self { number } + } +} + +#[async_trait] +impl BackgroundTask for MyFailingTask { + const TASK_NAME: &'static str = "my_failing_task"; + type AppData = MyApplicationContext; + + async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), anyhow::Error> { + // let new_task = MyFailingTask::new(self.number + 1); + // queue + // .insert_task(&new_task) + // .await + // .unwrap(); + + // task.id(); + // task.keep_alive().await?; + // task.previous_error(); + // task.retry_count(); + + log::info!("[{}] the current number is {}", task.id(), self.number); + tokio::time::sleep(Duration::from_secs(3)).await; + + log::info!("[{}] done..", task.id()); + Ok(()) + } +} #[tokio::main] async fn main() { @@ -30,15 +117,16 @@ async fn main() { let my_app_context = MyApplicationContext::new("Backie Example App"); // Register the task types I want to use and start the worker pool - let (join_handle, queue) = WorkerPool::new(task_store, move |_| my_app_context.clone()) - .register_task_type::() - .register_task_type::() - .configure_queue("default", 3, RetentionMode::RemoveDone) - .start(async move { - let _ = 
rx.changed().await; - }) - .await - .unwrap(); + let (join_handle, _queue) = + WorkerPool::new(task_store.clone(), move |_| my_app_context.clone()) + .register_task_type::() + .register_task_type::() + .configure_queue("default".into()) + .start(async move { + let _ = rx.changed().await; + }) + .await + .unwrap(); log::info!("Workers started ..."); @@ -46,6 +134,7 @@ async fn main() { let task2 = MyTask::new(20_000); let task3 = MyFailingTask::new(50_000); + let queue = Queue::new(Arc::new(task_store)); // or use the `queue` instance returned by the worker pool queue.enqueue(task1).await.unwrap(); queue.enqueue(task2).await.unwrap(); queue.enqueue(task3).await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 2d755a9..418938e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ /// All possible options for retaining tasks in the db after their execution. /// /// The default mode is [`RetentionMode::RemoveAll`] -#[derive(Copy, Clone, Eq, PartialEq, Debug)] +#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)] pub enum RetentionMode { /// Keep all tasks KeepAll, @@ -28,7 +28,7 @@ pub use runnable::BackgroundTask; pub use store::{PgTaskStore, TaskStore}; pub use task::{CurrentTask, Task, TaskId, TaskState}; pub use worker::Worker; -pub use worker_pool::WorkerPool; +pub use worker_pool::{QueueConfig, WorkerPool}; pub mod errors; mod queries; diff --git a/src/runnable.rs b/src/runnable.rs index f9eae3d..4079ee3 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -17,7 +17,7 @@ use serde::{de::DeserializeOwned, ser::Serialize}; /// /// /// # Example -/// ```rust +/// ``` /// use async_trait::async_trait; /// use backie::{BackgroundTask, CurrentTask}; /// use serde::{Deserialize, Serialize}; @@ -25,6 +25,7 @@ use serde::{de::DeserializeOwned, ser::Serialize}; /// #[derive(Serialize, Deserialize)] /// pub struct MyTask {} /// +/// #[async_trait] /// impl BackgroundTask for MyTask { /// const TASK_NAME: &'static str = "my_task_unique_name"; /// type AppData = (); 
diff --git a/src/worker.rs b/src/worker.rs index 05fb882..d1c37d0 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -9,6 +9,7 @@ use std::collections::BTreeMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; pub type ExecuteTaskFn = Arc< @@ -59,6 +60,8 @@ where retention_mode: RetentionMode, + pull_interval: Duration, + task_registry: BTreeMap>, app_data_fn: StateFn, @@ -76,6 +79,7 @@ where store: Arc, queue_name: String, retention_mode: RetentionMode, + pull_interval: Duration, task_registry: BTreeMap>, app_data_fn: StateFn, shutdown: Option>, @@ -84,6 +88,7 @@ where store, queue_name, retention_mode, + pull_interval, task_registry, app_data_fn, shutdown, @@ -120,11 +125,11 @@ where log::info!("Shutting down worker"); return Ok(()); } - _ = tokio::time::sleep(std::time::Duration::from_secs(1)).fuse() => {} + _ = tokio::time::sleep(self.pull_interval).fuse() => {} } } None => { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(self.pull_interval).await; } }; } diff --git a/src/worker_pool.rs b/src/worker_pool.rs index a8653af..d5379b6 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -9,6 +9,7 @@ use futures::future::join_all; use std::collections::BTreeMap; use std::future::Future; use std::sync::Arc; +use std::time::Duration; use tokio::task::JoinHandle; #[derive(Clone)] @@ -37,7 +38,7 @@ where queue_tasks: BTreeMap>, /// Number of workers that will be spawned per queue. 
- worker_queues: BTreeMap, + worker_queues: BTreeMap, } impl WorkerPool @@ -80,14 +81,8 @@ where self } - pub fn configure_queue( - mut self, - queue_name: impl ToString, - num_workers: u32, - retention_mode: RetentionMode, - ) -> Self { - self.worker_queues - .insert(queue_name.to_string(), (retention_mode, num_workers)); + pub fn configure_queue(mut self, config: QueueConfig) -> Self { + self.worker_queues.insert(config.name.clone(), config); self } @@ -110,12 +105,13 @@ where let mut worker_handles = Vec::new(); // Spawn all individual workers per queue - for (queue_name, (retention_mode, num_workers)) in self.worker_queues.iter() { - for idx in 0..*num_workers { + for (queue_name, queue_config) in self.worker_queues.iter() { + for idx in 0..queue_config.num_workers { let mut worker: Worker = Worker::new( self.task_store.clone(), queue_name.clone(), - *retention_mode, + queue_config.retention_mode, + queue_config.pull_interval, self.task_registry.clone(), self.application_data_fn.clone(), Some(rx.clone()), @@ -157,6 +153,80 @@ where } } +/// Configuration for a queue. +/// +/// This is used to configure the number of workers, the retention mode, and the pulling interval +/// for a queue. 
+/// +/// # Examples +/// +/// Example of configuring a queue with all options: +/// ``` +/// # use backie::QueueConfig; +/// # use backie::RetentionMode; +/// # use std::time::Duration; +/// let config = QueueConfig::new("default") +/// .num_workers(5) +/// .retention_mode(RetentionMode::KeepAll) +/// .pull_interval(Duration::from_secs(1)); +/// ``` +/// Example of queue configuration with default options: +/// ``` +/// # use backie::QueueConfig; +/// let config = QueueConfig::new("default"); +/// // Also possible to use the `From` trait: +/// let config: QueueConfig = "default".into(); +/// ``` +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct QueueConfig { + name: String, + num_workers: u32, + retention_mode: RetentionMode, + pull_interval: Duration, +} + +impl QueueConfig { + /// Create a new queue configuration. + pub fn new(name: impl ToString) -> Self { + Self { + name: name.to_string(), + num_workers: 1, + retention_mode: RetentionMode::default(), + pull_interval: Duration::from_secs(1), + } + } + + /// Set the number of workers for this queue. + pub fn num_workers(mut self, num_workers: u32) -> Self { + self.num_workers = num_workers; + self + } + + /// Set the retention mode for this queue. + pub fn retention_mode(mut self, retention_mode: RetentionMode) -> Self { + self.retention_mode = retention_mode; + self + } + + /// Set the pull interval for this queue. + /// + /// This is the interval at which the queue will be checking for new tasks by calling + /// the backend storage. 
+ pub fn pull_interval(mut self, pull_interval: Duration) -> Self { + self.pull_interval = pull_interval; + self + } +} + +impl From for QueueConfig +where + S: ToString, +{ + fn from(name: S) -> Self { + Self::new(name.to_string()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -263,7 +333,7 @@ mod tests { let (join_handle, queue) = WorkerPool::new(memory_store().await, move |_| my_app_context.clone()) .register_task_type::() - .configure_queue(GreetingTask::QUEUE, 1, RetentionMode::RemoveDone) + .configure_queue(GreetingTask::QUEUE.into()) .start(futures::future::ready(())) .await .unwrap(); @@ -286,8 +356,8 @@ mod tests { WorkerPool::new(memory_store().await, move |_| my_app_context.clone()) .register_task_type::() .register_task_type::() - .configure_queue("default", 1, RetentionMode::default()) - .configure_queue("other_queue", 1, RetentionMode::default()) + .configure_queue("default".into()) + .configure_queue("other_queue".into()) .start(futures::future::ready(())) .await .unwrap(); @@ -348,7 +418,7 @@ mod tests { let (join_handle, queue) = WorkerPool::new(memory_store().await, move |_| my_app_context.clone()) .register_task_type::() - .configure_queue("default", 1, RetentionMode::default()) + .configure_queue("default".into()) .start(async move { rx.await.unwrap(); println!("Worker pool got notified to stop"); @@ -437,7 +507,7 @@ mod tests { move |_| my_app_context.clone() }) .register_task_type::() - .configure_queue("default", 1, RetentionMode::default()) + .configure_queue("default".into()) .start(async move { rx.await.unwrap(); println!("Worker pool got notified to stop"); @@ -530,7 +600,7 @@ mod tests { move |_| player_context.clone() }) .register_task_type::() - .configure_queue("default", 1, RetentionMode::default()) + .configure_queue("default".into()) .start(async move { should_stop.await.unwrap(); println!("Worker pool got notified to stop"); @@ -575,7 +645,9 @@ mod tests { let (join_handle, _queue) = WorkerPool::new(pg_task_store().await, 
move |_| my_app_context.clone()) .register_task_type::() - .configure_queue(GreetingTask::QUEUE, 1, RetentionMode::RemoveDone) + .configure_queue( + QueueConfig::new(GreetingTask::QUEUE).retention_mode(RetentionMode::RemoveDone), + ) .start(futures::future::ready(())) .await .unwrap();