diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml deleted file mode 100644 index 3475f51..0000000 --- a/.github/FUNDING.yml +++ /dev/null @@ -1,4 +0,0 @@ -# These are supported funding model platforms - -github: [ayrat555, pxp9] -custom: ["https://paypal.me/AyratBadykov", "https://paypal.me/PMR9"] diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml deleted file mode 100644 index 80cb4a8..0000000 --- a/.github/workflows/docs.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: docs - -on: - workflow_dispatch: - -jobs: - build: - name: Deploy to gh-pages - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - with: - ref: 'master' - submodules: 'recursive' - - run: | - git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/* - - run: | - git config --global user.email "ayratin555@gmail.com" - git config --global user.name "Ayrat Badykov" - - - name: Deploy docs - run: | - source ./docs/deploy.sh && build && deploy diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4d200f4..f5d3031 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -19,7 +19,7 @@ jobs: image: postgres # Provide the password for postgres env: - POSTGRES_PASSWORD: postgres + POSTGRES_PASSWORD: password POSTGRES_USER: postgres # Set health checks to wait until postgres has started @@ -60,9 +60,11 @@ jobs: - name: Run tests uses: actions-rs/cargo@v1 + env: + DATABASE_URL: postgres://postgres:password@localhost:5432 with: command: test - args: --verbose --all-features + args: --verbose -- --test-threads 1 - name: Run dirty tests uses: actions-rs/cargo@v1 diff --git a/Cargo.toml b/Cargo.toml index 8630409..e3d707f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,12 @@ serde_derive = "1.0" serde_json = "1.0" sha2 = "0.10" thiserror = "1.0" -typed-builder = "0.12" +typed-builder = "0.13" typetag = "0.2" uuid = { version = "1.1", features = ["v4", "serde"] } async-trait = "0.1" async-recursion = "1" +futures = "0.3" diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] } diesel-derive-newtype = "2.0.0-rc.0" diesel-async = { version = "0.2", features = ["postgres", "bb8"] } diff --git a/Makefile b/Makefile index f955856..df74c47 100644 --- a/Makefile +++ b/Makefile @@ -1,20 +1,25 @@ -db: - docker run --rm -d --name postgres -p 5432:5432 \ - -e POSTGRES_DB=fang \ - -e POSTGRES_USER=postgres \ - -e POSTGRES_PASSWORD=postgres \ - postgres:latest -clippy: - cargo clippy --all-features -diesel: - DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run -stop: - docker kill postgres -tests: - DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture +PHONY: db, clippy, diesel, stop, tests, ignored, doc -ignored: - DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture --ignored +DATABASE_URL := postgres://postgres:password@localhost/backie + +db: + docker run --rm -d --name backie-db -p 5432:5432 \ + -e POSTGRES_DB=backie \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=password \ + postgres:latest + +clippy: + cargo clippy --tests -- -D clippy::all + +diesel: + DATABASE_URL=$(DATABASE_URL) diesel migration run + +stop: + docker kill backie-db + +tests: + DATABASE_URL=$(DATABASE_URL) cargo test --all-features -- --color always --nocapture --test-threads 1 doc: cargo doc --open diff --git a/docs/config.toml b/docs/config.toml deleted file mode 100644 index 2075b46..0000000 --- a/docs/config.toml +++ /dev/null @@ -1,112 +0,0 @@ -# The URL the site will be built for -base_url = "https://fang.badykov.com" -title = "Fang" -description = "Fang is a background task processing for Rust. It uses Postgres DB as a task queue." - -# The site theme to use. -theme = "adidoks" - -# The default language; used in feeds and search index -# Note: the search index doesn't support Chinese/Japanese/Korean Languages -default_language = "en" - -# Whether to automatically compile all Sass files in the sass directory -compile_sass = true - -# Whether to generate a feed file for the site -generate_feed = true - -# When set to "true", the generated HTML files are minified. -minify_html = false - -# The taxonomies to be rendered for the site and their configuration. -taxonomies = [ - {name = "authors"}, # Basic definition: no feed or pagination -] - -# Whether to build a search index to be used later on by a JavaScript library -# When set to "true", a search index is built from the pages and section -# content for `default_language`. -build_search_index = false - -[search] -# Whether to include the title of the page/section in the index -include_title = true -# Whether to include the description of the page/section in the index -include_description = false -# Whether to include the rendered content of the page/section in the index -include_content = true - -[markdown] -# Whether to do syntax highlighting. -# Theme can be customised by setting the `highlight_theme` -# variable to a theme supported by Zola -highlight_code = true - -[extra] -# Put all your custom variables here -author = "Ayrat Badykov, Pepe Márquez" -github = "https://github.com/ayrat555/fang" - -# If running on netlify.app site, set to true -is_netlify = true - -# Set HTML file language -language_code = "en-US" - -# Set theme-color meta tag for Chrome browser -theme_color = "#fff" - -# More about site's title -title_separator = "|" # set as |, -, _, etc -title_addition = "Background processing" - - -# Set date format in blog publish metadata -timeformat = "%B %e, %Y" # e.g. June 14, 2021 -timezone = "America/New_York" - -# Edit page on reposity or not -edit_page = false -docs_repo = "https://github.com/ayrat555/fang" -repo_branch = "master" - -## Math settings -# options: true, false. Enable math support globally, -# default: false. You can always enable math on a per page. -math = false -library = "katex" # options: "katex", "mathjax". default is "katex". - -## Open Graph + Twitter Cards -[extra.open] -enable = true -# this image will be used as fallback if a page has no image of its own -image = "logo.png" -twitter_site = "" -twitter_creator = "" -facebook_author = "" -facebook_publisher = "" -og_locale = "en_US" - -## JSON-LD -[extra.schema] -type = "FANG" -logo = "logo.png" -twitter = "" -linked_in = "" -github = "https://github.com/ayrat555/fang" -section = "blog" # see config.extra.main~url -## Sitelinks Search Box -site_links_search_box = false - - -[[extra.menu.social]] -name = "GitHub" -pre = '' -url = "https://github.com/ayrat555/fang" -post = "v0.1.0" -weight = 20 - -# Footer contents -[extra.footer] -info = 'Built with ❤️ by Ayrat Badykov and Pepe Márquez' diff --git a/docs/content/_index.md b/docs/content/_index.md deleted file mode 100644 index 9300752..0000000 --- a/docs/content/_index.md +++ /dev/null @@ -1,58 +0,0 @@ -+++ -title = "FANG" - - -# The homepage contents -[extra] -lead = 'Fang is a background task processing for Rust. It uses Postgres DB as a task queue.' - -url = "/docs/readme" - -url_button = "Get started" -repo_version = "GitHub v0.10.2" -repo_license = "Open-source MIT License." -repo_url = "https://github.com/ayrat555/fang" - -# Menu items -[[extra.menu.main]] -name = "README" -section = "docs" -url = "/docs/readme" -weight = 10 - -[[extra.menu.main]] -name = "CHANGELOG" -section = "docs" -url = "/docs/changelog" - -[[extra.menu.main]] -name = "Blog" -section = "blog" -url = "/blog/" -weight = 20 - -[[extra.list]] -title = "Async and threaded workers" -content = 'Workers can be started in threads (threaded workers) or tokio tasks (async workers)' - -[[extra.list]] -title = "Scheduled tasks" -content = 'Tasks can be scheduled at any time in the future' - -[[extra.list]] -title = "Periodic (CRON) tasks" -content = 'Tasks can be scheduled using cron expressions' - -[[extra.list]] -title = "Unique tasks" -content = 'Tasks are not duplicated in the queue if they are unique' - -[[extra.list]] -title = "Single-purpose workers" -content = 'Tasks are stored in a single table but workers can execute only tasks of the specific type' -+++ - -[[extra.list]] -title = "Retries" -content = 'Tasks can be retried with a custom backoff mode' -+++ diff --git a/docs/content/authors/_index.md b/docs/content/authors/_index.md deleted file mode 100644 index fa3abd1..0000000 --- a/docs/content/authors/_index.md +++ /dev/null @@ -1,15 +0,0 @@ -+++ -title = "Authors" -description = "The authors of the blog articles." -date = 2022-09-06T08:00:00.00Z -updated = 2022-09-06T08:00:00.00Z -draft = false - -# Authors - -[extra.author_pages] -"ayrat-badykov" = "authors/ayrat-badykov.md" -"pepe-marquez" = "authors/pepe-marquez.md" -+++ - -The authors of the blog articles. diff --git a/docs/content/authors/ayrat-badykov.md b/docs/content/authors/ayrat-badykov.md deleted file mode 100644 index 213948c..0000000 --- a/docs/content/authors/ayrat-badykov.md +++ /dev/null @@ -1,11 +0,0 @@ -+++ -title = "Ayrat Badykov" -description = "Creator of Fang." -date = 2021-04-01T08:50:45.00Z -updated = 2021-04-01T08:50:45.00Z -draft = false -+++ - -Co-creator of **Fang** - -[@ayrat555](https://github.com/ayrat555) diff --git a/docs/content/authors/pepe-marquez.md b/docs/content/authors/pepe-marquez.md deleted file mode 100644 index 8120b32..0000000 --- a/docs/content/authors/pepe-marquez.md +++ /dev/null @@ -1,11 +0,0 @@ -+++ -title = "Pepe Márquez Romero" -description = "Co-Creator of Fang." -date = 2021-04-01T08:50:45.00Z -updated = 2021-04-01T08:50:45.00Z -draft = false -+++ - -Co-creator of **Fang**. - -[@pxp9](https://github.com/pxp9) diff --git a/docs/content/blog/2022-08-06-async-processing.md b/docs/content/blog/2022-08-06-async-processing.md deleted file mode 100644 index 97e5572..0000000 --- a/docs/content/blog/2022-08-06-async-processing.md +++ /dev/null @@ -1,189 +0,0 @@ -+++ -title = "Fang, async background processing for Rust" -description = "Async background processing for rust with tokio and postgres" -date = 2022-08-06T08:00:00+00:00 -updated = 2022-08-06T08:00:00+00:00 -template = "blog/page.html" -sort_by = "weight" -weight = 1 -draft = false - -[taxonomies] -authors = ["Ayrat Badykov", "Pepe Márquez"] - -[extra] -lead = 'Async background processing for rust with tokio and postgres' -images = [] -+++ - -Even though the first stable version of Rust was released in 2015, there are still some holes in its ecosystem for solving common tasks. One of which is background processing. - -In software engineering background processing is a common approach for solving several problems: - -- Carry out periodic tasks. For example, deliver notifications, update cached values. -- Defer expensive work so your application stays responsive while performing calculations in the background - -Most programming languages have go-to background processing frameworks/libraries. For example: - -- Ruby - [sidekiq](https://github.com/mperham/sidekiq). It uses Redis as a job queue. -- Python - [dramatiq](https://github.com/Bogdanp/dramatiq). It uses RabbitMQ as a job queue. -- Elixir - [oban](https://github.com/sorentwo/oban). It uses a Postgres DB as a job queue. - -The async programming (async/await) can be used for background processing but it has several major disadvantages if used directly: - -- It doesn't give control of the number of tasks that are being executed at any given time. So a lot of spawned tasks can overload a thread/threads that they're started on. -- It doesn't provide any monitoring which can be useful to investigate your system and find bottlenecks -- Tasks are not persistent. So all enqueued tasks are lost on every application restart - -To solve these shortcomings of the async programming we implemented the async processing in [the fang library](https://github.com/ayrat555/fang). - -## Threaded Fang - -Fang is a background processing library for rust. The first version of Fang was released exactly one year ago. Its key features were: - -- Each worker is started in a separate thread -- A Postgres table is used as the task queue - -This implementation was written for a specific use case - [el monitorro bot](https://github.com/ayrat555/el_monitorro). This specific implementation of background processing was proved by time. Each day it processes more and more feeds every minute (the current number is more than 3000). Some users host the bot on their infrastructure. - -You can find out more about the threaded processing in fang in [this blog post](https://www.badykov.com/rust/fang/). - -## Async Fang - -
-

