1. 程式人生 > 其它 >Python實現目錄檔案的全量和增量備份

Python實現目錄檔案的全量和增量備份

常識

  • 開發語言是erlang,安裝的話要先安裝語言環境
  • 中介軟體要考慮的:
    1. 持久化
    2. 高可用(叢集,主從)
  • kafka效能最高
  • 名詞
    1. Broker / RabbitMQ Server / Message Broker 接受和分發訊息的應用
    2. Exchange 一個交換機可以繫結多個佇列
    3. Queue 交換機和佇列在Broker裡
    4. Producer
    5. Consumer
    6. connection c/s之間的一個tcp/ip連結
    7. Channel 一個連結有多個通道,通道是為了減少連結開銷,是一種邏輯連結
    8. vhost 一個vhost裡有交換機和佇列

AMQP協議

協議是在 tcp/ip協議基礎之上約定的,tcp/ip過於簡單,就是在tcp/ip上做個封裝(http也不符合需求)

  • MQTT協議
  • OpenMana
  • Kafka協議

訊息持久化

支援檔案儲存(自己定義的檔案儲存格式來做持久化)

訊息分發機制

RabbitMQ kafka
釋出訂閱
輪訓分發(公平)
公平分發(能者多勞)
重發
訊息拉取

簡單佇列模式

getChannel

factory = new connectionFactory()
factory  -> setHost  setUsername  setPassword
Connection conn = factory.newConn()
Channel chan = conn.newChan()

生產者:

  1. 哪個佇列declQueue
chan = getChan()
chan.declQueue(queue_name,isdurable,exclusive訊息共享,autoDelete自動刪除,arguments其他引數)
Message = "hello"
chan.publish(exchange哪個交換機,queue_name 路由key,props 其他引數,message.getBytes()訊息體)

消費者:

  1. 哪個佇列
  2. 消費成功是否自動應答
  3. 成功和不成功的回撥
chan = getChan()
chan.Consume(queue,true,declconsume,sucess)

工作佇列 Working Queues

多個工作執行緒處理一個訊息佇列(一個訊息只能處理一次),採用輪詢分發

一樣的,多開幾個消費執行緒就是

訊息應答

消費者把訊息處理完了之後,給佇列一個應答,然後佇列才刪除訊息

  1. 自動應答
    只要他接到訊息立馬給應答,但是其實後續還有很多處理
    所以在高吞吐量的情況下還是可能丟失訊息
  2. 手動應答
    可以批量應答multiple(跟網路那個批量確認差不多,建議為false)
    basicAck肯定
void deliverMessage(consumeTag, message){
    // do something .....
    message.getbody()
    // over
    chan.basicAck(message.getEnvelop(),message.getTag(),multiple=false)
}
chan.consume(autoAck=false,deliverMessage)

basicNack否定
basicReject否定

重新入隊

如果沒有ack,佇列知道訊息丟了,然後訊息重新入隊,給另一個消費者,總之就是要保證訊息不要丟失

持久化

  1. 佇列持久化
  2. 訊息持久化
channel.basicPublish(exchange,queue_name,messageProperties.PERSISTENT,message.getBytes)

負載均衡

公平分發

能者多勞,在消費者那裡設定這個,誰能消費就把訊息給誰

channel.basicQos(perfectCount=1)

預取值

相當於權值,相當於設定堆積到通道上的訊息有多少條

channel.basicQos(perfectCount=x)

釋出確認

訊息持久化了之後再給生產者發訊息確認

單個

同步的,發一個確認一個,上一個沒確認下一個也不發。缺點明顯,就是慢

while(true){
    channel.publish()
    bool flag = chan.waitForConfirms()
    if(flag){
        printf("sucess")
    }
}

批量

仍然是同步的,但是一旦出問題無法確認是具體哪條訊息出問題(沒有被確認)

int batchSize = 100
for (i:1~1000) {
    chan.publish()
    if(i%batchSize==0){
        chan.waitForConfirms()
    }
}

非同步批量

producer只需要發訊息就夠了,然後是非同步的,broker會對訊息處理

chan.confirmSelect()
ConcurrentSkipListMap<int,String> confirms
//準備訊息監聽器
ackCallback(){
    //收到了,刪除掉已經確認的
    confirms.delete()
}
nackcallback(){
    //沒收到,做些操作,比如放回佇列
}
chan.addConfirmListener(ackCallback,nackCallback) //這是多執行緒
//直接
for (i : message) {
    chan.publish(message)
    confirms.put(num,message)
}

交換機

訊息不會直接傳送給佇列,只能傳送給交換機,交換機拿到訊息,通過RoutingKey來決定把訊息路由到哪個佇列裡
exchange -> RoutingKey -> Queue

臨時佇列

不帶有持久化,佇列名稱讓伺服器來給我們隨機出來,斷開連線之後,臨時佇列自動銷燬

string queue_name = chan.declQueue().getQueue()

Fanout

廣播,交換機把訊息廣播到所有佇列

chan = getChan()
chan.exchangeDecl(exchange_name, exchange_type="fanout")
chan.queueDecl().getQueue()
chan.queueBind(queue_name,exchange_name,routingKey)

chan.publish(exchange_name,routing_key,props,)

Direct

就是繫結的RoutingKey不一樣,可以多重繫結啥的

chan = getChan()
chan.exchangeDecl(exchange_name, exchange_type="fanout")
chan.queueDecl(queue_name1,callback)
chan.queueDecl(queue_name2,callback)
chan.queueBind(queue_name1,exchange_name,routingKey)
chan.queueBind(queue_name2,exchange_name,routingKey)

chan.publish(exchange_name,routing_key,props,)

Topic

類似於正則表示式

死信

訊息在訊息佇列裡,但是出於某些原因無法被消費者消費,為了防止這些訊息過期,需要把他們放到死信佇列裡

  • 來源
  1. 訊息TTL過期
  2. 佇列滿了,加不了新訊息
  3. basic.reject,basic.nack requeue=false
  • dead_exchange和dead_queue
chan = getChan()
chan.exchangeDel(normal, type=direct)
chan.exchangeDel(dead, type=direct)
map<string, object> arguments = new hashMap  //引數設定
arguments.put("x-dead-letter-exchange",dead_exchange)  //第一個引數是固定的
arguments.put("x-dead-letter-routing-key","routing_key") 
chan.queueDecl(normal_queue_name,durable,exclusive,autoDelete,arguments=arguments)

延遲佇列

結構和死信的ttl狀態很像,就是normal_queue沒有消費者

  • 使用場景
    1. 某個事件發生後一段時間,某個事件發生前一段時間 需要做某件事
    2. 訂單十分鐘內未支付,註冊成功三天沒登入,預定會議前10分提醒
arguments.put("x-dead-letter-exchange",dead_exchange)  //第一個引數是固定的,指定死信佇列
arguments.put("x-dead-letter-routing-key","routing_key")
arguments.put("x-message-ttl",4000)  //第一個引數是固定的