1. 程式人生 > >多程序訊息佇列

多程序訊息佇列

  1. 多程序的訊息佇列
    訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。 訊息佇列最經典的用法就是消費者和生成者之間通過訊息管道來傳遞訊息,消費者和生成者是不通的程序。生產者往管道中寫訊息,消費者從管道中讀訊息。 作業系統提供了很多機制來實現程序間的通訊 ,multiprocessing模組就提供了Queue和Pipe兩種方法來實現。
    使用multiprocessing裡面的Queue來實現訊息佇列。通過Mutiprocess裡面的Pipe來實現訊息佇列: •Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex引數,如果duplex引數為True(預設值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受訊息,conn2只負責傳送訊息。 •send和recv方法分別是傳送和接受訊息的方法。close方法表示關閉管道,當訊息接受結束以後,關閉管道。
  2. 訊息佇列pipe
    Python提供了Queue模組來專門實現訊息佇列
    Queue物件

       Queue.qsize():返回訊息佇列的當前空間。返回的值不一定可靠。

       Queue.empty():判斷訊息佇列是否為空,返回True或False。同樣不可靠。

       Queue.full():類似上邊,判斷訊息佇列是否滿

       Queue.put(item, block=True, timeout=None):往訊息佇列中存放訊息。block可以控制是否阻塞,timeout指定阻塞時候的等待時間。如果不阻塞或者超時,會引起一 個fullexception。

       Queue.put_nowait(item):相當於put(item, False).

       Queue.get(block=True, timeout=None):獲取一個訊息,其他同put。
    Queue物件實現一個fifo佇列(其他的還有lifo、priority佇列,這裡不再介紹)。queue只有maxsize一個構造引數,用來指定佇列容量,指定為0的時候代表容量無限。主要有以下成員函式,這兩個函式用來判斷訊息對應的任務是否完成。

       Queue.task_done():接受訊息的執行緒通過呼叫這個函式來說明訊息對應的任務已完成。

       Queue.join(): 實際上意味著等到佇列為空,再執行別的操作。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author: lingxiangxiang
     
    import random,threading,time
    from Queue import Queue
    #Producer thread
    class Producer(threading.Thread):
        def __init__(self, t_name, queue):
            threading.Thread.__init__(self,name=t_name)
            self.data=queue
        def run(self):
            for i in range(10):    #隨機產生10個數字 ,可以修改為任意大小
                # randomnum=random.randint(1,99)
                print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i)
                self.data.put(i)  #將資料依次存入佇列
                # time.sleep(1)
            print "%s: %s finished!" %(time.ctime(), self.getName())
     
    #Consumer thread
    class Consumer_even(threading.Thread):
        def __init__(self,t_name,queue):
            threading.Thread.__init__(self,name=t_name)
            self.data=queue
        def run(self):
            while 1:
                try:
                    val_even = self.data.get(1,5)  #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超時5秒
                    if val_even%2==0:
                        print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
                        time.sleep(2)
                    else:
                        self.data.put(val_even)
                        time.sleep(2)
                except:     #等待輸入,超過5秒  就報異常
                    print "%s: %s finished!" %(time.ctime(),self.getName())
                    break
    class Consumer_odd(threading.Thread):
        def __init__(self,t_name,queue):
            threading.Thread.__init__(self, name=t_name)
            self.data=queue
        def run(self):
            while 1:
                try:
                    val_odd = self.data.get(1,5)
                    if val_odd%2!=0:
                        print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
                        time.sleep(2)
                    else:
                        self.data.put(val_odd)
                        time.sleep(2)
                except:
                    print "%s: %s finished!" % (time.ctime(), self.getName())
                    break
    #Main thread
    def main():
        queue = Queue()
        producer = Producer('Pro.', queue)
        consumer_even = Consumer_even('Con_even.', queue)
        consumer_odd = Consumer_odd('Con_odd.',queue)
        producer.start()
        consumer_even.start()
        consumer_odd.start()
        producer.join()
        consumer_even.join()
        consumer_odd.join()
        print 'All threads terminate!'
     
    if __name__ == '__main__':
    main()



相關推薦

程序訊息佇列

多程序的訊息佇列訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。 訊息佇列最經典的用法就是消費者和生成者之間通過訊息管道來傳遞訊息,消費者和生成者是不通的程序。生產者往管道中寫訊息,消費者從管道中讀訊息。 作業系統提供了很多機制來實現程序間的通訊 ,multiprocessi

python 64式: 第14式、程序佇列與鎖

#!/usr/bin/env python # -*- coding: utf-8 -*- import multiprocessing import time ''' 關鍵: 1 multiprocessing.Process(group=None, target=None, args=(),

Python執行緒/程序操作佇列

最近在做深度學習,需要採用多程序的方式實現資料處理。為了實現資料的快速處理,我先後嘗試了把資料一口氣讀入記憶體、多執行緒和多程序的方式。當然,肯定是多程序雙佇列的方式最好,因為可以充分利用多核和cpu。 一般來說匯入佇列可以這樣操作: import Queue myqueu

vc/mfc 程序訊息佇列,執行緒訊息佇列,和系統訊息佇列,該如何處理

佇列訊息和非佇列訊息   從訊息的傳送途徑來看,訊息可以分成2種:佇列訊息和非佇列訊息。訊息佇列由可以分成系統訊息佇列和執行緒訊息佇列。系統訊息佇列由Windows維護,執行緒訊息佇列則由每個GUI執行緒自己進行維護,為避免給non-GUI現成建立訊息佇列,所有執行緒產生

