Java--Object類
阿新 • • 發佈:2020-09-08
一、簡介
訊息佇列就是基礎資料結構中的“先進先出”的一種資料機構,
RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、Java、JMS、C等,支援AJAX。
用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。
二、RabbitMQ的作用
1.主要應用於應用解耦
以電商應用為例,應用中有訂單系統、庫存系統、物流系統、支付系統。使用者建立訂單後,如果耦合呼叫庫存系統、物流系統、支付系統,任何一個子系統出了故障,都會造成下單操作異常。
當轉變成基於訊息佇列的方式後,系統間呼叫的問題會減少很多,比如物流系統因為發生故障,需要幾分鐘來修復。在這幾分鐘的時間裡,物流系統要處理的記憶體被快取在訊息佇列中,使用者
的下單操作可以正常完成。當物流系統恢復後,繼續處理訂單資訊即可,中單使用者感受不到物流系統的故障。提升系統的可用性
2.流量削峰
舉個栗子,如果訂單系統最多能處理一萬次訂單,這個處理能力應付正常時段的下單時綽綽有餘,正常時段我們下單一秒後就能返回結果。但是在高峰期,如果有兩萬次下單作業系統
是處理不了的,只能限制訂單超過一萬後不允許使用者下單。
使用訊息佇列做緩衝,我們可以取消這個限制,把一秒內下的訂單分散成一段時間來處理,這事有些使用者可能在下單十幾秒後才能收到下單成功的操作,但是比不能下單的體驗要好。
3.訊息分發
生產者只負責生產訊息,消費者只要監聽了生產者 ,那麼生產者發的訊息就能被接收。
4.非同步訊息
使用訊息匯流排,可以很方便解決這個問題,A呼叫B服務後,只需要監聽B處理完成的訊息,當B處理完成後,會發送一條訊息給MQ,MQ會將此訊息轉發給A服務。
這樣A服務既不用迴圈呼叫B的查詢api,也不用提供callback api。同樣B服務也不用做這些操作。A服務還能及時的得到非同步處理成功的訊息
5.常見訊息佇列及比較
總結:
Kafka在於分散式架構,RabbitMQ基於AMQP協議來實現,RocketMQ/思路來源於kafka,改成了主從結構,在事務性可靠性方面做了優化。廣泛來說,電商、
金融等對事務性要求很高的,可以考慮RabbitMQ和RocketMQ,對效能要求高的可考慮Kafka
三、安裝
服務端安裝:
# 原生: # 安裝配置epel源 # 安裝erlang yum -y install erlang # 安裝RabbitMQ yum -y install rabbitmq-server # 使用Docker docker pull rabbitmq:management docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
客戶端:
pip3 install pika
四、使用
基本使用:
# 生產者 import pika # 拿到連線物件 # 有使用者名稱密碼的情況 # connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200')) credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) # 拿到channel物件 channel = connection.channel() # 宣告一個佇列 channel.queue_declare(queue='hello') # 指定佇列名字 # 生產者向佇列中放一條訊息 channel.basic_publish(exchange='', routing_key='hello', body='hello world') print(" Sent 'Hello World!'") # 關閉連線 connection.close()
# 消費者 import pika def main(): # 無密碼 # connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.200')) credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) channel.start_consuming() if __name__ == '__main__': main()
訊息確認(保障安全)
import pika # 無密碼 # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) # 有密碼 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告一個佇列(建立一個佇列) channel.queue_declare(queue='qjk') channel.basic_publish(exchange='', routing_key='qjk', # 訊息佇列名稱 body='hello world') connection.close()生產者
import pika def main(): credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告一個佇列(建立一個佇列) channel.queue_declare(queue='qjk') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body) # 真正的訊息處理完了,再發確認 ch.basic_ack(delivery_tag=method.delivery_tag) # 不會自動回覆確認訊息, # auto_ack=True,佇列收到確認,就會自動把消費過的訊息刪除 channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False) channel.start_consuming() if __name__ == '__main__': main()消費者
持久化
import pika # 無密碼 # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) # 有密碼 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告一個佇列(建立一個佇列) channel.queue_declare(queue='qjk', durable=True) # 指定佇列持久化 channel.basic_publish(exchange='', routing_key='qjk', # 訊息佇列名稱 body='hello world', properties=pika.BasicProperties( delivery_mode=2, ) # 指定訊息持久化 ) connection.close()生產者
import pika def main(): credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告一個佇列(建立一個佇列) channel.queue_declare(queue='qjk', durable=True) # 指定佇列持久化 def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body) # 真正的訊息處理完了,再發確認 ch.basic_ack(delivery_tag=method.delivery_tag) # 不會自動回覆確認訊息, # auto_ack=True,佇列收到確認,就會自動把消費過的訊息刪除 channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False) channel.start_consuming() if __name__ == '__main__': main()消費者
閒置消費
import pika # 無密碼 # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) # 有密碼 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告一個佇列(建立一個佇列) channel.queue_declare(queue='qjk', durable=True) # 指定佇列持久化 channel.basic_publish(exchange='', routing_key='qjk', # 訊息佇列名稱 body='hello world', properties=pika.BasicProperties( delivery_mode=2, ) # 指定訊息持久化 ) connection.close()生產者
import pika def main(): credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告一個佇列(建立一個佇列) channel.queue_declare(queue='qjk') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body) # 真正的訊息處理完了,再發確認 ch.basic_ack(delivery_tag=method.delivery_tag) # 不會自動回覆確認訊息, # auto_ack=True,佇列收到確認,就會自動把消費過的訊息刪除 channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False) channel.basic_qos(prefetch_count=1) # 誰閒置誰獲取,沒必要按照順序一個一個來 channel.start_consuming() if __name__ == '__main__': main()消費者
釋出訂閱
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告佇列沒有指定名字,指定了exchange channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()釋出者
# 起多個都能收到訊息 import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) # 隨機的名字 queue_name = result.method.queue print(queue_name) # 繫結 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()訂閱者
釋出訂閱(高階之Routing按關鍵字匹配)
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告佇列沒有指定名字,指定了exchange channel.exchange_declare(exchange='qjk123', exchange_type='direct') message = "info: asdfasdfasdfsadfasdf World!" channel.basic_publish(exchange='qjk123', routing_key='bnb', body=message) # 指定關鍵字 print(" [x] Sent %r" % message) connection.close()釋出者
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='qjk123', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()訂閱者1
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='qjk123', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb') channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='bnb') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()訂閱者2
釋出訂閱高階之Topic(按關鍵字模糊匹配)
* 只能加一個單詞
# 可以加任意單詞字元
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 宣告佇列沒有指定名字,指定了exchange channel.exchange_declare(exchange='m3', exchange_type='topic') message = "info: asdfasdfasdfsadfasdf World!" channel.basic_publish(exchange='m3', routing_key='qjk.dd.dd', body=message) print(" [x] Sent %r" % message) connection.close()釋出者
# 收不到訊息 import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m3', exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.*') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()訂閱者1
# 可以收到訊息 import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m3', exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.#') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()訂閱者2
五、三種方式實現RPC
1.RabbitMQ實現RPC
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.credentials = pika.PlainCredentials("admin", "admin") self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=self.credentials)) self.channel = self.connection.channel() result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(20) # 外界看上去,就像呼叫本地的call()函式一樣 print(" [.] Got %r" % response)客戶端
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()服務端
2.內建包SimpleXMLRPCServer實現
from xmlrpc.client import ServerProxy # SimpleXMLRPCServer def xmlrpc_client(): print('xmlrpc client') c = ServerProxy('http://localhost:4242') data = {'client:' + str(i): i for i in range(100)} start = time.clock() for i in range(100): a = c.getObj() print(a) for i in range(100): c.sendObj(data) print('xmlrpc total time %s' % (time.clock() - start)) if __name__ == '__main__': xmlrpc_client()客戶端
from xmlrpc.server import SimpleXMLRPCServer class RPCServer(object): def __init__(self): super(RPCServer, self).__init__() print(self) self.send_data = {'server:' + str(i): i for i in range(100)} self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data) # SimpleXMLRPCServer server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True) server.register_introspection_functions() server.register_instance(RPCServer()) server.serve_forever()服務端
3.ZeroRPC實現
安裝:pip3 install zerorpc
import zerorpc import time # zerorpc def zerorpc_client(): print('zerorpc client') c = zerorpc.Client() c.connect('tcp://127.0.0.1:4243') data = {'client:' + str(i): i for i in range(100)} start = time.clock() for i in range(100): a = c.getObj() print(a) for i in range(100): c.sendObj(data) print('total time %s' % (time.clock() - start)) if __name__ == '__main__': zerorpc_client()客戶端
import zerorpc class RPCServer(object): def __init__(self): super(RPCServer, self).__init__() print(self) self.send_data = {'server:' + str(i): i for i in range(100)} self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data) # zerorpc s = zerorpc.Server(RPCServer()) s.bind('tcp://0.0.0.0:4243') s.run()服務端