Python實現目錄檔案的全量和增量備份
阿新 • • 發佈:2021-10-27
常識
- 開發語言是erlang,安裝的話要先安裝語言環境
- 中介軟體要考慮的:
- 持久化
- 高可用(叢集,主從)
- kafka效能最高
- 名詞
- Broker / RabbitMQ Server / Message Broker 接受和分發訊息的應用
- Exchange 一個交換機可以繫結多個佇列
- Queue 交換機和佇列在Broker裡
- Producer
- Consumer
- connection c/s之間的一個tcp/ip連結
- Channel 一個連結有多個通道,通道是為了減少連結開銷,是一種邏輯連結
- vhost 一個vhost裡有交換機和佇列
AMQP協議
協議是在 tcp/ip協議基礎之上約定的,tcp/ip過於簡單,就是在tcp/ip上做個封裝(http也不符合需求)
- MQTT協議
- OpenMana
- Kafka協議
訊息持久化
支援檔案儲存(自己定義的檔案儲存格式來做持久化)
訊息分發機制
RabbitMQ | kafka | |
---|---|---|
釋出訂閱 | √ | √ |
輪訓分發(公平) | √ | √ |
公平分發(能者多勞) | √ | √ |
重發 | √ | |
訊息拉取 | √ | √ |
簡單佇列模式
getChannel
factory = new connectionFactory() factory -> setHost setUsername setPassword Connection conn = factory.newConn() Channel chan = conn.newChan()
生產者:
- 哪個佇列declQueue
chan = getChan()
chan.declQueue(queue_name,isdurable,exclusive訊息共享,autoDelete自動刪除,arguments其他引數)
Message = "hello"
chan.publish(exchange哪個交換機,queue_name 路由key,props 其他引數,message.getBytes()訊息體)
消費者:
- 哪個佇列
- 消費成功是否自動應答
- 成功和不成功的回撥
chan = getChan() chan.Consume(queue,true,declconsume,sucess)
工作佇列 Working Queues
多個工作執行緒處理一個訊息佇列(一個訊息只能處理一次),採用輪詢分發
一樣的,多開幾個消費執行緒就是
訊息應答
消費者把訊息處理完了之後,給佇列一個應答,然後佇列才刪除訊息
- 自動應答
只要他接到訊息立馬給應答,但是其實後續還有很多處理
所以在高吞吐量的情況下還是可能丟失訊息 - 手動應答
可以批量應答multiple(跟網路那個批量確認差不多,建議為false)
basicAck肯定
void deliverMessage(consumeTag, message){
// do something .....
message.getbody()
// over
chan.basicAck(message.getEnvelop(),message.getTag(),multiple=false)
}
chan.consume(autoAck=false,deliverMessage)
basicNack否定
basicReject否定
重新入隊
如果沒有ack,佇列知道訊息丟了,然後訊息重新入隊,給另一個消費者,總之就是要保證訊息不要丟失
持久化
- 佇列持久化
- 訊息持久化
channel.basicPublish(exchange,queue_name,messageProperties.PERSISTENT,message.getBytes)
負載均衡
公平分發
能者多勞,在消費者那裡設定這個,誰能消費就把訊息給誰
channel.basicQos(perfectCount=1)
預取值
相當於權值,相當於設定堆積到通道上的訊息有多少條
channel.basicQos(perfectCount=x)
釋出確認
訊息持久化了之後再給生產者發訊息確認
單個
同步的,發一個確認一個,上一個沒確認下一個也不發。缺點明顯,就是慢
while(true){
channel.publish()
bool flag = chan.waitForConfirms()
if(flag){
printf("sucess")
}
}
批量
仍然是同步的,但是一旦出問題無法確認是具體哪條訊息出問題(沒有被確認)
int batchSize = 100
for (i:1~1000) {
chan.publish()
if(i%batchSize==0){
chan.waitForConfirms()
}
}
非同步批量
producer只需要發訊息就夠了,然後是非同步的,broker會對訊息處理
chan.confirmSelect()
ConcurrentSkipListMap<int,String> confirms
//準備訊息監聽器
ackCallback(){
//收到了,刪除掉已經確認的
confirms.delete()
}
nackcallback(){
//沒收到,做些操作,比如放回佇列
}
chan.addConfirmListener(ackCallback,nackCallback) //這是多執行緒
//直接
for (i : message) {
chan.publish(message)
confirms.put(num,message)
}
交換機
訊息不會直接傳送給佇列,只能傳送給交換機,交換機拿到訊息,通過RoutingKey來決定把訊息路由到哪個佇列裡
exchange -> RoutingKey -> Queue
臨時佇列
不帶有持久化,佇列名稱讓伺服器來給我們隨機出來,斷開連線之後,臨時佇列自動銷燬
string queue_name = chan.declQueue().getQueue()
Fanout
廣播,交換機把訊息廣播到所有佇列
chan = getChan()
chan.exchangeDecl(exchange_name, exchange_type="fanout")
chan.queueDecl().getQueue()
chan.queueBind(queue_name,exchange_name,routingKey)
chan.publish(exchange_name,routing_key,props,)
Direct
就是繫結的RoutingKey不一樣,可以多重繫結啥的
chan = getChan()
chan.exchangeDecl(exchange_name, exchange_type="fanout")
chan.queueDecl(queue_name1,callback)
chan.queueDecl(queue_name2,callback)
chan.queueBind(queue_name1,exchange_name,routingKey)
chan.queueBind(queue_name2,exchange_name,routingKey)
chan.publish(exchange_name,routing_key,props,)
Topic
類似於正則表示式
死信
訊息在訊息佇列裡,但是出於某些原因無法被消費者消費,為了防止這些訊息過期,需要把他們放到死信佇列裡
- 來源
- 訊息TTL過期
- 佇列滿了,加不了新訊息
- basic.reject,basic.nack requeue=false
- dead_exchange和dead_queue
chan = getChan()
chan.exchangeDel(normal, type=direct)
chan.exchangeDel(dead, type=direct)
map<string, object> arguments = new hashMap //引數設定
arguments.put("x-dead-letter-exchange",dead_exchange) //第一個引數是固定的
arguments.put("x-dead-letter-routing-key","routing_key")
chan.queueDecl(normal_queue_name,durable,exclusive,autoDelete,arguments=arguments)
延遲佇列
結構和死信的ttl狀態很像,就是normal_queue沒有消費者
- 使用場景
- 某個事件發生後一段時間,某個事件發生前一段時間 需要做某件事
- 訂單十分鐘內未支付,註冊成功三天沒登入,預定會議前10分提醒
arguments.put("x-dead-letter-exchange",dead_exchange) //第一個引數是固定的,指定死信佇列
arguments.put("x-dead-letter-routing-key","routing_key")
arguments.put("x-message-ttl",4000) //第一個引數是固定的