1. 程式人生 > 實用技巧 >Java--Object類

Java--Object類

一、簡介

  訊息佇列就是基礎資料結構中的“先進先出”的一種資料機構,

  RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、Java、JMS、C等,支援AJAX。

  用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。

二、RabbitMQ的作用

  1.主要應用於應用解耦

以電商應用為例,應用中有訂單系統、庫存系統、物流系統、支付系統。使用者建立訂單後,如果耦合呼叫庫存系統、物流系統、支付系統,任何一個子系統出了故障,都會造成下單操作異常。

當轉變成基於訊息佇列的方式後,系統間呼叫的問題會減少很多,比如物流系統因為發生故障,需要幾分鐘來修復。在這幾分鐘的時間裡,物流系統要處理的記憶體被快取在訊息佇列中,使用者
的下單操作可以正常完成。當物流系統恢復後,繼續處理訂單資訊即可,中單使用者感受不到物流系統的故障。提升系統的可用性

        

  2.流量削峰

舉個栗子,如果訂單系統最多能處理一萬次訂單,這個處理能力應付正常時段的下單時綽綽有餘,正常時段我們下單一秒後就能返回結果。但是在高峰期,如果有兩萬次下單作業系統
是處理不了的,只能限制訂單超過一萬後不允許使用者下單。 使用訊息佇列做緩衝,我們可以取消這個限制,把一秒內下的訂單分散成一段時間來處理,這事有些使用者可能在下單十幾秒後才能收到下單成功的操作,但是比不能下單的體驗要好。

  3.訊息分發

    生產者只負責生產訊息,消費者只要監聽了生產者 ,那麼生產者發的訊息就能被接收。

      

  4.非同步訊息

使用訊息匯流排,可以很方便解決這個問題,A呼叫B服務後,只需要監聽B處理完成的訊息,當B處理完成後,會發送一條訊息給MQ,MQ會將此訊息轉發給A服務。

這樣A服務既不用迴圈呼叫B的查詢api,也不用提供callback api。同樣B服務也不用做這些操作。A服務還能及時的得到非同步處理成功的訊息

    

  5.常見訊息佇列及比較

    

    總結:

Kafka在於分散式架構,RabbitMQ基於AMQP協議來實現,RocketMQ/思路來源於kafka,改成了主從結構,在事務性可靠性方面做了優化。廣泛來說,電商、
金融等對事務性要求很高的,可以考慮RabbitMQ和RocketMQ,對效能要求高的可考慮Kafka

三、安裝

  服務端安裝:

# 原生:
# 安裝配置epel源
# 安裝erlang
yum -y install erlang
# 安裝RabbitMQ
yum -y install rabbitmq-server


# 使用Docker
docker pull rabbitmq:management
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

  客戶端:

pip3 install pika

四、使用

  基本使用:

# 生產者

import pika

# 拿到連線物件
# 有使用者名稱密碼的情況
# connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200'))

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))

# 拿到channel物件
channel = connection.channel()

# 宣告一個佇列
channel.queue_declare(queue='hello')  # 指定佇列名字

# 生產者向佇列中放一條訊息
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='hello world')
print(" Sent 'Hello World!'")
# 關閉連線
connection.close()
# 消費者
import pika


def main():
    # 無密碼
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.200'))
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    main()

  訊息確認(保障安全)

import pika

# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密碼
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()
# 宣告一個佇列(建立一個佇列)
channel.queue_declare(queue='qjk')

channel.basic_publish(exchange='',
                      routing_key='qjk',  # 訊息佇列名稱
                      body='hello world')
connection.close()
生產者
import pika


def main():
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()

    # 宣告一個佇列(建立一個佇列)
    channel.queue_declare(queue='qjk')

    def callback(ch, method, properties, body):
        print("消費者接受到了任務: %r" % body)
        # 真正的訊息處理完了,再發確認
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 不會自動回覆確認訊息,
    # auto_ack=True,佇列收到確認,就會自動把消費過的訊息刪除
    channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False)

    channel.start_consuming()


if __name__ == '__main__':
    main()
消費者

  持久化

import pika

# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密碼
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()
# 宣告一個佇列(建立一個佇列)
channel.queue_declare(queue='qjk', durable=True)  # 指定佇列持久化

channel.basic_publish(exchange='',
                      routing_key='qjk',  # 訊息佇列名稱
                      body='hello world',
                      properties=pika.BasicProperties(
                          delivery_mode=2, )  # 指定訊息持久化
                      )
connection.close()
生產者
import pika


def main():
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()

    # 宣告一個佇列(建立一個佇列)
    channel.queue_declare(queue='qjk', durable=True)  # 指定佇列持久化

    def callback(ch, method, properties, body):
        print("消費者接受到了任務: %r" % body)
        # 真正的訊息處理完了,再發確認
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 不會自動回覆確認訊息,
    # auto_ack=True,佇列收到確認,就會自動把消費過的訊息刪除
    channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False)

    channel.start_consuming()


