
Python Celery Task Distribution

Celery is a simple, flexible, and reliable distributed system written in Python for processing large volumes of tasks. It supports both real-time processing and task scheduling.

 

  • user: the client program that tells Celery to run a task.
  • broker: stores the tasks (backed by RabbitMQ or Redis).
  • worker: executes the tasks.

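Celery needs RabbitMQ, Redis, Amazon SQS, or Zookeeper (experimental) to act as the broker that receives messages, and it supports multiple brokers and workers for high availability and distributed operation. See http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html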
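
The division of labor among the three roles can be illustrated without Celery at all. The following is a toy sketch using only the standard library: an in-process `queue.Queue` stands in for the broker, a thread stands in for the worker, and the names `broker`, `results`, `submit`, `worker` and `add` are made up for this illustration. Real Celery adds serialization, acknowledgements, retries and network transport on top of this idea.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for RabbitMQ/Redis: holds pending tasks
results = {}             # stands in for a result backend: task id -> return value


def add(x, y):
    return x + y


def worker():
    """Take tasks off the broker and run them until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        results[task_id] = func(*args)


def submit(func, *args):
    """User side: enqueue a task and return its id immediately."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id


thread = threading.Thread(target=worker)
thread.start()
task_id = submit(add, 4, 4)
broker.put(None)   # ask the worker to stop once the queue is drained
thread.join()
print(results[task_id])
```

The caller only ever holds a task id. The return value is looked up separately, which is the same pattern the `delay` / `AsyncResult` pair follows in the examples further down.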

    Celery version 4.0 runs on
        Python (2.7, 3.4, 3.5)
        PyPy (5.4, 5.5)
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
Versions and requirements

Environment setup:

  • Install RabbitMQ or Redis
        See: http://www.cnblogs.com/wupeiqi/articles/5132791.html
  • Install celery
         pip3 install celery

Quick start

import time
from celery import Celery

app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')


@app.task
def xxxxxx(x, y):
    time.sleep(10)
    return x + y
s1.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from s1 import xxxxxx

# Tell Celery to run the xxxxxx task right away, passing two arguments
result = xxxxxx.delay(4, 4)
print(result.id)
s2.py
from celery.result import AsyncResult
from s1 import app

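# `async` is a reserved word since Python 3.7, so a different name is used
async_result = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the stored result
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task raised an error and is being retried')
elif async_result.status == 'STARTED':
    print('Task has started executing')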
s3.py

Start a worker that loads s1.py (run in a terminal):

celery -A s1 worker -l info

Run s2.py to create a task and get its task ID:

python3 s2.py 

Run s3.py to check the task status and fetch the result:

python3 s3.py
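
s3.py checks the status once and exits. If the caller would rather wait until the task finishes, a small polling helper is one option. The sketch below is an assumption-laden illustration, not part of the original scripts: `wait_for_result` and `StubResult` are hypothetical names, and the helper only relies on the `ready()` and `get()` methods that `AsyncResult` provides. The stub class exists only so the snippet runs without a broker.

```python
import time


def wait_for_result(async_result, timeout=30.0, interval=0.5):
    """Poll until the task finishes or `timeout` seconds pass.

    Returns the task's return value, or None if it did not finish in time.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if async_result.ready():
            return async_result.get()
        time.sleep(interval)
    return None


class StubResult:
    """Hypothetical stand-in that becomes ready after a few polls."""

    def __init__(self, polls_until_ready, value):
        self.polls_left = polls_until_ready
        self.value = value

    def ready(self):
        self.polls_left -= 1
        return self.polls_left <= 0

    def get(self):
        return self.value


print(wait_for_result(StubResult(3, 8), timeout=2.0, interval=0.01))
```

With a real `AsyncResult`, `get()` re-raises the task's exception by default when the task failed, so callers may want to wrap the call in `try`/`except`. Note also that a `None` return is ambiguous if the task itself can return `None`.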

Multi-task structure

pro_cel
    ├── celery_tasks    # Celery-related package
    │   ├── celery.py   # Celery connection and configuration
    │   └── tasks.py    # all task functions
    ├── check_result.py # check results
    └── send_task.py    # trigger tasks
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery

celery = Celery('xxxxxx',
                broker='redis://192.168.0.111:6379',
                backend='redis://192.168.0.111:6379',
                include=['celery_tasks.tasks'])

# Time zone
celery.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
celery.conf.enable_utc = False
pro_cel/celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
from .celery import celery


@celery.task
def xxxxx(*args, **kwargs):
    time.sleep(5)
    return "task result"


@celery.task
def hhhhhh(*args, **kwargs):
    time.sleep(5)
    return "task result"
pro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from celery.result import AsyncResult
from celery_tasks.celery import celery

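# `async` is a reserved word since Python 3.7, so a different name is used
async_result = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the stored result
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task raised an error and is being retried')
elif async_result.status == 'STARTED':
    print('Task has started executing')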
pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks

# Tell Celery to run the xxxxx task right away, passing two arguments
result = celery_tasks.tasks.xxxxx.delay(4, 4)

print(result.id)
pro_cel/send_task.py

More configuration options: http://docs.celeryproject.org/en/latest/userguide/configuration.html

Scheduled tasks

1. Set a time for Celery to run a task

import datetime
from celery_tasks.tasks import xxxxx
"""
from datetime import datetime

v1 = datetime(2017, 4, 11, 3, 0, 0)
print(v1)

v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)

"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())

s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10

# Use apply_async and set the execution time
result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)
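
The conversion above goes from a naive local time to a naive UTC time via a timestamp. An alternative worth considering is to build a timezone-aware UTC datetime directly, since `eta` accepts datetimes that carry timezone information. The helper name `eta_in` below is hypothetical, and the `apply_async` calls are left as comments because they need a running broker.

```python
from datetime import datetime, timedelta, timezone


def eta_in(seconds):
    """Return a timezone-aware UTC datetime `seconds` from now."""
    return datetime.now(timezone.utc) + timedelta(seconds=seconds)


eta = eta_in(10)
print(eta.isoformat())

# With a broker and worker running, this could be passed as:
#   xxxxx.apply_async(args=[1, 3], eta=eta)
# For a simple relative delay, apply_async also takes a countdown in seconds:
#   xxxxx.apply_async(args=[1, 3], countdown=10)
```

An aware datetime avoids any ambiguity about which zone the value is expressed in, regardless of how `timezone` and `enable_utc` are configured.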

2. crontab-like scheduled tasks

"""
celery -A proj beat
celery -A proj worker -l info

"""
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

app.conf.beat_schedule = {
    # 'add-every-10-seconds': {
    #     'task': 'proj.s1.add1',
    #     'schedule': 10.0,
    #     'args': (16, 16)
    # },
    'add-every-12-seconds': {
        'task': 'proj.s1.add1',
        'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        'args': (16, 16)
    },
}

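Note: crontab-style entries are only dispatched while the celery beat process is running. To keep the schedule somewhere else (for example, in a database), a custom Scheduler is needed.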
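
Despite the entry's key name `'add-every-12-seconds'`, the schedule `crontab(minute=42, hour=8, day_of_month=11, month_of_year=4)` describes a single minute per year: 08:42 on April 11, in the configured time zone. The following standard-library sketch shows which moments match those four fields. `matches_entry` is a hypothetical helper written for this illustration and is not how Celery evaluates schedules internally.

```python
from datetime import datetime


def matches_entry(moment, minute=42, hour=8, day_of_month=11, month_of_year=4):
    """True if `moment` falls in the minute described by the four fields."""
    return (
        moment.minute == minute
        and moment.hour == hour
        and moment.day == day_of_month
        and moment.month == month_of_year
    )


print(matches_entry(datetime(2018, 4, 11, 8, 42)))   # True
print(matches_entry(datetime(2018, 4, 11, 8, 43)))   # False: one minute late
print(matches_entry(datetime(2019, 4, 11, 8, 42)))   # True: same minute, next year
```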

Using Celery in Flask

pro_flask_celery/
├── app.py
└── celery_tasks
    ├── celery.py
    └── tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from flask import Flask
from celery.result import AsyncResult

from celery_tasks import tasks
from celery_tasks.celery import celery

app = Flask(__name__)

TASK_ID = None


@app.route('/')
def index():
    global TASK_ID
    result = tasks.xxxxx.delay()
    # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))
    TASK_ID = result.id

    return "Task submitted"


@app.route('/result')
def result():
    global TASK_ID
    result = AsyncResult(id=TASK_ID, app=celery)
    if result.ready():
        return result.get()
    return "xxxx"


if __name__ == '__main__':
    app.run()
app.py
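
The `/result` view above returns either the raw value or a placeholder string. One possible refinement is to summarize the task as a small dictionary that a view could hand to Flask's `jsonify`. The sketch below is hypothetical: `result_payload` and `StubResult` are names invented here, and the helper relies only on the `id`, `state`, `result`, `successful()` and `failed()` members that `AsyncResult` exposes. The stub lets the snippet run without a broker.

```python
def result_payload(async_result):
    """Summarize a task result as a JSON-friendly dict."""
    payload = {"id": async_result.id, "state": async_result.state}
    if async_result.successful():
        payload["value"] = async_result.result
    elif async_result.failed():
        # On failure, `result` holds the exception raised by the task.
        payload["error"] = str(async_result.result)
    return payload


class StubResult:
    """Hypothetical stand-in exposing the attributes used above."""

    def __init__(self, id, state, result=None):
        self.id = id
        self.state = state
        self.result = result

    def successful(self):
        return self.state == "SUCCESS"

    def failed(self):
        return self.state == "FAILURE"


print(result_payload(StubResult("abc", "SUCCESS", "xxxxx")))
print(result_payload(StubResult("def", "PENDING")))
```

Returning the task id to the client and accepting it back as a request parameter would also avoid the module-level `TASK_ID`, which is shared by all requests and is not visible across multiple server processes.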
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab

celery = Celery('xxxxxx',
                broker='redis://192.168.10.48:6379',
                backend='redis://192.168.10.48:6379',
                include=['celery_tasks.tasks'])

# Time zone
celery.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
celery.conf.enable_utc = False
celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
from .celery import celery


@celery.task
def hello(*args, **kwargs):
    print('running hello')
    return "hello"


@celery.task
def xxxxx(*args, **kwargs):
    print('running xxxxx')
    return "xxxxx"


@celery.task
def hhhhhh(*args, **kwargs):
    time.sleep(5)
    return "task result"
celery_tasks/tasks.py

Using Celery in Django

I. Basic usage

django_celery_demo
├── app01
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
├── django_celery_demo
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
├── red.py
└── templates
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')

app = Celery('django_celery_demo')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
django_celery_demo/celery.py
from .celery import app as celery_app

__all__ = ('celery_app',)
django_celery_demo/__init__.py
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)
app01/tasks.py
...
....
.....
# ######################## Celery configuration ########################
CELERY_BROKER_URL = 'redis://10.211.55.20:6379'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://10.211.55.20:6379'
CELERY_TASK_SERIALIZER = 'json'
django_celery_demo/settings.py
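
The `namespace='CELERY'` argument in celery.py is why every setting above carries a `CELERY_` prefix: only prefixed keys are picked up, and each maps to the lowercase Celery setting of the same name (for example `CELERY_BROKER_URL` to `broker_url`). The snippet below is a simplified illustration of that naming rule, not Celery's actual loading code, and `celery_settings` is a hypothetical helper.

```python
def celery_settings(django_settings, namespace="CELERY"):
    """Map e.g. CELERY_BROKER_URL -> broker_url for keys carrying the prefix."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in django_settings.items()
        if key.startswith(prefix)
    }


# A plain dict standing in for the Django settings module.
settings = {
    "DEBUG": True,
    "CELERY_BROKER_URL": "redis://10.211.55.20:6379",
    "CELERY_RESULT_BACKEND": "redis://10.211.55.20:6379",
    "CELERY_ACCEPT_CONTENT": ["json"],
    "CELERY_TASK_SERIALIZER": "json",
}

for name, value in celery_settings(settings).items():
    print(name, "=", value)
```

A setting written without the prefix, such as a bare `BROKER_URL`, would be ignored under this rule, which is a common reason for a worker silently falling back to its default broker.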
from django.shortcuts import render, HttpResponse
from app01 import tasks
from django_celery_demo import celery_app
from celery.result import AsyncResult


def index(request):
    result = tasks.add.delay(1, 8)
    print(result)
    return HttpResponse('...')


def check(request):
    task_id = request.GET.get('task')
    async_result = AsyncResult(id=task_id, app=celery_app)
    if async_result.successful():
        data = async_result.get()
        print('Succeeded', data)
    else:
        print('Task has not completed successfully yet')

    return HttpResponse('...')
app01/views.py
"""django_celery_demo URL Configuration

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/1.11/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  url(r'^$', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  url(r'^$', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.conf.urls import url, include
    2. Add a URL to urlpatterns:  url(r'^blog/', include('blog.urls'))
"""
from django.conf.urls import url
from django.contrib import admin
from app01 import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index/', views.index),
    url(r'^check/', views.check),
]
django_celery_demo/urls.py

II. Scheduled tasks

1. Install

pip3 install django-celery-beat

2. Register the app

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

3. Run the database migration to create the scheduled-task tables

python manage.py migrate

4. Configure scheduled tasks

  • Option 1: configure in code
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
    
    app = Celery('django_celery_demo')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    
    app.conf.beat_schedule = {
        'add-every-5-seconds': {
            'task': 'app01.tasks.add',
            'schedule': 5.0,
            'args': (16, 16)
        },
    }
    
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    django_celery_demo/celery.py
  • Option 2: add entries to the database tables (for example, through the Django admin)

5. Start the beat process in the background to create tasks

celery -A django_celery_demo beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

6. Start a worker to execute the tasks

celery -A django_celery_demo worker -l INFO  

Official reference: http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django