AcWing245你能回答這些問題嗎(線段樹+邏輯思維)
目錄
訊息佇列
作用:
1)程式解耦 允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。 2)冗餘: 訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。 許多訊息佇列所採用的"插入-獲取-刪除"正規化中,在把一個訊息從佇列中刪除之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的儲存直到你使用完畢。 3)峰值處理能力: (大白話,就是本來公司業務只需要5臺機器,但是臨時的秒殺活動,5臺機器肯定受不了這個壓力,我們又不可能將整體伺服器架構提升到10臺,那在秒殺活動後,機器不就浪費了嗎?因此引入訊息佇列) 在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。 如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。 使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。 4)可恢復性: 系統的一部分元件失效時,不會影響到整個系統。 訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。 5)順序保證: 在大多使用場景下,資料處理的順序都很重要。 大部分訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。(Kafka保證一個Partition內的訊息的有序性) 6)緩衝: 有助於控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況。 7)非同步通訊: 很多時候,使用者不想也不需要立即處理訊息。比如發紅包,發簡訊等流程。 訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。
rabbitMQ
-
安裝rabbitmq
yum -y install erlang yum -y install rabbitmq-server
-
開啟rabbitmq服務端
systemctl start rabbitmq-server
-
開啟rabbitmq的管理頁面
rabbitmq-plugins enable rabbutmq_management
-
建立rabbitmq的後臺管理使用者
sudo rabbitmqctl add_user yan 123456 #建立使用者 sudo rabbitmqctl set_user_tags yan administrator #設定使用者管理員身份 sudo rabbitmqctl set_permissions -p "/" yan ".*" ".*" ".*" #允許yan使用者,對所有的佇列都可以讀寫
-
重啟rebbitmq服務端
systemctl restart rabbitmq-server
-
檢視埠
netstat -tunlp # 訪問 192.168.16.66:15672
-
下載pika模組,實現生產消費者
pip3 install pika
單傳送單接收
建立一個pro.py
檔案,寫入如下程式碼,作為消費者
#!/usr/bin/env python import pika # 建立憑證,使用rabbitmq使用者密碼登入 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("yan","123456") # 新建連線,這裡localhost可以更換為伺服器ip # 找到這個郵局,等於連線上伺服器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.66',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個連線 channel = connection.channel() # 宣告一個佇列,用於接收訊息,佇列名字叫“西遊記” channel.queue_declare(queue='西遊記') # 注意在rabbitmq中,訊息想要傳送給佇列,必須經過交換(exchange),初學可以使用空字串交換(exchange=''),它允許我們精確的指定傳送給哪個佇列(routing_key=''),引數body值傳送的資料 channel.basic_publish(exchange='', routing_key='西遊記', body='大師兄,師傅被妖怪抓走了') print("已經發送了訊息") # 程式退出前,確保重新整理網路緩衝以及訊息傳送給rabbitmq,需要關閉本次連線 connection.close()
執行:
python3 pro.py
建立一個con.py
檔案,寫入消費者的程式碼
import pika
# 建立與rabbitmq的連線
# 前四行都是連線到同一個rabbitmq服務端以及同一個佇列
credentials = pika.PlainCredentials("yan","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.66',credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="西遊記")
def callbak(ch,method,properties,body):
print("消費者取出了訊息:%r"%body.decode("utf8"))
# 有訊息來臨,立即執行callbak,沒有訊息則夯住,等待訊息
# 老百姓開始去郵箱取郵件啦,佇列名字是西遊記
channel.basic_consume(callbak,queue="西遊記",no_ack=True) # no_ack 表示不用確認
# 開始消費,接收訊息
channel.start_consuming()
ACK機制 -- 訊息確認
預設情況下,生產者傳送資料給佇列,消費者取出訊息後,資料將被清除。
特殊情況,如果消費者處理過程中,出現錯誤,資料處理沒有完成,那麼這段資料將從佇列丟失。
no-ack機制:不確認機制也就是說每次消費者接收到資料後,不管是否處理完畢,rabbitmq-server都會把這個訊息標記完成,從佇列中刪除。
ack機制:ACK機制用於保證消費者如果拿了佇列的訊息,客戶端處理時出錯了,那麼佇列中仍然還存在這個訊息,提供下一位消費者繼續取。
生產者 pro.py
# 訊息之ack機制
#!/usr/bin/env python
import pika
# 建立憑證,使用rabbitmq使用者密碼登入
# 去郵局取郵件,必須得驗證身份
credentials = pika.PlainCredentials("yan","123456")
# 新建連線,這裡localhost可以更換為伺服器ip
# 找到這個郵局,等於連線上伺服器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.66',credentials=credentials))
# 建立頻道
# 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個連線
channel = connection.channel()
# 新建一個hello佇列,用於接收訊息
# 這個郵箱可以收發各個班級的郵件,通過
channel.queue_declare(queue='金品沒')
# 注意在rabbitmq中,訊息想要傳送給佇列,必須經過交換(exchange),初學可以使用空字串交換(exchange=''),它允許我們精確的指定傳送給哪個佇列(routing_key=''),引數body值傳送的資料
channel.basic_publish(exchange='',
routing_key='金品沒',
body='大郎,起來喝藥了')
print("已經發送了訊息")
# 程式退出前,確保重新整理網路緩衝以及訊息傳送給rabbitmq,需要關閉本次連線
connection.close()
消費者 con.py no_ack=False
import pika
credentials = pika.PlainCredentials("yan","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.66',credentials=credentials))
channel = connection.channel()
# 宣告一個佇列(建立一個佇列)
channel.queue_declare(queue='金品沒')
def callback(ch, method, properties, body):
print("消費者接受到了任務: %r" % body.decode("utf-8"))
int('asdfasdf')
# 我告訴rabbitmq服務端,我已經取走了訊息
# 回覆方式在這
ch.basic_ack(delivery_tag=method.delivery_tag)
# 關閉no_ack,代表給與服務端ack回覆,確認給與回覆
channel.basic_consume(callback,queue='金品沒',no_ack=False) # no_ack=False 禁止不確認機制,代表需要給與服務端訊息確認回覆
channel.start_consuming()
訊息持久化
讓佇列以及訊息支援持久化,防止異常崩潰,訊息丟失。
訊息的可靠性是RabbitMQ的一大特色,那麼RabbitMQ是如何保證訊息可靠性的呢——訊息持久化。 為了保證RabbitMQ在退出或者crash等異常情況下資料沒有丟失,需要將queue,exchange和Message都持久化。
生產者 pro.py
import pika
# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61'))
# 有密碼
credentials = pika.PlainCredentials("yan","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.66',credentials=credentials))
channel = connection.channel()
# 宣告一個佇列(建立一個佇列)
# 預設此佇列不支援持久化,如果服務掛掉,資料丟失
# durable=True 開啟持久化,必須新開啟一個佇列,原本的佇列已經不支援持久化了
'''
實現rabbitmq持久化條件
delivery_mode=2
使用durable=True宣告queue是持久化
'''
channel.queue_declare(queue='LOL',durable=True) #此步表示佇列是支援持久化的引數
channel.basic_publish(exchange='',
routing_key='LOL', # 訊息佇列名稱
body='我用雙手成就你的夢想',
# 支援資料持久化
properties=pika.BasicProperties(
delivery_mode=2, # 代表訊息是持久的2
)
)
connection.close()
RPC 遠端過程呼叫
將一個函式執行在遠端計算機上並且等待獲取那裡的結果,這個稱作遠端過程呼叫(Remote Procedure Call)或者 RPC。
RPC是一個計算機通訊協議。
Callback queue 回撥佇列:一個客戶端向伺服器傳送請求,伺服器端處理請求後,將其處理結果儲存在一個儲存體中。而客戶端為了獲得處理結果,那麼客戶在向伺服器傳送請求時,同時傳送一個回撥佇列地址reply_to
。
Correlation id 關聯標識:一個客戶端可能會發送多個請求給伺服器,當伺服器處理完後,客戶端無法辨別在回撥佇列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在傳送每個請求時,同時會附帶一個獨有correlation_id
屬性,這樣客戶端在回撥佇列中根據correlation_id
欄位的值就可以分辨此響應屬於哪個請求。
利用RabbitMQ構建一個RPC系統,包含了客戶端和RPC伺服器,依舊使用pika模組。
rpc_server.py
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
# 客戶端啟動時,建立回撥佇列,會開啟會話用於傳送RPC請求以及接受響應
# 建立連線,指定伺服器的ip地址
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.16.66'))
# 建立一個會話,每個channel代表一個會話任務
self.channel = self.connection.channel()
# 宣告回撥佇列,再次宣告的原因是,伺服器和客戶端可能先後開啟,該宣告是冪等的,多次宣告,但只生效一次
#exclusive=True 引數是指只對首次宣告它的連線可見
#exclusive=True 會在連線斷開的時候,自動刪除
result = self.channel.queue_declare(exclusive=True)
# 將次佇列指定為當前客戶端的回撥佇列
self.callback_queue = result.method.queue
# 客戶端訂閱回撥佇列,當回撥佇列中有響應時,呼叫`on_response`方法對響應進行處理;
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
# 對回撥佇列中的響應進行處理的函式
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
# 發出RPC請求
# 例如這裡服務端就是一個切菜師傅,菜切好了,需要傳遞給洗菜師傅,這個過程是傳送rpc請求
def call(self, n):
# 初始化 response
self.response = None
# 生成correlation_id 關聯標識,通過python的uuid庫,生成全域性唯一標識ID,保證時間空間唯一性
self.corr_id = str(uuid.uuid4())
# 傳送RPC請求內容到RPC請求佇列`s24rpc`,同時傳送的還有`reply_to`和`correlation_id`
self.channel.basic_publish(
exchange='',
routing_key='s24rpc', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id,),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
# 建立客戶端
fibonacci_rpc = FibonacciRpcClient()
# 傳送RPC請求,丟進rpc佇列,等待客戶端處理完畢,給與響應
print("傳送了請求sum(99)")
response = fibonacci_rpc.call(99)
print("得到遠端結果響應%r" % response)
rpc_client.py
import pika
# 建立連線,伺服器地址為localhost,可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.16.66'))
# 建立會話
channel = connection.channel()
# 宣告RPC請求佇列
channel.queue_declare(queue='s24rpc')
# 模擬一個程序,例如切菜師傅,等著洗菜師傅傳遞資料
def sum(n):
n+=100
return n
# 對RPC請求佇列中的請求進行處理
def on_request(ch, method, props, body):
print(body,type(body))
n = int(body)
print(" 正在處理sum(%s)" % n)
# 呼叫資料處理方法
response = sum(n)
# 將處理結果(響應)傳送到回撥佇列
ch.basic_publish(exchange='',
# reply_to代表回覆目標
routing_key=props.reply_to,
# correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。
properties=pika.BasicProperties(correlation_id= \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
# 負載均衡,同一時刻傳送給該伺服器的請求不超過一個
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='s14rpc')
print("等待接收rpc請求")
# 開始消費
channel.start_consuming()