if __name__ == '__main__':
    main()
消費者

  閒置消費

import pika

# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密碼
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()
# 宣告一個佇列(建立一個佇列)
channel.queue_declare(queue='qjk', durable=True)  # 指定佇列持久化

channel.basic_publish(exchange='',
                      routing_key='qjk',  # 訊息佇列名稱
                      body='hello world',
                      properties=pika.BasicProperties(
                          delivery_mode=2, )  # 指定訊息持久化
                      )
connection.close()
生產者
import pika


def main():
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()

    # 宣告一個佇列(建立一個佇列)
    channel.queue_declare(queue='qjk')

    def callback(ch, method, properties, body):
        print("消費者接受到了任務: %r" % body)
        # 真正的訊息處理完了,再發確認
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 不會自動回覆確認訊息,
    # auto_ack=True,佇列收到確認,就會自動把消費過的訊息刪除
    channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False)
    channel.basic_qos(prefetch_count=1)  # 誰閒置誰獲取,沒必要按照順序一個一個來

    channel.start_consuming()


if __name__ == '__main__':
    main()
消費者

  釋出訂閱

import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

# 宣告佇列沒有指定名字,指定了exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
釋出者
# 起多個都能收到訊息

import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
# 隨機的名字
queue_name = result.method.queue
print(queue_name)

# 繫結
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
訂閱者

  釋出訂閱(高階之Routing按關鍵字匹配)

import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

# 宣告佇列沒有指定名字,指定了exchange
channel.exchange_declare(exchange='qjk123', exchange_type='direct')

message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='qjk123', routing_key='bnb', body=message)  # 指定關鍵字
print(" [x] Sent %r" % message)
connection.close()
釋出者
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='qjk123', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb')

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
訂閱者1
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='qjk123', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb')
channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='bnb')

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
訂閱者2

  釋出訂閱高階之Topic(按關鍵字模糊匹配)   

    * 只能加一個單詞

    # 可以加任意單詞字元

import pika


credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

# 宣告佇列沒有指定名字,指定了exchange
channel.exchange_declare(exchange='m3', exchange_type='topic')

message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='m3', routing_key='qjk.dd.dd', body=message)
print(" [x] Sent %r" % message)
connection.close()
釋出者
# 收不到訊息
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.*')

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
訂閱者1
# 可以收到訊息
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.#')
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
訂閱者2

五、三種方式實現RPC

  1.RabbitMQ實現RPC

import pika
import uuid


class FibonacciRpcClient(object):

    def __init__(self):

        self.credentials = pika.PlainCredentials("admin", "admin")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=self.credentials))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(20)  # 外界看上去,就像呼叫本地的call()函式一樣
print(" [.] Got %r" % response)
客戶端
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()
服務端

  2.內建包SimpleXMLRPCServer實現

from xmlrpc.client import ServerProxy


# SimpleXMLRPCServer
def xmlrpc_client():
    print('xmlrpc client')
    c = ServerProxy('http://localhost:4242')
    data = {'client:' + str(i): i for i in range(100)}
    start = time.clock()
    for i in range(100):
        a = c.getObj()
        print(a)
    for i in range(100):
        c.sendObj(data)
    print('xmlrpc total time %s' % (time.clock() - start))


if __name__ == '__main__':
    xmlrpc_client()
客戶端
from xmlrpc.server import SimpleXMLRPCServer


class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)
        self.send_data = {'server:' + str(i): i for i in range(100)}
        self.recv_data = None

    def getObj(self):
        print('get data')
        return self.send_data

    def sendObj(self, data):
        print('send data')
        self.recv_data = data
        print(self.recv_data)


# SimpleXMLRPCServer
server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
server.register_introspection_functions()
server.register_instance(RPCServer())
server.serve_forever()
服務端

  3.ZeroRPC實現

    安裝:pip3 install zerorpc

import zerorpc
import time


# zerorpc
def zerorpc_client():
    print('zerorpc client')
    c = zerorpc.Client()
    c.connect('tcp://127.0.0.1:4243')
    data = {'client:' + str(i): i for i in range(100)}
    start = time.clock()
    for i in range(100):
        a = c.getObj()
        print(a)
    for i in range(100):
        c.sendObj(data)

    print('total time %s' % (time.clock() - start))


if __name__ == '__main__':
    zerorpc_client()
客戶端
import zerorpc


class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)
        self.send_data = {'server:' + str(i): i for i in range(100)}
        self.recv_data = None

    def getObj(self):
        print('get data')
        return self.send_data

    def sendObj(self, data):
        print('send data')
        self.recv_data = data
        print(self.recv_data)


# zerorpc
s = zerorpc.Server(RPCServer())
s.bind('tcp://0.0.0.0:4243')
s.run()
服務端