RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry.
From the RQ docs:

> A job is a Python object, representing a function that is invoked asynchronously in a worker (background) process. Any Python function can be invoked asynchronously, by simply pushing a reference to the function and its arguments onto a queue. This is called enqueueing.
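For orientation, here is a minimal sketch of enqueueing using RQ's own API. The queue name and function are made up for illustration; in this codebase jobs are normally enqueued through `job_scheduler`, described below.

```python
# Minimal sketch of enqueueing with RQ's own API (illustrative only; in this
# project jobs are normally enqueued through job_scheduler, described below).
from redis import Redis
from rq import Queue


def hello_world():
    print('Hello from a worker')


queue = Queue('short-running', connection=Redis())  # queue name is illustrative
job = queue.enqueue(hello_world)                    # a worker picks this up asynchronously
print(job.id)                                       # the job id can be used to look the job up later
```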
- Workers
  - These are services (when running on DBT Platform) or containers (when running locally on Docker). They loop through their assigned queues looking for jobs to process. They are started from the scripts `long-running-worker.py` and `short-running-worker.py`.
- Queues
  - These are just like real life queues, identified by unique strings (e.g. `long-running`). Jobs can be 'enqueued' on them, to be run asynchronously in order.
- Jobs
  - A job represents a function to be called by the worker.
- Job_scheduler
  - This is the main interface for enqueueing jobs. Based on the arguments, it will call either DataHubScheduler's `enqueue` or `cron` functions.
- DataHubScheduler
  - This is the class that ensures we start workers and enqueue or schedule jobs in a consistent way.
- Rate limit library
  - A Python Redis rate limit library that helps enforce a set rate, to reduce or throttle load; this is especially useful with third party APIs that enforce rate limits. (A rough sketch of the idea follows this list.)
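The exact rate limiting helper lives in the codebase; as a rough, hypothetical illustration of the fixed-window idea using plain redis-py (the key name, limit and window below are assumptions, not the project's actual settings):

```python
# Hypothetical fixed-window rate limit using plain redis-py; the key, limit and
# window below are assumptions for illustration, not the project's actual values.
from redis import Redis

redis_client = Redis()


def within_rate_limit(resource, max_requests=10, window_seconds=1):
    """Return True if another call to `resource` is allowed in the current window."""
    key = f'rate-limit:{resource}'
    current = redis_client.incr(key)              # count this call
    if current == 1:
        redis_client.expire(key, window_seconds)  # start the window on the first call
    return current <= max_requests


if within_rate_limit('third-party-api'):
    ...  # safe to call the third party API
```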
First decide if it's a one off job, or if you want it to repeat regularly.
One off job (scheduled to run when it reaches the front of a queue):

- Call `job_scheduler()` with the function you want to be called, e.g. `job_scheduler(hello_world)`; some more examples are in the tests. You can configure args and kwargs for the function, override the default queue, set intervals etc. A sketch follows this list.
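As a rough sketch, assuming `job_scheduler` accepts keyword arguments along these lines (the names `function_args`, `function_kwargs` and `queue_name` are assumptions; check the actual signature in the codebase):

```python
# Rough sketch of a one off job; keyword argument names other than the function
# itself are assumptions based on the description above, not a verified signature.
from datahub.core.queues.job_scheduler import job_scheduler  # assumed import path


def send_welcome_email(recipient_email, subject='Welcome'):
    ...


job_scheduler(
    send_welcome_email,                      # the function the worker will call
    function_args=('person@example.com',),   # assumed name for positional args
    function_kwargs={'subject': 'Welcome'},  # assumed name for keyword args
    queue_name='short-running',              # assumed name for overriding the default queue
)
```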
Cron job (repeat the job at set intervals/times):

- The same as the one off job above, but pass in a `cron` value and the job will repeat accordingly. There are constants for some often used cron values. Cron tasks listed in `./cron-scheduler.py` are automatically scheduled when the service (re)starts. Note: cannot be used if `time_delta` is defined. A sketch follows this list.
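A rough sketch, assuming the same `job_scheduler` interface as above; the cron expression and import path are illustrative assumptions:

```python
# Rough sketch of a repeating (cron) job; the cron expression and import path
# are illustrative assumptions.
from datahub.core.queues.job_scheduler import job_scheduler  # assumed import path


def refresh_cache():
    ...


job_scheduler(
    refresh_cache,
    cron='0 8 * * *',  # every day at 08:00; cannot be combined with time_delta
)
```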
Time delta (schedules the job to run X seconds/minutes/hours/days/weeks later):

- The same as the one off job above, but pass in a `time_delta` value. Note: cannot be used if `cron` is defined. A sketch follows this list.
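A rough sketch, again assuming the same `job_scheduler` interface; only the `time_delta` argument is grounded in the description above, the import path is an assumption:

```python
# Rough sketch of a delayed job; the import path is an assumption.
from datetime import timedelta

from datahub.core.queues.job_scheduler import job_scheduler  # assumed import path


def follow_up():
    ...


job_scheduler(
    follow_up,
    time_delta=timedelta(hours=2),  # run roughly two hours from now; cannot be combined with cron
)
```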
When running tests, jobs are scheduled but not executed (the workers do not automatically run during tests). If you want a test to automatically run the scheduled jobs, include the `async_queue` fixture:
```python
def test_stores_notification_id(
    self,
    async_queue,
):
    # Code under test that calls job_scheduler will be executed.
    ...
```
Additional examples are in `datahub/reminder/test/test_task.py` (`TestGenerateNoRecentInteractionReminderTask::test_stores_notification_id`), and in various occurrences of `async_queue` in `datahub/core/test/queues/test_scheduler.py`.
Running the RQ monitoring stack locally lets you see statistics like failures, successes, average duration and when most jobs were executed, as well as the queues defined for RQ.
The core service for exposing RQ information is the RQ exporter. It runs on localhost:9726 and acts as the data source for RQ monitoring. The environment variable configuration can be overridden; see Environment variable configuration for more information. The data source is set up through Prometheus.
NOTE: If you have a new or flushed Docker system, make sure you run `docker-compose -f docker-compose.yml -f docker-compose-rq-monitor.yml up --build` to build all the images, or simply run `docker compose up --build` to build the core dependencies.
- Starting all the services: `make start-rq-mon`
- Stopping all the services: `make stop-rq-mon`
- Jobs are being queued but not started: the service to work those queues has not been started, or a started queue is blocking it from being started.
- Jobs are failing: failed jobs stay failed for a long duration for the purpose of flagging issues. You may need to write code to requeue failed jobs, realigning jobs that can pass under different circumstances and removing jobs that will never pass. (One way to requeue is sketched below, after this list.)
- Scheduled jobs are being repeated/run too many times: cron jobs need to be cancelled or they will run forever, even if the app has been restarted or redeployed. Hence the `cancel_existing_cron_jobs` call in `cron-scheduler`. NOTE: this boot up file is also used to configure all the cron jobs that need configuration.
- Check services are running: check the Docker dashboard to make sure all services are running as expected when jobs are set, and if any jobs are controlled by environment variables, make sure those variables are enabled so the job gets scheduled.
- Grafana, locally on localhost:4000 under RQ Dashboard, can help with viewing the queues and statistics for jobs run or scheduled by RQ. Queues that get run will either pass or fail. NOTE: time to live expires data in Redis, so the statistics can seem off over longer periods, but failures persist for a long duration.
- RQ monitoring is a dashboard, locally on http://localhost:9181/, that shows the same details at a more granular level, including the ability to purge jobs per queue. Navigate to jobs for clearing and visualising the data that is transmitted, and to workers to see worker information. Note: schedule information is not available and cron scheduler data is not visible either; these can only be seen in Redis.
- Dev tools for things like checking queues and rate limiting are available through a management command; see its help for more details: `python manage.py test_rq --generated_jobs 100000`
- Purging queues is sometimes necessary when developing jobs, when you realise that something is wrong or unintentional and a job is generating many jobs that will always fail. Locally this can be done through the RQ monitoring dashboard above. For production, there is a Django management command for purging queues; see the `purge_queue` help for more details: `python manage.py purge_queue long-running --queue_state queued`
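For the "jobs are failing" case above, one possible way to requeue failed jobs is via RQ's `FailedJobRegistry`. This is a hedged sketch: the queue name and connection are assumptions, and jobs that will never pass should be removed rather than requeued.

```python
# Hedged sketch of requeueing failed jobs with RQ's FailedJobRegistry; the queue
# name and connection details are assumptions.
from redis import Redis
from rq import Queue
from rq.registry import FailedJobRegistry

queue = Queue('long-running', connection=Redis())
registry = FailedJobRegistry(queue=queue)

for job_id in registry.get_job_ids():
    registry.requeue(job_id)  # moves the job back onto the queue to be retried
```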
ELK is the easiest way to trace and monitor problems, by correlation IDs or job IDs, so be sure to add logging for traceability. NOTE: when logging a large stream of data, some data can be lost because of stream restrictions, so don't be alarmed if your data is missing; look instead at the result of what you were processing based on a summary of expectations logged.
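As a small, hypothetical illustration of making a job traceable in ELK by logging its job ID (`get_current_job` is part of RQ's public API; the function name is made up):

```python
# Hypothetical sketch of logging the current job id for traceability in ELK.
import logging

from rq import get_current_job

logger = logging.getLogger(__name__)


def sync_company_records():
    job = get_current_job()  # None when the function is called outside a worker
    job_id = job.id if job else 'no-rq-job'
    logger.info('sync_company_records started (job id: %s)', job_id)
    ...
```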
Sometimes you need to calculate intervals for retrying at irregular times. There are several ways of doing this: you can configure `job_scheduler` with `retry_backoff=True` or a number such as `60`; there is an `exponential_backoff` function to help calculate intervals per function; and there is a website, exponentialbackoffcalculator.com, that can help you work out intervals for different exponential values, which you can then pass to `retry_intervals` on `job_scheduler`. A sketch follows.
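A hedged sketch of the two retry styles mentioned above; `retry_backoff` and `retry_intervals` come from the description, while the import path is an assumption:

```python
# Hedged sketch of retry configuration; retry_backoff and retry_intervals are the
# names mentioned above, the import path is an assumption.
from datahub.core.queues.job_scheduler import job_scheduler  # assumed import path


def call_third_party_api():
    ...


# Let the scheduler back off for you: True, or a number for the initial delay in seconds.
job_scheduler(call_third_party_api, retry_backoff=60)

# Or spell out the intervals (in seconds) between retries explicitly.
job_scheduler(call_third_party_api, retry_intervals=[60, 120, 240, 480])
```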
The cron expression is made up of five fields (minute, hour, day of month, month, day of week). crontab.cronhub.io is a great way to visualise the settings you create; the illustration below shows what each field controls.
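As a quick reference, a sketch of what the five fields mean; the expression is an example, not a value used by the project:

```python
# Illustration of the five cron fields; the expression below is an example only.
#
#  ┌───────── minute (0-59)
#  │ ┌─────── hour (0-23)
#  │ │ ┌───── day of month (1-31)
#  │ │ │ ┌─── month (1-12)
#  │ │ │ │ ┌─ day of week (0-6, Sunday = 0)
#  │ │ │ │ │
EXAMPLE_CRON = '0 8 * * 1-5'  # 08:00 every Monday to Friday
```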
If the production, staging or UAT queue has thousands of messages and the workers are struggling to process them, you can reboot the Redis Cache on the AWS console. To restart it, refer to the documentation on Confluence.
⚠️ Rebooting the Redis Cache will delete the whole queue, so anything queued such as emails and OpenSearch changes will not be run.
⚠️ Services (workers, queues, etc) must be restarted after rebooting the Redis Cache for the application and environment you rebooted. If these are not restarted then scheduled tasks will not be picked up by the scheduler.