1. 程式人生 > 程式設計 >詳解Python Celery和RabbitMQ實戰教程

詳解Python Celery和RabbitMQ實戰教程

前言

Celery是一個非同步任務佇列。它可以用於需要非同步執行的任何內容。RabbitMQ是Celery廣泛使用的訊息代理。在本這篇文章中,我將使用RabbitMQ來介紹Celery的基本概念,然後為一個小型演示專案設定Celery 。最後,設定一個Celery Web控制檯來監視我的任務

基本概念

  來!看圖說話:

在這裡插入圖片描述

Broker
Broker(RabbitMQ)負責建立任務佇列,根據一些路由規則將任務分派到任務佇列,然後將任務從任務佇列交付給worker

Consumer (Celery Workers)
Consumer是執行任務的一個或多個Celery workers。可以根據用例啟動許多workers

Result Backend
後端用於儲存任務的結果。但是,它不是必需的元素,如果不在設定中包含它,就無法訪問任務的結果

安裝Celery

  首先,需要安裝好Celery,可以使用PyPI:

pip install celery

選擇一個Broker:RabbitMQ

  為什麼我們需要broker呢?這是因為Celery本身並不構造訊息佇列,所以它需要一個額外的訊息傳輸來完成這項工作。這裡可以將Celery看作訊息代理的包裝器

實際上,也可以從幾個不同的代理中進行選擇,比如RabbitMQ、Redis或資料庫(例如Django資料庫)

在這裡使用RabbitMQ作為代理,因為它功能完整、穩定,Celery推薦使用它。由於演示我的環境是在Mac OS中,安裝RabbitMQ使用Homebrew即可:

brew install rabbitmq
#如果是Ubuntu的話使用apt-get安裝

啟動RabbitMQ

  程式將在/usr/local/sbin中安裝RabbitMQ,雖然有些系統可能會有所不同。可以將此路徑新增到環境變數路徑,以便以後方便地使用。例如,開啟shell啟動檔案~/.bash_profile新增:

PATH=$PATH:/usr/local/sbin

現在,可以使用rabbitmq-server

命令啟動我們的RabbitMQ伺服器。檢查RabbitMQ伺服器成功啟動,將看到類似的輸出:

在這裡插入圖片描述

為Celery配置RabbitMQ

  RabbitMQ使用Celery之前,需要對RabbitMQ進行一些配置。簡單地說,我們需要建立一個虛擬主機和使用者,然後設定使用者許可權,以便它可以訪問虛擬主機

# 新增使用者跟密碼
$ rabbitmqctl add_user test test123
# 新增虛擬主機
$ rabbitmqctl add_vhost test_vhost
# 為使用者新增標籤
$ rabbitmqctl set_user_tags test test_tag
# 設定使用者許可權
$ rabbitmqctl set_permissions -p test_vhost test ".*" ".*" ".*"

敲黑板!RabbitMQ中有三種操作:配置、寫入和讀取

上面命令末尾的字串表示使用者test將擁有所有配置、寫入和讀取許可權

演示專案

現在讓我們建立一個簡單的專案來演示Celery的使用

在這裡插入圖片描述

celery.py中新增以下程式碼:

from __future__ import absolute_import
from celery import Celery

app = Celery('test_celery',broker='amqp://test:test123@localhost/test_vhost',backend='rpc://',include=['test_celery.tasks'])

在這裡,初始化了一個名為app的Celery例項,將用於建立一個任務。Celery的第一個引數只是專案包的名稱,即“test_celery”。

broker引數指定代理URL,對於RabbitMQ,傳輸是amqp。

後端引數指定後端URL。Celery中的後端用於儲存任務結果。因此,如果需要在任務完成時訪問任務的結果,應該為Celery設定一個後端。

rpc意味著將結果作為AMQP訊息傳送回去,這對本次演示來說是一種可接受的格式

include引數指定了在Celery工作程式啟動時要匯入的模組列表。我們在這裡添加了tasks模組,以便找到我們的任務。

tasks.py這個檔案中,定義了我們的任務add_longtime:

from __future__ import absolute_import
from test_celery.celery import app
import time

@app.task
def add_longtime(a,b):
  print 'long time task begins'
  # sleep 5 seconds
  time.sleep(5)
  print 'long time task finished'
  return a + b

可以看到,匯入了在前面的Celery模組中定義的應用程式,並將其用作任務方法的裝飾器。另外注意!app.task只是一個裝飾器。此外,我們在add_longtime任務中休眠5秒,以模擬一個耗時較長的Task

在設定好Celery之後,我們需要開始執行任務,它包含在runs_tasks.py:

from .tasks import add_longtime
import time

if __name__ == '__main__':
  result = add_longtime.delay(1,2)
	#此時,任務還未完成,它將返回False
  print 'Task finished? ',result.ready()
  print 'Task result: ',result.result
  # 延長到10秒以確保任務已經完成
  time.sleep(10)
  # 現在任務完成,ready方法將返回True
  print 'Task finished? ',result.result

這裡,我們使用delay方法呼叫任務add_longtime,如果我們想非同步處理任務,就需要使用delay方法。此外,儲存任務的結果並列印一些資訊。如果任務已經完成,ready方法將返回True,否則返回False。result屬性是任務的結果,如果任務尚未完成,則返回None。

啟動Celery

現在,可以使用下面的命令啟動Celery(注:在專案資料夾中執行):

celery -A test_celery worker --loglevel=info

Celery成功連線到RabbitMQ,你會看到這樣的東西:

在這裡插入圖片描述

執行任務

再專案檔案中輸入以下命令執行它:

python -m test_celery.run_tasks

檢視Celery控制檯,看到執行任務:

[2020-05-15 17:15:21,508: INFO/MainProcess]
Received task: test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3]
[2020-05-15 17:15:21,508: WARNING/Worker-3] long time task begins
[2020-05-15 17:15:31,510: WARNING/Worker-3] long time task finished
[2020-05-15 17:15:31,512: INFO/MainProcess]
Task test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3] succeeded in 15.003732774s: 3

當Celery收到一個任務,它打印出任務名稱與任務id(在括號中):

Received task: test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5]

在這一行下面是我們的任務add_longtime列印的兩行,時間延遲為5秒:

long time task begins
long time task finished

最後一行顯示我們的任務在5秒內完成,任務結果為3:

Task test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5] succeeded in 5.025242167s: 3

在當前控制檯中,您將看到以下輸出:

在這裡插入圖片描述

實時監控Celery

Flower是一款基於網路的Celery實時監控軟體。使用Flower,可以輕鬆地監視任務進度和歷史記錄

使用pip來安裝Flower:

pip install flower

要啟動Flower web控制檯,需要執行以下命令:

celery -A test_celery flower

Flower將執行具有預設埠5555的伺服器,可以通過http://localhost:5555訪問web控制檯

在這裡插入圖片描述

到此這篇關於詳解Python Celery和RabbitMQ實戰教程的文章就介紹到這了,更多相關Python Celery和RabbitMQ實戰內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!