Integration Testing Multiple Celery Workers and a DB Backed Django API
I'm working with a Service-Oriented Architecture that has multiple Celery workers (let's call them worker1, worker2, and worker3). All three workers are separate entities (i.e., separate code bases, separate repos, separate Celery instances, separate machines) and none of them are connected to a Django app. Communicating with each of these three workers is a Django-based, MySQL-backed RESTful API.
In development, these services all live on a single Vagrant box, each acting as a separate machine running on its own port. We have a single RabbitMQ broker for all of the Celery tasks.
A typical path through these services might look something like this: worker1 gets a message from a device, does some processing, and queues up a task on worker2, which does further processing and makes a POST to the API, which writes to the MySQL DB and triggers a task on worker3, which does some other processing and makes another POST to the API, resulting in a MySQL write.
The services are communicating nicely, but it's very annoying to test this flow every time we make a change to any service. I really want to get some full integration tests (i.e., starting at a message sent to worker1 and going through the entire chain) in place, but I'm not sure where to start. The main problems I'm facing are these:
If I queue up something on worker1, how can I possibly tell when the whole flow is over? How can I make reasonable assertions about results when I don't know whether the results have even arrived?
How do I deal with DB set up/tear down? I want to delete all of the entries made during a test at the end of each test, but if I'm starting the test from outside of the Django app, I'm not sure how to efficiently clear it out. Manually deleting it and recreating it after every test seems like it might be too much overhead.
Celery allows you to run tasks synchronously, so the first step is to divide the whole flow into separate tasks, fake the requests, and assert the results:
Original flow:
device --- worker1 --- worker2 --- django --- worker3 --- django
First-level integration tests:
1. |- worker1 -|
2. |- worker2 -|
3. |- django -|
4. |- worker3 -|
5. |- django -|
For each test, create a fake request or synchronous call and assert the results. Place these tests in the corresponding repository. For example, in the test for worker1 you can mock worker2 and check that it was called with the proper arguments (see the sketch below). Then, in another test, you call worker2 and mock its outgoing request to check that it hits the right API endpoint. And so on.
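A minimal sketch of what such a test for worker1 might look like. The module, task, and queue names here (worker1.tasks, process_device_message, worker2.process) are hypothetical stand-ins for your own, and it assumes worker1 hands off to worker2 via send_task:

from unittest import mock

from worker1.tasks import process_device_message   # assumed task module


def test_process_device_message_dispatches_to_worker2():
    fake_message = {"device_id": "abc123", "payload": "raw-data"}   # made-up fixture

    # Patch the broker call so nothing actually gets queued on worker2.
    with mock.patch("worker1.tasks.app.send_task") as send_task:
        # apply() runs the task body synchronously in this process, bypassing the broker.
        process_device_message.apply(args=(fake_message,)).get()

    # Assert that worker1 forwarded its output to the right worker2 task.
    send_task.assert_called_once()
    assert send_task.call_args[0][0] == "worker2.process"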
Testing the whole flow will be more difficult, since all of the tasks are separate entities. The only way I've come up with so far is to make one fake call to worker1, set a reasonable timeout, and wait for the final result in the database. This kind of test only tells you whether the flow works or not; it won't show you where the problem is.
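A rough sketch of that wait-and-poll approach, assuming the test process has Django configured so it can query the API's database (you could just as well poll the API over HTTP), and using made-up names (a DeviceReading model, a worker1.ingest task):

import time

from celery import Celery

from myapi.models import DeviceReading   # assumed model behind the final MySQL write

app = Celery(broker="amqp://guest@localhost//")


def wait_for_row(device_id, timeout=30, interval=0.5):
    """Poll the database until the expected row appears or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        row = DeviceReading.objects.filter(device_id=device_id).first()
        if row is not None:
            return row
        time.sleep(interval)
    raise AssertionError("flow did not finish within %s seconds" % timeout)


def test_full_flow():
    # Kick off the chain the same way a device message would.
    app.send_task("worker1.ingest", args=[{"device_id": "abc123", "payload": "raw"}])

    row = wait_for_row("abc123")
    assert row.status == "processed"   # assert whatever the final write should contain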
To work with the full setup, you can set up a Celery results backend. See the Celery 'next steps' documentation for the basics.
worker1 could then report the task handle of what it has passed on to worker2. The result returned by worker2 would be the task id of what it has passed on to worker3. And the result returned by worker3 would mean the whole sequence is finished, and you can check the outcomes. The results could also report interesting bits of those outcomes right away to make the checking easier.
In Celery, that might look something like this:

from celery.result import AsyncResult

worker1_result = mytask.delay(someargs)        # executed by worker1
worker2_id = worker1_result.get()              # waits for worker1, which returns worker2's task id
worker3_id = AsyncResult(worker2_id).get()     # waits for worker2, which returns worker3's task id
outcome = AsyncResult(worker3_id).get()        # waits for worker3 to finish
(The details probably need to be different; I have not used this myself yet. I am not sure whether AsyncResult objects are serializable, which is why the sketch above passes plain task ids between steps instead.)
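On the worker side, the idea would be for each task to return the id of the task it queued downstream. A hypothetical worker1 task might look like this (the broker URL, backend, and task names are placeholders):

from celery import Celery

app = Celery("worker1",
             broker="amqp://guest@localhost//",
             backend="rpc://")    # a result backend is needed for .get() to work


@app.task
def mytask(message):
    processed = {"device_id": message["device_id"], "status": "processed"}   # stand-in for real processing
    # Queue the next step on worker2 and return its task id: a plain string,
    # so it serializes cleanly as this task's own result.
    return app.send_task("worker2.process", args=[processed]).id

worker2's task would do the same with worker3's id, and worker3 would return the final outcome (or the interesting bits of it).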