原创

Django异步任务celery

一、介绍

Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。celery看起来似乎很庞大,本章节我们先对其进行简单的了解,然后再去学习其他一些高级特性。 celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

特点:

简单,易于使用和维护,有丰富的文档。
高效,单个celery进程每分钟可以处理数百万个任务。
灵活,celery中几乎每个部分都可以自定义扩展。

二、配置Celery

Celery需要使用redis、rabbitmq,本文档使用的是redis,redis安装文档:redis安装

1、安装Celery和Redis模块

通过Python的PIP安装:
pip install celery
pip install redis

2、配置Django的配置文件

(项目路径下的settings.py文件)

# 连接redis
CELERY_BROKER_URL = "redis://192.168.30.131:6379/0"
# 时区
CELERY_TIMEZONE = 'Asia/Shanghai'
# celery内容等消息的格式设置,默认json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# 任务限流
CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

# Worker并发数量,一般默认CPU核数,可以不设置
CELERY_WORKER_CONCURRENCY = 4

# 每个worker执行了多少任务就会死掉,默认是无限的
CELERY_WORKER_MAX_TASKS_PER_CHILD = 20

3、在项目目录下创建celery.py文件

import os
from celery import Celery

# 设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE","Data_query.settings")

# 实例化
app = Celery("Data_query")

app.config_from_object("django.conf:settings",namespace="CELERY")

app.autodiscover_tasks()


4、Celery启动测试

Windows启动celery服务需要安装gevent

pip install gevent

Windows启动命令

 Celery -A 项目名称 worker -l info -P eventlet

启动后如下:(说明redis已经连接成功了)

(venv) D:\pythonProject\Data_query> Celery -A Data_query worker -l info -P eventlet

 -------------- celery@WIN-7H1KVV73I31 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19044-SP0 2022-08-10 11:10:10
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         Data_query:0x1ee2fbedfd0
- ** ---------- .> transport:   redis://192.168.30.131:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . Data_query.celery.debug_task
  . app.kph.tasks.add_Dqxxyhqjyz
  . app.kph.tasks.add_sfzcl_task
  . app.region_data_query.tasks.export_csv
  . app.region_data_query.tasks.region_kpy_data
  . app.region_data_query.tasks.ssq_data_query
  . app.region_data_query.tasks.ssq_export_csv

5、创建异步脚本

在django的app下创建一个tasks.py

tasks.py

# @File    :tasks.py

from celery import shared_task
import time


@shared_task
def test():
    for i in range(0,100)
        time.sleep(1)
        print(i)

views.py

# @File :views.py

from django.views.generic import View
from .tasks import *

class Test(View):
    def get(self):
        # 执行异步任务
        test.delay()

urls.py

urlpatterns = [
    path("test",Test.as_view()),
]

一个简单的异步任务就完成了,get请求test的url时,执行views的Test的get方法,get方法执行异步脚本里的test函数,异步脚本执行时不影响Django的响应。


正文到此结束
评论

登录后才能发表评论 登录/注册

0评论
  • 还没有评论,快来抢沙发吧!