【Python】Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

SyntaxError发布于 24 分钟前

Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

 

1.环境准备

pip install celery

pip install flower

pip install celery_once

 

2.代码结构

代码结构如下:

【Python】Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

任务函数sendDingTest.py(钉钉机器人):

from dingtalkchatbot.chatbot import DingtalkChatbot

from start import celery_app

from datetime import datetime

import socket

from celery_once import QueueOnce

WEB_HOOK_SPIDER = '在钉钉机器人页面获取web_hook'

# 基于任务名及传递的参数值来确认是否是同一个任务

@celery_app.task(base=QueueOnce, once={'graceful': True})

def send_ding_test(arg1, arg2):

dingding = DingtalkChatbot(WEB_HOOK_SPIDER)

arg3 = arg1 + arg2

dingding.send_text(msg="城市数据定时任务测试-{},{} 执行时刻:{}".format(arg3, socket.gethostname(), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

配置文件config.py:

from celery.schedules import crontab

class celeryConfig(object):

# accept_content = ['json'] # 可以是set,list,tuple,pickle,yaml

# result_accept_content = ['json']

timezone = 'Asia/Shanghai' # 中国只有两个时区,一个上海,一个乌鲁木齐

broker_url = "amqp://user:[email protected]:port/vhost"

backend = ""

include = ['jobs.sendDingTest'] # worker启动时要导入的任务模块,需要在这里添加,以便worker能够找到我们的任务

beat_schedule = {

'add-every-monday-morning':

{

'task': 'jobs.sendDingTest.send_ding_test', # 这里要写全路径,否则worker找不到

'schedule': crontab(minute="*/2"),

'args': (16, 16),

},

}

# celery-once配置

ONCE = {

'backend': 'celery_once.backends.Redis',

'settings': {

'url': 'redis://ip:port/database',

'default_timeout': 60 * 60 # 分布式锁的默认超时时间

}

}

启动函数start.py:

from celery import Celery

import os

# windows平台需要设置,

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

celery_app = Celery()

celery_app.config_from_object('config.celeryConfig')

 

3.启动定时任务与监控

# 注意:beat,worker,flower都可以不在同一台服务器上,分布开的话,需要在其他服务器上copy一份代码

# 开启beat(start是文件名)

celery -A start.celery_app beat

# 开启worker

celery -A start.celery_app worker -c 1 -l info

# 开启flower(默认地址是localhost:5555)

celery -A start flower

以下是flower页面,可以查看worker数量、消息队列(需要rabbitmq开启rabbitmq_management)和任务执行结果等。

【Python】Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

4.文档参考

celery-once文档

flower文档

celery官方文档

pythoncelery定时任务

阅读 14发布于 24 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

SyntaxError

✈✈✈✈✈✈✈✈

190 声望

11 粉丝

0 条评论

得票时间

avatar

SyntaxError

✈✈✈✈✈✈✈✈

190 声望

11 粉丝

宣传栏

Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

 

1.环境准备

pip install celery

pip install flower

pip install celery_once

 

2.代码结构

代码结构如下:

【Python】Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

任务函数sendDingTest.py(钉钉机器人):

from dingtalkchatbot.chatbot import DingtalkChatbot

from start import celery_app

from datetime import datetime

import socket

from celery_once import QueueOnce

WEB_HOOK_SPIDER = '在钉钉机器人页面获取web_hook'

# 基于任务名及传递的参数值来确认是否是同一个任务

@celery_app.task(base=QueueOnce, once={'graceful': True})

def send_ding_test(arg1, arg2):

dingding = DingtalkChatbot(WEB_HOOK_SPIDER)

arg3 = arg1 + arg2

dingding.send_text(msg="城市数据定时任务测试-{},{} 执行时刻:{}".format(arg3, socket.gethostname(), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

配置文件config.py:

from celery.schedules import crontab

class celeryConfig(object):

# accept_content = ['json'] # 可以是set,list,tuple,pickle,yaml

# result_accept_content = ['json']

timezone = 'Asia/Shanghai' # 中国只有两个时区,一个上海,一个乌鲁木齐

broker_url = "amqp://user:[email protected]:port/vhost"

backend = ""

include = ['jobs.sendDingTest'] # worker启动时要导入的任务模块,需要在这里添加,以便worker能够找到我们的任务

beat_schedule = {

'add-every-monday-morning':

{

'task': 'jobs.sendDingTest.send_ding_test', # 这里要写全路径,否则worker找不到

'schedule': crontab(minute="*/2"),

'args': (16, 16),

},

}

# celery-once配置

ONCE = {

'backend': 'celery_once.backends.Redis',

'settings': {

'url': 'redis://ip:port/database',

'default_timeout': 60 * 60 # 分布式锁的默认超时时间

}

}

启动函数start.py:

from celery import Celery

import os

# windows平台需要设置,

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

celery_app = Celery()

celery_app.config_from_object('config.celeryConfig')

 

3.启动定时任务与监控

# 注意:beat,worker,flower都可以不在同一台服务器上,分布开的话,需要在其他服务器上copy一份代码

# 开启beat(start是文件名)

celery -A start.celery_app beat

# 开启worker

celery -A start.celery_app worker -c 1 -l info

# 开启flower(默认地址是localhost:5555)

celery -A start flower

以下是flower页面,可以查看worker数量、消息队列(需要rabbitmq开启rabbitmq_management)和任务执行结果等。

【Python】Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)

4.文档参考

celery-once文档

flower文档

celery官方文档

以上是 【Python】Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower) 的全部内容, 来源链接: utcz.com/a/113358.html

回到顶部