PHP開發者該知道的程序消費佇列

引言 最近開發一個小功能,用到了佇列mcq,啟動一個程序消費佇列資料,後邊發現一個程序處理不過來了,又加了一個程序,過了段時間又處理不過來了… 這種方式每次都要修改crontab,如果程序掛掉了,不會及時的啟動,要等到下次crontab執行的時候才會啟動。關閉(重啟)程序的時候用的是kill,這可能會丟

Linux 工程式設計——程序間通訊:訊息佇列(Message Queues)

概述 訊息佇列提供了一種在兩個不相關的程序之間傳遞資料的簡單高效的方法,其特點如下: 1)訊息佇列可以實現訊息的隨機查詢。訊息不一定要以先進先出的次序讀取,程式設計時可以按訊息的型別讀取。 2)訊息佇列允許一個或多個程序向它寫入或者讀取訊息。 3)與無名管道、命名管道一

python 程序間通訊 訊息佇列

import multiprocessing import time #使用佇列,將訊息寫進佇列,需要的程序到佇列取 #佇列由父程序建立,子程序共享佇列 def write(qe): print("啟動子程序 write") for chr in ['A','B','C','D

程序程式設計之程序間通訊-管道和訊息佇列

1.程序間通訊 Linux作為一種新興的作業系統,幾乎支援所有的Unix下常用的程序間通訊方法:管道、訊息佇列、共享記憶體、訊號量、套介面等等。 2.管道 管道是程序間通訊中最古老的方式,它包括無名管道(或者匿名管道)和有名管道兩種,前者用於父程序和

python程序通訊之訊息佇列

在linux C中,訊息佇列可以通過key來建立,在使用某個佇列時,可根據key來獲取佇列,進而進行資料的收發;且佇列的前4個位元組,可用於判斷目標程序,不匹配則不接收,也就是多個程序可共用一個佇列進行訊息的收發。 在python中則不同,沒有key,直接get接收。這就要

Linux:使用執行緒程式設計和訊息佇列,實現兩個程序之間的聊天

思路: 一個檔案:建立一個執行緒和主函式,或者建立兩個執行緒主函式呼叫(我用這種)。 建立兩個訊息佇列, 一共兩個檔案,兩個佇列,四個程序 a.c    一個程序寫(訊息型別為1)   ---->>佇列     一個程序讀(訊息型別為2) b.c   一

程序通訊之訊息佇列

下面是原始碼:#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include &

基於訊息佇列程序伺服器

目錄一、思路二、實現2. 修改2.1 思路2.2 程式碼 一、思路 1)server程序接收時, 指定msgtyp為0, 從隊首不斷接收訊息; 2)server程序傳送時, 將mtype指定為接收到的client程序的pid; 3)client程序傳送的時候

Linux-程序通訊-訊息佇列/訊號燈/共享記憶體

訊息佇列     訊息佇列提供了程序間傳送資料塊的方法,每個資料塊都可以被認為是有一個型別,接受者接受的資料塊可以有不同的型別;我們可以通過傳送訊息來避免命名管道的同步和阻塞問題;訊息佇列與命名管道一樣,每個資料塊都有一個最大長度的限制;我們可以將每個資料塊當作是一

Linux:程序間通訊(匿名管道命名管道)(共享記憶體,訊息佇列,訊號量)

目錄 程序間通訊的介紹 管道 匿名管道 原理: 程式碼實現 匿名管道特性 實現管道符 |  命名管道 命名管道特性 程式碼實現 管道讀寫規則 作業系統中ipc的相關命令 共享記憶體(重點) 生命週期: 程式碼實現 程式碼實現獲

作業系統(11)程序--程序通訊:訊號、管道、訊息佇列、共享記憶體

文章目錄 1. 程序通訊相關概念 1. 通訊流程、屬性、鏈路 2. 程序通訊方式:直接通訊、間接通訊 2. 程序通訊的機制 1. 訊號 2. 管道 3. 訊息佇列

Queue訊息佇列和Pool程序

目錄 Queue訊息佇列 Queue常見方法 Pool程序池 Queue訊息佇列 在python中,多個程序之間是無法共享全域性變數的。但有時候我們又必須使用同一些資料。

php程序間通訊--訊息佇列

首先我們來看一下如何建立一個訊息佇列。 //建立訊息佇列 $msg_key = ftok( __FILE__, 'a' ); $msg_queue = msg_get_queue( $msg_key, 0666 );  在php中通過這兩句話就可以建立一個訊息佇列。 ftok 函式,是可以

Linux程序通訊——訊息佇列

總結下別人比較好的博文+自己寫的一個栗子   1.ftok()    https://blog.csdn.net/u013485792/article/details/50764224      &

Linux關於程序間通訊訊息佇列

訊息佇列概念 訊息佇列提供了一個從一個程序向另外一個程序傳送一塊資料的方法 每個資料塊都被認為是有一個型別,接收者程序接收的資料塊可以有不同的型別值 訊息佇列也有管道一樣的不足,就是每個資料塊的最大長度是有上限的,系統上全體佇列的最大總長度也有一個上限 訊息佇列函式操作

Python 訊息佇列rabbitmq使用之工作佇列使用個worker接收訊息

前面已經介紹過怎麼安裝rabbitmq以及要使用的三方庫 因此這裡直接進入例項 1、釋出端程式碼 # new_task.py import pika # 匯入pika import sys