Dispatcherd

Dispatcherd is a service to run python tasks in subprocesses, designed specifically to work well with pg_notify but intended to be extensible to other message delivery mechanisms. Its philosophy is to keep a limited scope as a "local" runner of background tasks, while remaining composable so that applications using it can "wrap" it easily to enable clustering and distributed task management.

Warning

This project is in initial development. Expect many changes, including name, paths, and CLIs.

Licensed under Apache Software License 2.0

Usage

You have a postgres server configured and a python project. You will use dispatcherd to trigger a background task over pg_notify. Both your background dispatcherd service and your task publisher process must have python configured so that your task is importable. Instructions are broken into 3 steps:

  1. Library - Configure dispatcherd, mark the python methods you will run with it
  2. dispatcherd service - Start your background task service, it will start listening
  3. Publisher - From some other script, submit tasks to be run

In the "Manual Demos" section, a runnable example of this is given.

Library

The dispatcherd @task() decorator is used to register tasks. The tests/data/methods.py module defines some dispatcherd tasks.

The decorator accepts some kwargs (like queue below) that will affect task behavior, see docs/task_options.md. Using decorate=False tells it not to attach the deprecated Celery-like methods.

from dispatcherd.publish import task

@task(queue='test_channel', decorate=False)
def print_hello():
    print('hello world!!')
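For intuition, the registration pattern behind such a decorator can be sketched in plain Python. This is an illustrative sketch only, not dispatcherd's actual implementation; the `TASK_REGISTRY` name is hypothetical:

```python
# Illustrative sketch of a task-registration decorator pattern.
# NOTE: this is NOT dispatcherd's implementation; TASK_REGISTRY is hypothetical.
from typing import Callable, Optional

TASK_REGISTRY: dict[str, dict] = {}

def task(queue: Optional[str] = None, decorate: bool = True) -> Callable:
    def wrapper(fn: Callable) -> Callable:
        # Record the function and its routing options under its import path,
        # so a publisher can refer to the task by name.
        TASK_REGISTRY[f"{fn.__module__}.{fn.__qualname__}"] = {
            "fn": fn,
            "queue": queue,
            "decorate": decorate,
        }
        return fn  # the function itself is returned unchanged
    return wrapper

@task(queue='test_channel', decorate=False)
def print_hello():
    print('hello world!!')
```

Decorating a function this way leaves it callable as normal, while the registry captures the queue it should be routed to when dispatched.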

Configure dispatcherd somewhere in your import path, or before running the service. This tells dispatcherd how to submit tasks to be run.

from dispatcherd.config import setup

config = {
    "producers": {
        "brokers": {
            "pg_notify": {
                "conninfo": "dbname=postgres user=postgres",
                "channels": [
                    "test_channel",
                ],
            },
        },
    },
    "pool": {"max_workers": 4},
}
setup(config)

For more on how to set up dispatcherd and the allowed config options, see the config docs. The queue passed to @task needs to match a pg_notify channel in the config. It is often useful to have different workers listen to different sets of channels.
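Because a task whose queue is not among the configured channels publishes messages no service is listening for, a small sanity check can catch the mismatch early. This helper is a hypothetical illustration, not part of dispatcherd's API:

```python
# Hypothetical helper: verify every task queue has a matching pg_notify
# channel in the config. Not part of dispatcherd's API; shown only to
# illustrate the queue/channel matching constraint.
def missing_channels(config: dict, task_queues: list[str]) -> list[str]:
    channels = set(
        config["producers"]["brokers"]["pg_notify"].get("channels", [])
    )
    return [q for q in task_queues if q not in channels]

config = {
    "producers": {
        "brokers": {
            "pg_notify": {
                "conninfo": "dbname=postgres user=postgres",
                "channels": ["test_channel"],
            },
        },
    },
    "pool": {"max_workers": 4},
}

print(missing_channels(config, ["test_channel"]))   # → []
print(missing_channels(config, ["other_channel"]))  # → ['other_channel']
```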

dispatcherd service

The dispatcherd service needs to be running before you submit tasks. It makes no attempt at message durability or delivery confirmation; if you submit a task during an outage of the service, the task will be dropped.

There are 2 ways to run the dispatcherd service:

  • Importing and running (code snippet below)
  • A CLI entrypoint dispatcherd for demo purposes

from dispatcherd import run_service

# After the setup() method has been called

run_service()

The configuration tells the service how to connect to postgres and which channel(s) to listen to.

Publisher

This assumes you configured python so that print_hello is importable from the test_methods python module. The following code will submit print_hello to run in the background dispatcherd service.

from test_methods import print_hello

from dispatcherd.publish import submit_task

# After the setup() method has been called

submit_task(print_hello)

For more options related to publishing tasks see docs/submit_task.md.

Manual Demos

Service in foreground

Initial setup:

pip install -e .[pg_notify]

To experience running a dispatcherd service, you can try this:

make postgres
dispatcherd

Unless a config file is otherwise specified, the dispatcherd entrypoint looks for one in the current directory; in this case it finds dispatcher.yml. You can see it running some schedules and listening.

Ctrl+C to stop that server.

Two nodes in background

The following will start up postgres, then start up 2 dispatcherd services. It should take a few seconds, mainly waiting for postgres.

make demo

After it completes, docker ps -a should show dispatcherd1 and dispatcherd2 containers as well as postgres. You can see logs via docker logs dispatcherd1. These services will accept task submissions. Use the run_demo.py script as a python task publisher to submit many tasks at once. To get accurate replies, we need to specify that 2 replies are expected, because we are communicating with 2 background task services.

./run_demo.py 2

You can talk to these services over postgres with dispatcherctl, using the same local dispatcher.yml config.

dispatcherctl running
dispatcherctl workers

The "running" command will likely show scheduled tasks and leftover tasks from the demo. The uuid and task options allow filtering the output.

dispatcherctl running --task=tests.data.methods.sleep_function

This would show any specific instance of tests.data.methods.sleep_function currently running.

Running Tests

Most tests (except for tests/unit/) require postgres to be running.

pip install -r requirements_dev.txt
make postgres
pytest tests/
pytest asyncio_tests/

Background

This is intended to be a working space for prototyping a code split of:

https://github.com/ansible/awx/tree/devel/awx/main/dispatch

As a part of doing the split, we also want to resolve a number of long-standing design and sustainability issues; hence the move to asyncio. For a little more background, see docs/design_notes.md.

There is documentation of the message formats used by the dispatcherd in docs/message_formats.md. Some of these are internal, but some messages are what goes over the user-defined brokers (pg_notify). You can trigger tasks using your own "publisher" code as an alternative to attached methods like .apply_async. Doing this requires connecting to postgres and submitting a pg_notify message with JSON data that conforms to the expected format. The ./run_demo.py script shows examples of this, but borrows some connection and config utilities to help.
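As a sketch of what hand-rolled publishing involves: the real message schema is defined in docs/message_formats.md, so the `"task"` field below is only an assumption for illustration, and actually executing the statement requires a postgres connection (e.g. via psycopg), which is omitted here:

```python
import json

# Illustrative only: the real message schema is documented in
# docs/message_formats.md; the {"task": ...} payload shape here is an
# assumption made for the sake of the example.
def build_notify_sql(channel: str, task_path: str) -> str:
    payload = json.dumps({"task": task_path})
    # pg_notify(channel, payload) is the SQL-level equivalent of NOTIFY
    return f"SELECT pg_notify('{channel}', '{payload}');"

sql = build_notify_sql("test_channel", "tests.data.methods.sleep_function")
print(sql)
# Executing this statement on the configured postgres server would deliver
# the message to any dispatcherd service listening on test_channel.
```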

Contributing

We ask all of our community members and contributors to adhere to the Ansible code of conduct. If you have questions or need assistance, please reach out to our community team at codeofconduct@ansible.com.

Refer to the Contributing guide for further information.

Communication

See the Communication section of the Contributing guide to find out how to get help and contact us.

For more information about getting in touch, see the Ansible communication guide.

Credits

Dispatcherd is sponsored by Red Hat, Inc.

Metrics

You can run a demo of the metrics server. In your first terminal tab, run:

pip install .[pg_notify,metrics]
dispatcherd

In another tab run:

curl http://localhost:8070/metrics

This should report metrics in the following general format:

$ curl http://localhost:8070/metrics
# HELP dispatcher_messages_received_total Number of messages received by dispatchermain
# TYPE dispatcher_messages_received_total counter
dispatcher_messages_received_total 12.0
# HELP dispatcher_control_messages_count_total Number of control messages received.
# TYPE dispatcher_control_messages_count_total counter
dispatcher_control_messages_count_total 1.0
# HELP dispatcher_worker_count_total Number of workers running.
# TYPE dispatcher_worker_count_total counter
dispatcher_worker_count_total 2.0

We expect to add more metrics in the future.
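The Prometheus text exposition format shown above is simple to consume. A minimal stdlib parser, sufficient for plain unlabeled counters like the ones in the sample output, might look like this:

```python
# Minimal parser for the Prometheus text exposition format shown above.
# Only handles simple "name value" sample lines (no labels), which matches
# the unlabeled counters in the sample output.
def parse_metrics(text: str) -> dict[str, float]:
    metrics = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip HELP/TYPE comment lines and blank lines
        name, value = line.rsplit(" ", 1)
        metrics[name] = float(value)
    return metrics

sample = """\
# HELP dispatcher_messages_received_total Number of messages received
# TYPE dispatcher_messages_received_total counter
dispatcher_messages_received_total 12.0
dispatcher_worker_count_total 2.0
"""
print(parse_metrics(sample))
# → {'dispatcher_messages_received_total': 12.0, 'dispatcher_worker_count_total': 2.0}
```

For anything beyond simple counters (labels, histograms), a real client such as the prometheus_client library's parser would be a better fit.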