通過celery非同步處理一個查詢任務的完整程式碼
今天介紹通過celery實現一個非同步任務。有這樣一個需求,前端發起一個查詢的請求,但是發起查詢後,查詢可能不會立即返回結果。這時候,發起查詢後,後端可以把這次查詢當作一個task,並立即返回一個能唯一表明該task的值,如taskID(使用者後面可以通過這個taskID 隨時檢視結果),使用者收到這個taskID後,可以轉去處理其他任務,而不必一直等待查詢結果。後端API呼叫celery來處理這個task,並將結果值儲存在一個csv檔案中,後面使用者通過taskID 查詢時返回結果。
def application(environ,start_response): """部分程式碼省略""" query_string = environ['QUERY_STRING'] serviceGroupName = "" for getParam in query_string.split("&"): params = getParam.split("=") resultInfo = "" if params[0] == "type": alertType = params[1] elif params[0] == "projectName": projectName = params[1] elif params[0] == "serviceGroupName": serviceGroupName = params[1] else: resultInfo = error_info(-1,"GET引數只能為type=<?>&projectName=<?>&serviceGroupName=<?>;必須指定三個引數",{}) return [resultInfo] taskId = 1 result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv' contentInfo = json.dumps({"taskId":1,"opType":"continue","serviceGroupName":serviceGroupName,"dbHost":dbHost,"dbPasswd":dbPasswd,"dbUser":dbUser,"dbPort":dbPort}) result = getServiceInfo.apply_async((contentInfo,),queue="getServiceInfo") taskInfo = "任務已經建立,詳情請檢視:http://10.4.34.254/api/task?taskId=%s"% (taskId) return [resultInfo]
getServiceInfo.apply_async((contentInfo,queue=”getServiceInfo”),重點是這一行,apply_async()方法會返回一個AsyncResult例項,通過這個例項可以跟蹤任務狀態軌跡。
要使用此功能,需要提供結果後臺(result backend),這樣才有地方儲存任務狀態等資訊。其中,getServiceInfo是自定義的一個task,後續會介紹到,contentInfo是傳遞的一個引數,queue是指定佇列名稱。
上面這個函式的原型如下:
task.apply_async(args[,kwargs[,…]])
其中 args 和 kwargs 分別是 task 接收的引數,當然它也接受額外的引數對任務進行控制。
在 Celery 中執行任務的方法一共有三種:
1. delay, 用來進行最簡單便捷的任務執行(delay在第3小節的測試中使用過,它可以看作是apply_async的一個快捷方式);
2. apply_async, 對於任務的執行附加額外的引數,對任務進行控制;
3. app.send_task, 可以執行未在 Celery 中進行註冊的任務。
celery檔案配置
在python的庫存放路徑中(一般是/usr/lib/python2.6/site-packages),建立一個資料夾proj,進入proj目錄,建立三個檔案,init,將proj宣告一個python包,celepy,其內容如下:
#_*_ coding:utf-8 _*_ from __future__ import absolute_import from celery import Celery app = Celery("proj",broker="amqp://user:password@localhost//",backend="amqp",include=["proj.tasks"] ) app.conf.update( CELERY_ROUTES={ "proj.tasks.getServerInfo":{"queue":"getServerInfo"},} ) if __name__=="__main__": app.start()
這裡我們定義了模組名稱proj以及celery 路由。
還有一個檔案,task.py
#_*_ coding:utf-8 _*_i from __future__ import absolute_import from proj.celery import app import random import simplejson as json import types import time import MySQLdb import urllib2 import ConfigParser as cparser import hmac import hashlib import base64 @app.task def getServiceInfo(contentInfo): contentInfo = json.loads(contentInfo) serviceGroupName = contentInfo['serviceGroupName'] dbHost = contentInfo['dbHost'] dbPort = int(contentInfo['dbPort']) dbUser = contentInfo['dbUser'] dbPasswd = contentInfo['dbPasswd'] msgLib = MessageLib.MessageLib() Sql = "Your SQL" #第三步:連線資料庫,執行程式碼邏輯 try: db_connection = MySQLdb.connect(host=dbHost,port=dbPort,passwd=dbPasswd,db="cmdb",user=dbUser,connect_timeout=2,charset="utf8") cursor = db_connection.cursor() cursor.execute(getServiceGroupHostSql) row = cursor.fetchall() result = [] for line in row: ... result.append(tempMysqlHighInfo) resultInfo = msgLib.success_info(result) return resultInfo except Exception,e: raise errorInfo = "dbhost:%s,port:%s,error:%s" % (dbHost,dbPort,str(e)) #return getServiceGroupHostSql,errorInfo return msgLib.error_info(-1,errorInfo,{})
啟動celery
celery -A proj worker -Q getServiceInfo -l debug -c 6
最後,寫一個結果,專門獲取查詢結果的結果,傳入的引數為taskID,部分程式碼如下:
def application(environ,start_response): status = '400 ERROR' response_headers = [('Content-type','application/json;charset=utf-8')] start_response(status,response_headers) status = '200 OK' response_headers = [('Content-type',response_headers) if environ['REQUEST_METHOD'] != "GET": resultInfo = msgLib.error_info(-1,"http請求型別不是GET",{}) return [resultInfo] query_string = environ['QUERY_STRING'] serviceGroupName = "" for getParam in query_string.split("&"): params = getParam.split("=") resultInfo = "" if params[0] == "taskId": taskId = params[1] else: resultInfo = msgLib.error_info(-1,"GET引數無比指定taskId這個引數",{}) return [resultInfo] logging.info(query_string) result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv' result = [] try: with open (result_file_name,'rb') as fp: lines = csv.reader(fp) for line in lines : result.append(line) resultInfo = msgLib.success_info(result) return resultInfo except Exception,e: errorInfo = "some thing wrong" return msgLib.error_info(-1,{})
以上這篇通過celery非同步處理一個查詢任務的完整程式碼就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。