-Async provides significantly reduced CPU and memory overhead, especially for workloads with a large amount of IO-bound tasks, such as servers and databases. All else equal, you can have orders of magnitude more tasks than OS threads, because an async runtime uses a small amount of (expensive) threads to handle a large amount of (cheap) tasks -

- -
- -For some lightweight background tasks, it's cheaper to run them on the same thread using async instead of starting one thread per worker. That's why we implemented this kind of processing in fang. Its key features: - -- Each worker is started as a tokio task -- If any worker fails during task execution, it's restarted -- Tasks are saved to a Postgres database. Instead of diesel, [tokio-postgres](https://github.com/sfackler/rust-postgres) is used to interact with a db. The threaded processing uses the [diesel](https://github.com/diesel-rs/diesel) ORM which blocks the thread. -- The implementation is based on traits so it's easy to implement additional backends (redis, in-memory) to store tasks. - -## Usage - -The usage is straightforward: - -1. Define a serializable task by adding `serde` derives to a task struct. -2. Implement `AsyncRunnable` runnable trait for fang to be able to run it. -3. Start workers. -4. Enqueue tasks. - -Let's go over each step. - -### Define a job - -```rust -use fang::serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize)] -#[serde(crate = "fang::serde")] -pub struct MyTask { - pub number: u16, -} - -impl MyTask { - pub fn new(number: u16) -> Self { - Self { number } - } -} -``` - -Fang re-exports `serde` so it's not required to add it to the `Cargo.toml` file - - -### Implement the AsyncRunnable trait - -```rust -use fang::async_trait; -use fang::typetag; -use fang::AsyncRunnable; -use std::time::Duration; - -#[async_trait] -#[typetag::serde] -impl AsyncRunnable for MyTask { - async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> { - let new_task = MyTask::new(self.number + 1); - queue - .insert_task(&new_task as &dyn AsyncRunnable) - .await - .unwrap(); - - log::info!("the current number is {}", self.number); - tokio::time::sleep(Duration::from_secs(3)).await; - - Ok(()) - } -} - -``` - -- Fang uses the [typetag library](https://github.com/dtolnay/typetag) to serialize trait objects and save them to the queue. -- The [async-trait](https://github.com/dtolnay/async-trait) is used for implementing async traits - -### Init queue - -```rust -use fang::asynk::async_queue::AsyncQueue; - -let max_pool_size: u32 = 2; -let mut queue = AsyncQueue::builder() - .uri("postgres://postgres:postgres@localhost/fang") - .max_pool_size(max_pool_size) - .duplicated_tasks(true) - .build(); -``` - - -### Start workers - -```rust -use fang::asynk::async_worker_pool::AsyncWorkerPool; -use fang::NoTls; - -let mut pool: AsyncWorkerPool> = AsyncWorkerPool::builder() - .number_of_workers(10_u32) - .queue(queue.clone()) - .build(); - -pool.start().await; -``` - -### Insert tasks - -```rust -let task = MyTask::new(0); - -queue - .insert_task(&task1 as &dyn AsyncRunnable) - .await - .unwrap(); -``` - -## Pitfalls - -The async processing is suitable for lightweight tasks. But for heavier tasks it's advised to use one of the following approaches: - -- start a separate tokio runtime to run fang workers -- use the threaded processing feature implemented in fang instead of the async processing - -## Future directions - -There are a couple of features planned for fang: - -- Retries with different backoff modes -- Additional backends (in-memory, redis) -- Graceful shutdown for async workers (for the threaded processing this feature is implemented) -- Cron jobs - -## Conclusion - -The project is available on [GitHub](https://github.com/ayrat555/fang) - -The async feature and this post is written in collaboration between [Ayrat Badykov](https://www.badykov.com/) ([github](https://github.com/ayrat555)) and [Pepe Márquez Romero](https://pxp9.github.io/) ([github](https://github.com/pxp9)) - diff --git a/docs/content/blog/2022-09-08-fang-09-release.md b/docs/content/blog/2022-09-08-fang-09-release.md deleted file mode 100644 index 023e199..0000000 --- a/docs/content/blog/2022-09-08-fang-09-release.md +++ /dev/null @@ -1,163 +0,0 @@ -+++ -title = "Fang 0.9" -description = "What's new with the release of fang 0.9" -date = 2022-09-09T16:45:22+00:00 -updated = 2022-09-09T16:45:22+00:00 -template = "blog/page.html" -sort_by = "weight" -weight = 1 -draft = false - -[taxonomies] -authors = ["Pepe Márquez", "Ayrat Badykov"] - -[extra] -lead = "What's new with the release of fang 0.9" -images = [] -+++ - -## Major changes - -- Simplify the database schema -- Improve the way tasks are scheduled -- Add CRON tasks support to both modules (asynk and blocking) -- Update the diesel crate to 2.0 (used only by blocking module) -- Major refactoring of the blocking module - -### Simplify the DB schema - -We got rid of the periodic tasks table. Now periodic, scheduled and one-time tasks are stored in the same table (`fang_tasks`). - -We added two new fields to the `fang_tasks` table - -- `scheduled_at` - based on this table tasks are scheduled. Workers fetch tasks with `scheduled_at` <= `current_time` -- `uniq_hash` - hash calculated from the JSON metadata of the task. Based on this field tasks are deduplicated. - -So changed schema is looking like this: - -```sql -CREATE TABLE fang_tasks ( - id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), - metadata jsonb NOT NULL, - error_message TEXT, - state fang_task_state DEFAULT 'new' NOT NULL, - task_type VARCHAR DEFAULT 'common' NOT NULL, - uniq_hash CHAR(64), - scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() -); -``` - -### Schedule tasks - -Let's examine how periodic tasks can be created with fang 0.9. - -The only method that should be defined is the `cron` method in the `Runnable`(blocking)/`AsyncRunnable`(asynk) trait implementation. - -Let's take a look at an example: - -```rust -impl AsyncRunnable for MyCronTask { - async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> { - log::info!("CRON!!!!!!!!!!!!!!!",); - - Ok(()) - } - - // you must use fang::Scheduled enum. - fn cron(&self) -> Option { - // cron expression to execute a task every 20 seconds. - let expression = "0/20 * * * * * *"; - Some(Scheduled::CronPattern(expression.to_string())) - } - - fn uniq(&self) -> bool { - true - } -} -``` - -Also, it is possible to schedule a task only once. - -```rust -impl AsyncRunnable for MyCronTask { - async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> { - log::info!("CRON!!!!!!!!!!!!!!!",); - - Ok(()) - } - - // you must use fang::Scheduled enum. - fn cron(&self) -> Option { - // You must use DateTime to specify - // when in the future you would like schedule the task. - - // This will schedule the task for within 7 seconds. - Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(7i64))) - } - - fn uniq(&self) -> bool { - true - } -} -``` - -More examples are available at [fang examples](https://github.com/ayrat555/fang/tree/master/fang_examples) - -It is no longer needed to start the scheduler process, the scheduled tasks will be executed by `WorkerPool` or `AsyncWorkerPool`. If a task is periodic, it will be re-scheduled before its next execution. - -### Blocking refactor - -- We deleted the graceful shutdown feature of the blocking module. But we're planning to re-implement it in the future. - -- We completely changed most of the blocking module's API. - -The reason for this change is to unify the APIs of the blocking and the asynk modules. So users can easily switch between blocking and async workers. - -Another reason is we wanted to do a trait for the task queue in the blocking module. It opens a possibility to implement new backends for the blocking module. - -A new API of the blocking queues looks like this: - -```rust -pub trait Queueable { - fn fetch_and_touch_task(&self, task_type: String) -> Result, QueueError>; - - fn insert_task(&self, params: &dyn Runnable) -> Result; - - fn remove_all_tasks(&self) -> Result; - - fn remove_all_scheduled_tasks(&self) -> Result; - - fn remove_tasks_of_type(&self, task_type: &str) -> Result; - - fn remove_task(&self, id: Uuid) -> Result; - - fn find_task_by_id(&self, id: Uuid) -> Option; - - fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result; - - fn fail_task(&self, task: &Task, error: String) -> Result; - - fn schedule_task(&self, task: &dyn Runnable) -> Result; -} -``` - -- Another change we want to highlight is that we updated Diesel to 2.0 (used only in the blocking module to interact with the DB) - -Pre 0.9 release was tested in real projects: - -- [el_monitorro](https://github.com/ayrat555/el_monitorro/) -- [weather_bot_rust](https://github.com/pxp9/weather_bot_rust/). - -## Future directions - -- Retries with different backoff modes -- Additional backends (in-memory, redis) -- Graceful shutdown for both modules - -## Conclusion - -The project is available on [GitHub](https://github.com/ayrat555/fang) - -The new release and this post is written in collaboration between [Pepe Márquez Romero](https://pxp9.github.io/) ([github](https://github.com/pxp9)) and [Ayrat Badykov](https://www.badykov.com/) ([github](https://github.com/ayrat555)). diff --git a/docs/content/blog/_index.md b/docs/content/blog/_index.md deleted file mode 100644 index 95ecd8e..0000000 --- a/docs/content/blog/_index.md +++ /dev/null @@ -1,7 +0,0 @@ -+++ -title = "Blog" -description = "Blog" -sort_by = "date" -paginate_by = 2 -template = "blog/section.html" -+++ diff --git a/docs/content/docs/_index.md b/docs/content/docs/_index.md deleted file mode 100644 index e292660..0000000 --- a/docs/content/docs/_index.md +++ /dev/null @@ -1,10 +0,0 @@ -+++ -title = "Docs" -description = "The documentation of Fang library." -date = 2022-09-06T08:00:00.00Z -updated = 2022-09-06T08:00:00.00Z -template = "docs/section.html" -sort_by = "weight" -weight = 4 -draft = false -+++ diff --git a/docs/deploy.sh b/docs/deploy.sh deleted file mode 100755 index dbc439b..0000000 --- a/docs/deploy.sh +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env bash -set -e - -BRANCH="gh-pages" - -build() { - echo "Starting building..." - - TIME=$(date +"%Y-%m-%dT%H:%M:%S.00Z") - - printf "+++\ntitle = \"CHANGELOG\"\ndate = $TIME\nupdated = $TIME\ndraft = false\nweight = 410\nsort_by = \"weight\"\ntemplate = \"docs/page.html\"\n\n[extra]\ntoc = true\ntop = false\n+++\n\n" > docs/content/docs/CHANGELOG.md - - cat CHANGELOG.md >> docs/content/docs/CHANGELOG.md - - printf "+++\ntitle = \"README\"\ndate = $TIME\nupdated = $TIME\ndraft = false\nweight = 410\nsort_by = \"weight\"\ntemplate = \"docs/page.html\"\n\n[extra]\ntoc = true\ntop = false\n+++\n\n" > docs/content/docs/README.md - - cat README.md >> docs/content/docs/README.md - - - cp -R docs ../docs_backup - rm -r * - cp -R ../docs_backup ./docs - cd docs - - sudo snap install --edge zola - zola build - mv public /tmp/public - cd .. -} - -deploy() { - echo "Starting deploying..." - git config --global url."https://".insteadOf git:// - git config --global url."https://github.com/".insteadOf git@github.com: - - git checkout ${BRANCH} - cp -vr /tmp/public/* . - git config user.name "GitHub Actions" - git config user.email "github-actions-bot@users.noreply.github.com" - - rm -r docs/themes - git add . - git commit -m "Deploy new version docs" - git push --force "https://${GITHUB_TOKEN}@github.com/${GITHUB_REPOSITORY}.git" ${BRANCH} - - echo "Deploy complete" -} diff --git a/docs/static/favicon.ico b/docs/static/favicon.ico deleted file mode 100644 index 41957b4..0000000 Binary files a/docs/static/favicon.ico and /dev/null differ diff --git a/docs/static/logo.png b/docs/static/logo.png deleted file mode 100644 index d22a761..0000000 Binary files a/docs/static/logo.png and /dev/null differ diff --git a/docs/themes/adidoks b/docs/themes/adidoks deleted file mode 160000 index 5c69827..0000000 --- a/docs/themes/adidoks +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 5c698271c460046034605b743a15196b12e32887 diff --git a/examples/simple_worker/src/lib.rs b/examples/simple_worker/src/lib.rs index bbac6d5..ee847f0 100644 --- a/examples/simple_worker/src/lib.rs +++ b/examples/simple_worker/src/lib.rs @@ -1,10 +1,7 @@ -use std::convert::Infallible; -use fang::queue::AsyncQueueable; -use fang::runnable::AsyncRunnable; -use fang::errors::FrangoError; use std::time::Duration; use async_trait::async_trait; use serde::{Serialize, Deserialize}; +use backie::{RunnableTask, Queueable}; #[derive(Serialize, Deserialize)] pub struct MyTask { @@ -30,8 +27,8 @@ impl MyFailingTask { #[async_trait] #[typetag::serde] -impl AsyncRunnable for MyTask { - async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Infallible> { +impl RunnableTask for MyTask { + async fn run(&self, _queue: &mut dyn Queueable) -> Result<(), Box> { // let new_task = MyTask::new(self.number + 1); // queue // .insert_task(&new_task as &dyn AsyncRunnable) @@ -41,14 +38,15 @@ impl AsyncRunnable for MyTask { log::info!("the current number is {}", self.number); tokio::time::sleep(Duration::from_secs(3)).await; + log::info!("done.."); Ok(()) } } #[async_trait] #[typetag::serde] -impl AsyncRunnable for MyFailingTask { - async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { +impl RunnableTask for MyFailingTask { + async fn run(&self, _queue: &mut dyn Queueable) -> Result<(), Box> { // let new_task = MyFailingTask::new(self.number + 1); // queue // .insert_task(&new_task as &dyn AsyncRunnable) diff --git a/examples/simple_worker/src/main.rs b/examples/simple_worker/src/main.rs index cf4373d..ed207aa 100644 --- a/examples/simple_worker/src/main.rs +++ b/examples/simple_worker/src/main.rs @@ -1,18 +1,15 @@ -use fang::queue::PgAsyncQueue; -use fang::queue::AsyncQueueable; -use fang::worker_pool::AsyncWorkerPool; -use fang::runnable::AsyncRunnable; -use simple_async_worker::MyFailingTask; -use simple_async_worker::MyTask; +use simple_worker::MyFailingTask; +use simple_worker::MyTask; use std::time::Duration; use diesel_async::pg::AsyncPgConnection; use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; +use backie::{PgAsyncQueue, WorkerPool, Queueable}; #[tokio::main] async fn main() { env_logger::init(); - let connection_url = "postgres://postgres:password@localhost/fang"; + let connection_url = "postgres://postgres:password@localhost/backie"; log::info!("Starting..."); let max_pool_size: u32 = 3; @@ -23,41 +20,51 @@ async fn main() { .build(manager) .await .unwrap(); - - let mut queue = PgAsyncQueue::builder() - .pool(pool) - .build(); - - log::info!("Queue connected..."); - - let mut workers_pool: AsyncWorkerPool = AsyncWorkerPool::builder() - .number_of_workers(10_u32) - .queue(queue.clone()) - .build(); - log::info!("Pool created ..."); - workers_pool.start().await; - log::info!("Workers started ..."); + let mut queue = PgAsyncQueue::new(pool); + + let (tx, mut rx) = tokio::sync::watch::channel(false); + + let executor_task = tokio::spawn({ + let mut queue = queue.clone(); + async move { + let mut workers_pool: WorkerPool = WorkerPool::builder() + .number_of_workers(10_u32) + .queue(queue) + .build(); + + log::info!("Workers starting ..."); + workers_pool.start(async move { + rx.changed().await; + }).await; + log::info!("Workers stopped!"); + } + }); let task1 = MyTask::new(0); let task2 = MyTask::new(20_000); let task3 = MyFailingTask::new(50_000); queue - .insert_task(&task1 as &dyn AsyncRunnable) + .create_task(&task1) .await .unwrap(); queue - .insert_task(&task2 as &dyn AsyncRunnable) + .create_task(&task2) .await .unwrap(); queue - .insert_task(&task3 as &dyn AsyncRunnable) + .create_task(&task3) .await .unwrap(); - tokio::time::sleep(Duration::from_secs(100)).await; + log::info!("Tasks created ..."); + tokio::signal::ctrl_c().await; + log::info!("Stopping ..."); + tx.send(true).unwrap(); + executor_task.await.unwrap(); + log::info!("Stopped!"); } diff --git a/src/errors.rs b/src/errors.rs index c472d12..db2fcf1 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -3,33 +3,25 @@ use std::fmt::Display; use thiserror::Error; /// Library errors -#[derive(Debug, Clone, Error)] -pub struct BackieError { - /// A description of an error - pub description: String, +#[derive(Debug, Error)] +pub enum BackieError { + QueueProcessingError(#[from] AsyncQueueError), + SerializationError(#[from] SerdeError), + ShutdownError(#[from] tokio::sync::watch::error::SendError<()>), } impl Display for BackieError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.description) - } -} - -impl From for BackieError { - fn from(error: AsyncQueueError) -> Self { - let message = format!("{error:?}"); - BackieError { - description: message, + match self { + BackieError::QueueProcessingError(error) => { + write!(f, "Queue processing error: {}", error) + } + BackieError::SerializationError(error) => write!(f, "Serialization error: {}", error), + BackieError::ShutdownError(error) => write!(f, "Shutdown error: {}", error), } } } -impl From for BackieError { - fn from(error: SerdeError) -> Self { - Self::from(AsyncQueueError::SerdeError(error)) - } -} - /// List of error types that can occur while working with cron schedules. #[derive(Debug, Error)] pub enum CronError { diff --git a/src/lib.rs b/src/lib.rs index d1cb7e6..e4958b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,11 +38,16 @@ impl Default for RetentionMode { } } +pub use queue::PgAsyncQueue; +pub use queue::Queueable; +pub use runnable::RunnableTask; +pub use worker_pool::WorkerPool; + pub mod errors; mod queries; pub mod queue; pub mod runnable; -pub mod schema; +mod schema; pub mod task; pub mod worker; pub mod worker_pool; diff --git a/src/queue.rs b/src/queue.rs index 58710c5..8455716 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -360,7 +360,7 @@ mod async_queue_tests { #[tokio::test] async fn remove_all_tasks_test() { let pool = pool().await; - let mut test = PgAsyncQueue::new(pool.into()); + let mut test = PgAsyncQueue::new(pool); let task = test.create_task(&AsyncTask { number: 1 }).await.unwrap(); @@ -560,7 +560,7 @@ mod async_queue_tests { async fn pool() -> Pool { let manager = AsyncDieselConnectionManager::::new( - "postgres://postgres:password@localhost/backie", + option_env!("DATABASE_URL").expect("DATABASE_URL must be set"), ); Pool::builder() .max_size(1) diff --git a/src/runnable.rs b/src/runnable.rs index dde232d..e2bfb1d 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -5,7 +5,7 @@ use crate::Scheduled; use async_trait::async_trait; use std::error::Error; -pub const RETRIES_NUMBER: i32 = 20; +pub const RETRIES_NUMBER: i32 = 5; /// Task that can be executed by the queue. /// diff --git a/src/worker.rs b/src/worker.rs index 95c248e..ef8ca47 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -4,13 +4,14 @@ use crate::runnable::RunnableTask; use crate::task::{Task, TaskType}; use crate::RetentionMode; use crate::Scheduled::*; -use log::error; +use futures::future::FutureExt; +use futures::select; use std::error::Error; use typed_builder::TypedBuilder; /// it executes tasks only of task_type type, it sleeps when there are no tasks in the queue #[derive(TypedBuilder)] -pub struct AsyncWorker +pub struct Worker where Q: Queueable + Clone + Sync + 'static, { @@ -22,22 +23,12 @@ where #[builder(default, setter(into))] pub retention_mode: RetentionMode, + + #[builder(default, setter(into))] + pub shutdown: Option>, } -// impl AsyncWorkerBuilder -// where -// TypedBuilderFields: Clone, -// Q: Queueable + Clone + Sync + 'static, -// { -// pub fn with_graceful_shutdown(self, signal: F) -> Self -// where -// F: Future, -// { -// self -// } -// } - -impl AsyncWorker +impl Worker where Q: Queueable + Clone + Sync + 'static, { @@ -111,25 +102,12 @@ where Ok(()) } - async fn wait(&mut self) { - // TODO: add a way to stop the worker - // Listen to postgres pubsub notification - // Listen to watchable future - // All that until a max timeout - // - // select! { - // _ = self.queue.wait_for_task(Some(self.task_type.clone())).fuse() => {}, - // _ = SleepParams::default().sleep().fuse() => {}, - // } - } - - pub(crate) async fn run_tasks(&mut self) { + pub(crate) async fn run_tasks(&mut self) -> Result<(), BackieError> { loop { - // TODO: check if should stop the worker - match self.queue.pull_next_task(self.task_type.clone()).await { - Ok(Some(task)) => { + match self.queue.pull_next_task(self.task_type.clone()).await? { + Some(task) => { let actual_task: Box = - serde_json::from_value(task.payload.clone()).unwrap(); + serde_json::from_value(task.payload.clone())?; // check if task is scheduled or not if let Some(CronPattern(_)) = actual_task.cron() { @@ -140,13 +118,25 @@ where // TODO: what do we do if the task fails? it's an internal error, inform the logs let _ = self.run(task, actual_task).await; } - Ok(None) => { - self.wait().await; - } - - Err(error) => { - error!("Failed to fetch a task {:?}", error); - self.wait().await; + None => { + // Listen to watchable future + // All that until a max timeout + match &mut self.shutdown { + Some(recv) => { + // Listen to watchable future + // All that until a max timeout + select! { + _ = recv.changed().fuse() => { + log::info!("Shutting down worker"); + return Ok(()); + } + _ = tokio::time::sleep(std::time::Duration::from_secs(1)).fuse() => {} + } + } + None => { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }; } }; } @@ -155,8 +145,8 @@ where #[cfg(test)] pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> { loop { - match self.queue.pull_next_task(self.task_type.clone()).await { - Ok(Some(task)) => { + match self.queue.pull_next_task(self.task_type.clone()).await? { + Some(task) => { let actual_task: Box = serde_json::from_value(task.payload.clone()).unwrap(); @@ -165,18 +155,12 @@ where // program task // self.queue.schedule_task(&*actual_task).await?; } - self.wait().await; // run scheduled task self.run(task, actual_task).await?; } - Ok(None) => { + None => { return Ok(()); } - Err(error) => { - error!("Failed to fetch a task {:?}", error); - - self.wait().await; - } }; } } @@ -184,8 +168,8 @@ where #[cfg(test)] mod async_worker_tests { + use std::fmt::Display; use super::*; - use crate::errors::BackieError; use crate::queue::PgAsyncQueue; use crate::queue::Queueable; use crate::task::TaskState; @@ -198,6 +182,22 @@ mod async_worker_tests { use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::AsyncPgConnection; use serde::{Deserialize, Serialize}; + use thiserror::Error; + + #[derive(Error, Debug)] + enum TaskError { + SomethingWrong, + Custom(String), + } + + impl Display for TaskError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TaskError::SomethingWrong => write!(f, "Something went wrong"), + TaskError::Custom(message) => write!(f, "{}", message), + } + } + } #[derive(Serialize, Deserialize)] struct WorkerAsyncTask { @@ -248,9 +248,7 @@ mod async_worker_tests { ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { let message = format!("number {} is wrong :(", self.number); - Err(Box::new(BackieError { - description: message, - })) + Err(Box::new(TaskError::Custom(message))) } fn max_retries(&self) -> i32 { @@ -268,11 +266,7 @@ mod async_worker_tests { &self, _queueable: &mut dyn Queueable, ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { - let message = "Failed".to_string(); - - Err(Box::new(BackieError { - description: message, - })) + Err(Box::new(TaskError::SomethingWrong)) } fn max_retries(&self) -> i32 { @@ -326,7 +320,7 @@ mod async_worker_tests { let task = insert_task(&mut test, &actual_task).await; let id = task.id; - let mut worker = AsyncWorker::::builder() + let mut worker = Worker::::builder() .queue(test.clone()) .retention_mode(RetentionMode::KeepAll) .build(); @@ -384,7 +378,7 @@ mod async_worker_tests { let id = task.id; - let mut worker = AsyncWorker::::builder() + let mut worker = Worker::::builder() .queue(test.clone()) .retention_mode(RetentionMode::KeepAll) .build(); @@ -413,11 +407,35 @@ mod async_worker_tests { let task = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task.id); assert_eq!(TaskState::Failed, task.state()); - assert_eq!("Failed".to_string(), task.error_message.unwrap()); + assert_eq!("Something went wrong".to_string(), task.error_message.unwrap()); test.remove_all_tasks().await.unwrap(); } + #[tokio::test] + async fn worker_shutsdown_when_notified() { + let pool = pool().await; + let queue = PgAsyncQueue::new(pool); + + let (tx, rx) = tokio::sync::watch::channel(()); + + let mut worker = Worker::::builder() + .queue(queue) + .shutdown(rx) + .build(); + + let handle = tokio::spawn(async move { + worker.run_tasks().await.unwrap(); + true + }); + + tx.send(()).unwrap(); + select! { + _ = handle.fuse() => {} + _ = tokio::time::sleep(core::time::Duration::from_secs(1)).fuse() => panic!("Worker did not shutdown") + } + } + #[tokio::test] async fn saves_error_for_failed_task() { let pool = pool().await; @@ -428,7 +446,7 @@ mod async_worker_tests { let task = insert_task(&mut test, &failed_task).await; let id = task.id; - let mut worker = AsyncWorker::::builder() + let mut worker = Worker::::builder() .queue(test.clone()) .retention_mode(RetentionMode::KeepAll) .build(); @@ -459,7 +477,7 @@ mod async_worker_tests { let id12 = task12.id; let id2 = task2.id; - let mut worker = AsyncWorker::::builder() + let mut worker = Worker::::builder() .queue(test.clone()) .task_type(TaskType::from("type1")) .retention_mode(RetentionMode::KeepAll) @@ -493,7 +511,7 @@ mod async_worker_tests { let _id12 = task12.id; let id2 = task2.id; - let mut worker = AsyncWorker::::builder() + let mut worker = Worker::::builder() .queue(test.clone()) .task_type(TaskType::from("type1")) .build(); @@ -521,7 +539,7 @@ mod async_worker_tests { async fn pool() -> Pool { let manager = AsyncDieselConnectionManager::::new( - "postgres://postgres:password@localhost/backie", + option_env!("DATABASE_URL").expect("DATABASE_URL must be set"), ); Pool::builder() .max_size(1) diff --git a/src/worker_pool.rs b/src/worker_pool.rs index c5f5407..63a0874 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -1,13 +1,16 @@ +use crate::errors::BackieError; use crate::queue::Queueable; use crate::task::TaskType; -use crate::worker::AsyncWorker; +use crate::worker::Worker; use crate::RetentionMode; use async_recursion::async_recursion; use log::error; +use std::future::Future; +use tokio::sync::watch::Receiver; use typed_builder::TypedBuilder; #[derive(TypedBuilder, Clone)] -pub struct AsyncWorkerPool +pub struct WorkerPool where AQueue: Queueable + Clone + Sync + 'static, { @@ -24,34 +27,65 @@ where pub number_of_workers: u32, /// The type of tasks that will be executed by `AsyncWorkerPool`. - #[builder(default=None, setter(into))] + #[builder(default, setter(into))] pub task_type: Option, } -impl AsyncWorkerPool +// impl AsyncWorkerBuilder +// where +// TypedBuilderFields: Clone, +// Q: Queueable + Clone + Sync + 'static, +// { +// pub fn with_graceful_shutdown(self, signal: F) -> Self +// where +// F: Future, +// { +// self +// } +// } + +impl WorkerPool where AQueue: Queueable + Clone + Sync + 'static, { /// Starts the configured number of workers /// This is necessary in order to execute tasks. - pub async fn start(&mut self) { + pub async fn start(&mut self, graceful_shutdown: F) -> Result<(), BackieError> + where + F: Future + Send + 'static, + { + let (tx, rx) = tokio::sync::watch::channel(()); for idx in 0..self.number_of_workers { let pool = self.clone(); - tokio::spawn(Self::supervise_task(pool, 0, idx)); + // TODO: the worker pool keeps track of the number of workers and spawns new workers as needed. + // There should be always a minimum number of workers active waiting for tasks to execute + // or for a gracefull shutdown. + tokio::spawn(Self::supervise_task(pool, rx.clone(), 0, idx)); } + graceful_shutdown.await; + tx.send(())?; + log::info!("Worker pool stopped gracefully"); + Ok(()) } #[async_recursion] - async fn supervise_task(pool: AsyncWorkerPool, restarts: u64, worker_number: u32) { + async fn supervise_task( + pool: WorkerPool, + receiver: Receiver<()>, + restarts: u64, + worker_number: u32, + ) { let restarts = restarts + 1; let inner_pool = pool.clone(); + let inner_receiver = receiver.clone(); let join_handle = tokio::spawn(async move { - let mut worker: AsyncWorker = AsyncWorker::builder() + let mut worker: Worker = Worker::builder() .queue(inner_pool.queue.clone()) .retention_mode(inner_pool.retention_mode) .task_type(inner_pool.task_type.clone()) + .shutdown(inner_receiver) .build(); worker.run_tasks().await @@ -62,7 +96,7 @@ where "Worker {} stopped. Restarting. the number of restarts {}", worker_number, restarts, ); - Self::supervise_task(pool, restarts, worker_number).await; + Self::supervise_task(pool, receiver, restarts, worker_number).await; } } }