1. 程式人生 > >RabbitMQ入門(二)工作佇列

RabbitMQ入門(二)工作佇列

  在文章RabbitMQ入門(一)之Hello World,我們編寫程式通過指定的佇列來發送和接受訊息。在本文中,我們將會建立工作佇列(Work Queue),通過多個workers來分配耗時任務。
  工作佇列(Work Queue,也被成為Task Queue,任務佇列)的中心思想是,避免立即執行一個資源消耗巨大且必須等待其完成的任務。相反地,我們排程好佇列可以安排該任務稍後執行。我們將一個任務(task)封裝成一個訊息,將它傳送至佇列。一個在後臺執行的work程序將會丟擲該任務,並最終執行該任務。當你執行多個workers的時候,任務將會在它們之中共享。
  這個概念在web開發中很有用,因為通過一個短的HTTP請求不可能處理複雜的任務。
  在之前的文章中,我們傳送了一個包含“Hello World!”的訊息。現在我們將會發送代表複雜任務的字串符。我們並沒有實際上的任務,比如重新調整圖片的尺寸或者渲染PDF,我們假裝有這樣的複雜任務,通過使用time.sleep()

函式。我們將會用字串中的點(.)來代表複雜度;每一個點代表一秒中的任務。舉例來說,字串Hello...需要花費三秒。
  我們需要稍微修改下sent.py中的程式碼,允許在命令中輸入任意字串。該程式會排程任務至工作佇列,因此命名為new_task.py

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

  我們原先的receive.py也需要改動:它需要在訊息體中將字串的每一個點代表1秒鐘的任務。它會從佇列中丟擲訊息並執行該訊息,因此命名為task.py

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

Round-Robin分發(輪詢分發)

  使用工作佇列的一個好處就是它能夠輕鬆實現並行工作。如果我們建立了一項積壓的工作,那麼我們可以增加更多的worker來使它的擴充套件性更好。
  首先,我們同時執行兩個worker.py

指令碼。他們都能夠從佇列中獲取訊息,但是具體是怎麼實現的呢?讓我們接著閱讀。
  你需要開啟三個終端檢視。兩個終端用於執行worker.py指令碼。這兩個終端將會成為兩個消費者——C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

在第三個終端中,我們將會產生新的任務。一旦你啟動了這些消費者,你就可以傳送一些訊息了:

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

讓我們看看這兩個workers傳遞了什麼:

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

RabbitMQ預設會將每個訊息依次地傳送給下一個消費者。因此總的來說,每個消費者將會同樣數量的訊息。這種訊息分配的方法叫Round-Robin。你可以嘗試三個或者更多的worker。

訊息確認(Message Acknowledgement)

  執行一項任務需要花費幾秒鐘。你也許會好奇,如果其中一個消費者執行一項耗時很長的任務,並且在執行了一部分的時候掛掉了,將會發生什麼?根據我們現在的程式碼,一旦RabbitMQ將訊息傳送至消費者,那麼RabbitMQ就會標誌它為刪除狀態。在這種情況下,如果我們殺死某個worker,我們將會失去他正在處理的訊息。我們也會失去所有分配至這個worker的訊息,當然,這些訊息還未被處理。
  但是,我們不希望失去任何一項任務。如果有一個worker掛掉了,我們希望這些任務能夠被傳送至另一個worker。
  為了確保訊息不丟失,RabbitMQ支援訊息確認。一個ack(nowledgement)是由消費者傳送回來的,用於告訴RabbitMQ,這個特定的訊息已經被接受,被處理,可以被刪除了。
  如果一個消費者掛了(它的channel關閉了,連線關閉了,或者TCP連線丟失)但是沒有傳送一個ack,RabbitMQ就會知道這個訊息並未被完全處理,會將它重新塞進佇列。如果同時還存在著其他線上消費者,RabbbitMQ將會將這個訊息重新傳送給另一個消費者。用這種方式可以確保沒有訊息丟失,即使workers偶爾會刮掉。
  並不存在訊息超時;如果消費者掛了,RabbitMQ將會重新傳送訊息。這樣即使處理一個訊息需要消耗很長很長的時間,也是可以的。
  預設的訊息確認方式為人工訊息確認。在我們之前的例子中,我們清晰地將它關閉了,使用了auto_ack=True這個命令。當我們完成一項任務的時候,根據需要,移除這個標誌,從worker中傳送一個合適的確認。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep( body.count('.') )
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)

使用上述程式碼,我們可以確保,即使我們使用CTRL+C命令殺死了一個正在處理訊息的woker,也不會丟失什麼。這個worker掛掉後不久,所有未確認的訊息將會被重新傳送。
  訊息確認必須在同一個傳輸訊息的channel中傳送。嘗試著在不同的channel中進行訊息確認將會引發channel-level protocol exception。

