基於redis佇列實現的生產者消費者
一.簡介
基於redis佇列的生產者消費者實現主要是利用redis的blpop或者brpop命令,以下是官方文件對這兩個命令的描述:
BLPOP 是列表的阻塞式(blocking)彈出原語。
它是 LPOP 命令的阻塞版本,當給定列表內沒有任何元素可供彈出的時候,連線將被 BLPOP 命令阻塞,直到等待超時或發現可彈出元素為止。
當給定多個 key
引數時,按引數 key
的先後順序依次檢查各個列表,彈出第一個非空列表的頭元素。
BRPOP的描述差不多,這裡就不重複了。
那麼有了這兩個命令,實現生產者消費者模式就有思路了,我們從外界資料來源不停的傳入資料到redis指定的list裡面,此時不管有沒有消費者,我們的資料是會儲存在list裡的。
然後消費者的程式只需要呼叫blpop命令,如果指定的list裡面有資料,就能從裡面取得list最左邊的資料;如果指定的list裡面沒有資料,那麼就會阻塞在那,直到list裡面來了新資料或者已經達到阻塞時間為止。
二.普通生產者消費者程式碼:
生產者我們就用自己生成的資料模仿。
然後消費者得整合Thread類,重寫run方法,我們可以在run方法裡面寫一些對取出來的資料需要進行的業務操作,我這裡就是簡單的打印出來判斷是否取出資料。public class RedisProducer { public static void main(String[] args) throws InterruptedException { Jedis jedis=JavaRedisUtils.getJedis(); jedis.select(4); int count=0; while(count<100){ Thread.sleep(300); jedis.lpush("mylist",String.valueOf(count)); count++; } jedis.close(); } }
public class Consumer extends Thread{ String name; public Consumer(String name) { this.name = name; } @Override public void run(){ Jedis jedis = JavaRedisUtils.getJedis(); while(true) { jedis.select(4); //阻塞式brpop,List中無資料時阻塞 //引數0表示一直阻塞下去,直到List出現數據 List<String> list = jedis.blpop(0, "mylist"); for(String s : list) { System.out.println(name+" "+s); } jedis.close(); } } }
下面是程式的consumer執行類:
public class RedisConsumer {
public static void main(String[] args) {
Consumer mq1=new Consumer("mq1");
Consumer mq2=new Consumer("mq2");
mq1.start();
mq2.start();
}
下面是程式執行部分結果:
我們可以從結果中看到,我們的消費者是真的取到了資料並且在原始沒有資料的時候,我們的消費者是阻塞了的,直到新資料來臨才繼續取資料。
為了更加方便的觀看到生產者和消費者的程式執行情況,我們將從"mylist"中的消費資料利用redis的brpoplpush命令將資料從mylist消費到各個消費者自己名字的列表中。
下面是brpoplpush的解釋:
當列表 source
為空時, BRPOPLPUSH 命令將阻塞連線,直到等待超時,或有另一個客戶端對 source
執行 LPUSH 或 RPUSH 命令為止。
超時引數 timeout
接受一個以秒為單位的數字作為值。超時引數設為 0
表示阻塞時間可以無限期延長(block indefinitely) 。
- 返回值:
- 假如在指定時間內沒有任何元素被彈出,則返回一個
nil
和等待時長。反之,返回一個含有兩個元素的列表,第一個元素是被彈出元素的值,第二個元素是等待時長。
public class Consumer extends Thread{
String name;
public Consumer(String name) {
this.name = name;
}
@Override
public void run(){
Jedis jedis = JavaRedisUtils.getJedis();
jedis.select(4);
while(true) {
//呼叫brpoplpush方法 從mylist取出來然後放到對應name的list去
String a=jedis.brpoplpush("mylist",name,0);
}
}
}
執行程式之後,redis庫中出現了mq1以及mq2的list,並且他們分別消費了mylist中的所有資料:
以及它們分別消費的數目:
三.在消費過程中新增加消費者
上面我們已經做過實驗了,它能夠做到生產者和消費者能做到的事情:當list沒有資料的時候,消費者會阻塞,當list新來資料的時候,它會接著進行消費。那麼當新來一個新的消費者的時候,它會有什麼變化呢?
新加入消費者的程式碼如下:
public class addconsumer {
public static void main(String[] args) {
Consumer mq3=new Consumer("mq3");
mq3.start();
}
}
下面我們先執行生產者,緊接著執行消費者mq1和mq2,等它們消費一段時間,並且生產者資料還在傳輸的時候,我們開啟消費者mq3。讓我們來看看結果會是怎麼樣。
redis資料庫中產生了三個列表。
它們分別的資料量為:
說明,當新加入消費者的時候,它會和其它兩個消費者內部競爭,然後一起消費沒有消費過的資料。
以上是redis佇列實現的消費者和生產者demo,希望可以給大家提供到幫助。