redis的訊息佇列和釋出訂閱demo
阿新 • • 發佈:2019-01-07
以前做online judge的時候用mysql+時間戳做訊息佇列,現在redis提供了一種現成的訊息佇列的模式,使用redis佇列可以直接模擬訊息通訊的方式,在將併發轉化為非併發時非常有用,同時通訊的雙方不需要關注彼此的資訊,實現解耦合。比如使用者提交了程式碼,我後臺往訊息佇列壓入題號,使用者號,提交號,剩下的就交給判題指令碼處理,判題指令碼不需要了解其他資訊,同時後臺也不需要等待判題指令碼執行結束,而是直接返回,避免了長時間佔用連線。而判題指令碼則是不停地從佇列中取資訊,並進行判題操作,同時及時更細資料庫資訊,標記此次判題的結果。
程式碼需要匯入jedis的包.
producer.java(生產者)
consumer.java(消費者)package redis; import redis.clients.jedis.Jedis; public class producer implements Runnable{ Jedis jedis=null; public producer(){ jedis=new Jedis("127.0.0.1",6379); } public void run(){//生產者不停生產 while(true){ String temp=String.valueOf((int)(Math.random()*1000)); jedis.lpush("redis-test2",temp);//創造內容 System.out.println("放入元素"+temp); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("生產者產生物品失敗"); e.printStackTrace(); } } } }
使用釋出/訂閱模式,注意訂閱是個阻塞方法,需要開啟個執行緒單獨處理package redis; import java.util.List; import redis.clients.jedis.Jedis; public class consumer implements Runnable{ Jedis jedis=null; public consumer(){ jedis=new Jedis("127.0.0.1",6379); } @Override public void run(){//不停地消費物品 while(true){ List<String>list=jedis.brpop(1,"redis-test2"); System.out.println("取出元素:"+list);//第一個引數代表阻塞幾秒鐘如果佇列仍未空,則返回null,如果為0代表隊列為空則始終阻塞著 if (list!=null){//返回結果是一個列表如果不為空,下標0是佇列名,下標1是取出的元素,如果為空是null try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) { consumer c=new consumer(); producer p=new producer(); Thread t1=new Thread(c); t1.start(); Thread t2=new Thread(p); t2.start(); } }
producer2.java(釋出)
package redis;
import redis.clients.jedis.Jedis;
public class producer2 implements Runnable{
String source=null;
Jedis jedis=null;
public producer2(String source){
this.source=source;
jedis=new Jedis("127.0.0.1",6379);
}
@Override
public void run(){
while(true){
String temp=String.valueOf((int)(Math.random()*1000));
System.out.println("準備公佈數字"+temp);
jedis.publish(source,temp);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("釋出出問題了");
e.printStackTrace();
}
}
}
public static void main(String[] args) {
consumer2 m=new consumer2("redis_chat");
Thread t=new Thread(m);
t.start();
producer2 p=new producer2("redis_chat");
Thread t2=new Thread(p);
t2.start();
}
}
訂閱(consumer2.java)
package redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class consumer2 implements Runnable{
String source=null;
Jedis jedis=null;
JedisPubSub jedispubsub=null;
public consumer2(String source){
this.source=source;
jedis=new Jedis("127.0.0.1",6379);
jedispubsub=new JedisPubSub() {
@Override
public void onPMessage(String arg0, String arg1, String arg2) {
}
@Override
public void onPSubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
@Override
public void onPUnsubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
@Override
public void onUnsubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
@Override
public void onMessage(String arg0, String arg1) {
System.out.println("獲得資訊:"+arg0+arg1);
}
};
}
@Override
public void run(){
jedis.subscribe(jedispubsub,source);
}
}