1. 程式人生 > >redis的訊息佇列和釋出訂閱demo

redis的訊息佇列和釋出訂閱demo

以前做online judge的時候用mysql+時間戳做訊息佇列,現在redis提供了一種現成的訊息佇列的模式,使用redis佇列可以直接模擬訊息通訊的方式,在將併發轉化為非併發時非常有用,同時通訊的雙方不需要關注彼此的資訊,實現解耦合。比如使用者提交了程式碼,我後臺往訊息佇列壓入題號,使用者號,提交號,剩下的就交給判題指令碼處理,判題指令碼不需要了解其他資訊,同時後臺也不需要等待判題指令碼執行結束,而是直接返回,避免了長時間佔用連線。而判題指令碼則是不停地從佇列中取資訊,並進行判題操作,同時及時更細資料庫資訊,標記此次判題的結果。

程式碼需要匯入jedis的包.

producer.java(生產者)

package redis;


import redis.clients.jedis.Jedis;

public class producer implements Runnable{
  Jedis jedis=null;
   public producer(){
	jedis=new Jedis("127.0.0.1",6379);
   }
   public void run(){//生產者不停生產
	 while(true){
		String temp=String.valueOf((int)(Math.random()*1000));
		jedis.lpush("redis-test2",temp);//創造內容 
		System.out.println("放入元素"+temp);
	    try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			System.out.println("生產者產生物品失敗");
			e.printStackTrace();
		}
	 }
   }
  
}
consumer.java(消費者)
package redis;

import java.util.List;

import redis.clients.jedis.Jedis;

public class consumer implements Runnable{
	Jedis jedis=null;
	public consumer(){
		jedis=new Jedis("127.0.0.1",6379);
	}
	@Override
	public void run(){//不停地消費物品
		while(true){
		List<String>list=jedis.brpop(1,"redis-test2"); 
		System.out.println("取出元素:"+list);//第一個引數代表阻塞幾秒鐘如果佇列仍未空,則返回null,如果為0代表隊列為空則始終阻塞著
		if (list!=null){//返回結果是一個列表如果不為空,下標0是佇列名,下標1是取出的元素,如果為空是null
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		}
		}
	}
	public static void main(String[] args) {
		consumer c=new consumer();
		producer p=new producer();
		Thread t1=new Thread(c);
		t1.start();
		Thread t2=new Thread(p);
		t2.start();
	}

}
使用釋出/訂閱模式,注意訂閱是個阻塞方法,需要開啟個執行緒單獨處理

producer2.java(釋出)

package redis;

import redis.clients.jedis.Jedis;

public class producer2 implements Runnable{
    String source=null;
    Jedis jedis=null;
    public producer2(String source){
        this.source=source;
        jedis=new Jedis("127.0.0.1",6379);
    }
    @Override
    public void run(){
    
        while(true){
            String temp=String.valueOf((int)(Math.random()*1000));
            System.out.println("準備公佈數字"+temp);
            jedis.publish(source,temp);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("釋出出問題了");
                e.printStackTrace();
            }
        }
    }
   public static void main(String[] args) {
    consumer2 m=new consumer2("redis_chat");
    Thread t=new Thread(m);
    t.start();
    producer2 p=new producer2("redis_chat");
    Thread t2=new Thread(p);
    t2.start();
}
}



訂閱(consumer2.java)
package redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class consumer2 implements Runnable{
	String source=null;
	Jedis jedis=null;
	JedisPubSub jedispubsub=null;
    public consumer2(String source){
    this.source=source;
    jedis=new Jedis("127.0.0.1",6379);
    jedispubsub=new JedisPubSub() {
    @Override
	public void onPMessage(String arg0, String arg1, String arg2) {
		
		
	}

	@Override
	public void onPSubscribe(String arg0, int arg1) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void onPUnsubscribe(String arg0, int arg1) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void onSubscribe(String arg0, int arg1) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void onUnsubscribe(String arg0, int arg1) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void onMessage(String arg0, String arg1) {
		System.out.println("獲得資訊:"+arg0+arg1);
		
	}
};
}   
    @Override
    public void run(){
    	jedis.subscribe(jedispubsub,source);
    }
}