celery 任务处理

python 分布式celry

Posted by samll_ant on April 7, 2021

celery 如何分布式运行

前言

关于分布式的内容

环境配置

  1. python3.9
  2. celery:4.4.7 redis:3.5.3 msgpack:1.0.2
  3. redis:6.2.1

代码

  • server.py
from celery.schedules import timedelta
from celery import Celery


class Config:
    BROKER_URL = 'redis://192.168.0.15:6379/0' #需要使用任务管理的redis数据库地址
    
    # CELERY_RESULT_BACKEND = 'redis://192.168.0.15:6739/6'

    CELERY_TIMEZONE = 'Asia/Shanghai'


app = Celery('celery-tasks')
app.config_from_object(Config)


@app.task(queue='for_task_collect')  # 第一个队列
def println(name):
    print("thanks",name)
    return "success"


@app.task(queue='for_task_add')  # 第二个队列
def add(x, y):
    print(x + y, "result")
    return x + y

此文件可以理解为celery 启动监听文件,通过以下命令运行

celery -A 文件名 worker -l info 
  • add_task.py
from server import add, println  # 导入我们的任务函数add
for i in range(1, 100):
    println.delay(i)
    add.delay(i, 10)

此文件添加任务,通过以下命令运行

python3 add_task.py

分布式运行通过复制server.py 到另一台电脑运行,两台电脑启动celery服务之后,一台主机电脑运行add_task 即可两台电脑同时运行服务

  • 运行输出

    ————– celery@AntdeMacBook-Air.local v4.4.7 (cliffs) — *** —– – *** —- macOS-10.14.6-x86_64-i386-64bit 2021-04-07 18:03:04
    - *** — * —
    - ** ———- [config]
    - ** ———- .> app:
    celery-tasks:0x108ad0b50
    - ** ———- .> transport: redis://192.168.0.15:6379/0
    - ** ———- .> results: disabled://
    - *** — * — .> concurrency: 6 (prefork) – **
    * —- .> task events: OFF (enable -E to monitor tasks in this worker) — *** —– ————– [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery1.add . celery1.println [2021-04-07 18:03:04,246: INFO/MainProcess] Connected to redis://192.168.0.15:6379/0 [2021-04-07 18:03:04,253: INFO/MainProcess] mingle: searching for neighbors [2021-04-07 18:03:05,269: INFO/MainProcess] mingle: all alone [2021-04-07 18:03:05,277: INFO/MainProcess] celery@AntdeMacBook-Air.local ready.