集成测试多个Celery Workers和一个DB支持的Django API
我正在使用一个有多个芹菜工作者的软件定向架构(我们称之为worker1
, worker2
和worker3
)。 所有这三个工作人员都是独立的实体(即独立的代码库,独立的仓库,独立的芹菜实例,独立的机器),并且它们都不连接到Django应用程序。
与这三个工作人员进行通信的是基于Django的MySQL支持的RESTful API。
在开发过程中,这些服务都是在一个无用的盒子上,每个服务器都作为一个单独的机器运行,从一个单独的端口运行。 我们有一个用于所有Celery任务的RabbitMQ代理。
通过这些服务的典型路径可能如下所示: worker1
从设备获取消息,执行一些处理,在worker2
上排队执行任务,该任务执行进一步处理,并向API
写入POST,然后写入MySQL DB并在worker3
上触发一个任务,该任务执行一些其他处理并对该API
进行另一个POST,从而导致MySQL写入。
这些服务通信很好,但每次我们对任何服务进行更改时都会测试此流程,这非常烦人。 我真的想要得到一些完整的集成测试(即从发送给worker1
的消息开始并贯穿整个链),但我不确定从哪里开始。 我面临的主要问题是:
如果我在worker1
上worker1
,我怎么可能知道整个流程何时结束? 如果我不知道结果是否已经达到,我怎样才能对结果做出合理的断言?
我如何处理数据库设置/拆卸? 我想在每次测试结束时删除在测试期间所做的所有条目,但是如果我从Django应用以外的地方开始测试,我不确定如何有效地清除它。 手动删除它并在每次测试后重新创建它似乎可能会造成太大的开销。
Celery允许同步运行任务,因此第一步是:将整个流程划分为单独的任务,假请求和断言结果:
原始流程:
device --- worker1 --- worker2 --- django --- worker3 --- django
一级集成测试:
1. |- worker1 -|
2. |- worker2 -|
3. |- django -|
4. |- worker3 -|
5. |- django -|
对于每个测试,创建假请求或同步调用并声明结果。 将这些测试放入相应的存储库。 例如,在测试worker1时,可以模拟worker2并测试它是否已经用适当的参数调用。 然后,在另一个测试中,您将调用worker2和模拟请求来检查它是否调用了正确的API。 等等。
测试整个流程将很困难,因为所有任务都是独立的实体。 我现在想出的唯一方法是对worker1进行一次虚假的调用,设置合理的超时时间并等待数据库中的最终结果。 这种测试只会告诉你它是否有效。 它不会告诉你,问题在哪里。
要使用完整的设置,您可以设置一个Celery结果后端。 请参阅Celery的下一步文档以了解基础知识。
然后worker1
可以报告它传递给worker2
的任务句柄。 worker2
返回的结果将是它传递给worker3
的任务ID。 而worker3
返回的结果意味着整个序列已经完成,您可以检查结果。 结果也可能已经立即报告了这些结果的有趣部分,使检查更容易。
这在Celery中可能看起来像这样:
worker1_result = mytask.delay(someargs) # executed by worker1
worker2_result = worker1_result.get() # waits for worker1 to finish
worker3_result = worker2_result.get() # waits for worker2 to finish
outcome = worker3_result.get() # waits for worker3 to finish
(细节可能需要不同;我还没有使用过,我不确定任务结果是否可序列化,因此它们本身适合作为任务函数返回值。)
链接地址: http://www.djcxy.com/p/20201.html上一篇: Integration Testing Multiple Celery Workers and a DB Backed Django API