訊息持久化(Message Durability)

  我們已經學習瞭如何在消費者掛掉的情況下,任務不會丟失。但是,當RabbitMQ server停止時,我們的任務仍然會丟失。
  當RabbitMQ停止或崩潰時,它將會忘記所有的佇列和訊息,除非你告訴它不這麼做。在這種情況下,需要做兩個事情確保訊息不會丟失:我們需要將佇列和訊息都設定為持久化。
  首先,我們需要確保RabbitMQ不會丟失佇列。為了實現這個,我們需要將佇列宣告為持久化:

channel.queue_declare(queue='hello', durable=True)

儘管這個命令是正確的,但他仍會不會起作用。這是因為,我們已經建立了一個叫為hello的非持久化佇列。RabbitMQ不允許你重新定義一個已經存在的佇列而引數不一樣,所有這樣做的程式只會引發錯誤。但是有一個快速的應變辦法——我們可以建立一個不同名稱的佇列,比如task_queue

channel.queue_declare(queue='task_queue', durable=True)

queue_declare需要同時應用於生產者和消費者。
  在這點上我們可以確保task_queue佇列不會丟失訊息即使RabbitMQ重啟。現在,我們需要宣告訊息為持久化——將delivery_mode這個引數設定為2。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

公平分發(Fair Dispatch)

  你也許注意到了,剛才的訊息分發機制並不會嚴格地按照我們所希望的方式進行。舉這樣一個例子,設想有兩個worker,而所有的奇數訊息都很重而偶數訊息都是輕量級的,這樣其中一個worker就會一直很忙而另一個worker幾乎不做什麼工作。然而,RabbitMQ對此一無所知,它仍然會平均分配訊息。
  這種情況的發生是因為RabbitMQ僅僅是當訊息進入佇列的時候就會分發這個訊息。它並不會注意消費者所接收的未確認的訊息數量。它盲目地將第n個訊息傳送至第n個消費者。

  為了克服這種情況,我們可以在basic.qos方法中設定prefetch_count=1。這就告訴RabbitMQ一次不要將多於一個的訊息傳送給一個worker。換句話說,不要分發一個新的訊息給worker除非這個worker已經處理好之前的訊息並且進行了訊息確認。也就說,RabbitMQ將會將這個訊息分發給下一個不是很忙的worker。

channel.basic_qos(prefetch_count=1)

實戰1

  為了對上面的例子有一個好的理解,我們需要寫程式碼進行實際操練一下。
  生產者new_task.py的程式碼如下:

# -*- coding: utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

  消費者worker.py的完整程式碼如下:

# -*- coding: utf-8 -*-

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

  開啟三個終端,訊息的傳送和接收情況如下:

  如果我們停掉其中一個worker,那麼訊息的接收情況如下:

可以看到,現在所有傳送的訊息都會被這個仍在工作的worker接收到。

實戰2

  接下來,我們將會使用RabbitMQ的這種工作佇列的方式往MySQL資料庫中的表插入資料。
  資料庫為orm_test,表格為exam_user,表結構如下:

  接下來,我們需要往這張表中插入隨機建立的資料。如果我們利用Python的第三方模組pymysql,每一次插入一條記錄,那麼一分鐘插入53237條記錄。
  利用RabbitMQ,我們的生產者程式碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice

names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
         'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
         'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
         'DG009', 'DG010', 'DG020']


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for id in range(1, 20000001):
    name = choice(names)
    place = choice(places)
    type2 = choice(types)
    message = "insert into exam_users values(%s, '%s', '%s', '%s');" % (id, name, place, type2)

    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)
connection.close()

  消費者程式碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import time
import pymysql

# 開啟資料庫連線
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# 使用 cursor() 方法建立一個遊標物件 cursor
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    cursor.execute(body)
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

我們開啟9個終端,其中8個消費者1個生產者,先啟動消費者,然後生產者,按照上面的資料匯入方式,一分鐘插入了133084條記錄,是普通方式的2.50倍,效率有大幅度提升!
  讓我們稍微修改下生產者和消費者的程式碼,一次提交插入多條記錄,減少每提交一次就插入一條記錄的消耗時間。新的生產者程式碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice
import json

names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
         'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
         'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
         'DG009', 'DG010', 'DG020']


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for _ in range(1, 200001):

    values = []
    for i in range(100):
        name = choice(names)
        place = choice(places)
        type2 = choice(types)
        values.append([100*_+i+1, name, place, type2])
    message = json.dumps(values)


    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)

connection.close()

  新的消費者的程式碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import json
import time
import pymysql

# 開啟資料庫連線
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# 使用 cursor() 方法建立一個遊標物件 cursor
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sql = 'insert into exam_users values(%s, %s, %s, %s)'
    cursor.executemany(sql, json.loads(body))
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

跟剛才一樣,我們開啟9個終端,其中8個消費者1個生產者,先啟動消費者,然後生產者,按照上面的資料匯入方式,一分鐘插入了3170600條記錄,是普通方式的59.56倍,是先前一次只提交一條記錄的插入方式的23.82倍。這樣的提速無疑是非常驚人的!
  當然還有更高效的資料插入方法,本文的方法僅僅是為了演示RabbitMQ的工作佇列以及在插入資料方面的提速。

  本次分享到此結束,感謝大家閱