celery 如何分布式运行
前言
关于分布式的内容
环境配置
- python3.9
- celery:4.4.7 redis:3.5.3 msgpack:1.0.2
- redis:6.2.1
代码
- server.py
from celery.schedules import timedelta
from celery import Celery
class Config:
BROKER_URL = 'redis://192.168.0.15:6379/0' #需要使用任务管理的redis数据库地址
# CELERY_RESULT_BACKEND = 'redis://192.168.0.15:6739/6'
CELERY_TIMEZONE = 'Asia/Shanghai'
app = Celery('celery-tasks')
app.config_from_object(Config)
@app.task(queue='for_task_collect') # 第一个队列
def println(name):
print("thanks",name)
return "success"
@app.task(queue='for_task_add') # 第二个队列
def add(x, y):
print(x + y, "result")
return x + y
此文件可以理解为celery 启动监听文件,通过以下命令运行
celery -A 文件名 worker -l info
- add_task.py
from server import add, println # 导入我们的任务函数add
for i in range(1, 100):
println.delay(i)
add.delay(i, 10)
此文件添加任务,通过以下命令运行
python3 add_task.py
分布式运行通过复制server.py 到另一台电脑运行,两台电脑启动celery服务之后,一台主机电脑运行add_task 即可两台电脑同时运行服务
- 运行输出
————– celery@AntdeMacBook-Air.local v4.4.7 (cliffs) — *** —– – *** —- macOS-10.14.6-x86_64-i386-64bit 2021-04-07 18:03:04
- *** — * —
- ** ———- [config]
- ** ———- .> app:
celery-tasks:0x108ad0b50
- ** ———- .> transport: redis://192.168.0.15:6379/0
- ** ———- .> results: disabled://
- *** — * — .> concurrency: 6 (prefork) – *** —- .> task events: OFF (enable -E to monitor tasks in this worker) — *** —– ————– [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery1.add . celery1.println [2021-04-07 18:03:04,246: INFO/MainProcess] Connected to redis://192.168.0.15:6379/0 [2021-04-07 18:03:04,253: INFO/MainProcess] mingle: searching for neighbors [2021-04-07 18:03:05,269: INFO/MainProcess] mingle: all alone [2021-04-07 18:03:05,277: INFO/MainProcess] celery@AntdeMacBook-Air.local ready.