1. 程式人生 > >redis 初識redis

redis 初識redis

key value型的記憶體資料庫。 沒有持久化情況下 看做是一塊記憶體,所有的資料要設定失效時間。或者 自己記得key 不然資料就會一直在裡面。

背景:
    x省移動臨時提的一個需求 ,使用者上網日誌實時資料 取訪問開始時間 手機號 訪問app的編碼 實時分析每個使用者近一小時訪問次數top10的app,軟體環境:
    
    實時流處理ibm stream平臺叢集環境現成;kafka訊息源topic現成;redis 或 gemfirexd叢集現成(其中redis叢集跑的業務少 gemfirexd跑的較多)
    
    
    資料來源欄位 號碼 訪問開始時間 appId;資料來源規模 230億條記錄/24h ;均值20w/s ;但經過濾一小時前資料後 只有4/5w /s。
    
    初步設想方案
        1 流處理一條條過濾 源資料 剔除 三個欄位任何一個為空的情況,一小時前的資料直接丟棄,給每條資料加上時間片 格式2016-11-15 17:10:00 ,一分鐘加一次。
        然後寫入redis叢集 寫一次 設定下失效時間為一小時 ;然後api端做統計處理。此方法需要寫redis叢集的時間達到4/5w/s 預計
        
        2 流處理一條條過濾 源資料 剔除 三個欄位任何一個為空的情況,一小時前的資料直接丟棄,每分鐘產生一個訊號時間(當前時間) 以該時間為準 ,選一分鐘的時間視窗
        取訪問時間在當前分鐘往前推一分鐘的時間間隔裡的 使用者號碼 時間訊號 appId的次數 ,以這樣的統計結果增量入redis。
        
        api端 拆分成60個時間片區(時間訊號)取值後再統計
       

   最終按照第一方案實現

       

附程式碼:

kafka消費的stream程式碼:

namespace com.asiainfo.position;
use com.zjmcc.kafka.consumer::*;
type ti_userBeav=rstring msisdn,rstring dataTime,rstring appId;
type fullMseeage=rstring a0,rstring a1,rstring a2,rstring a3,rstring a4,rstring a5,rstring a6,rstring a7,rstring a8,rstring a9,rstring a10,rstring a11,rstring a12,rstring a13,rstring a14,rstring a15,rstring a16,rstring a17,rstring a18,rstring a19,rstring a20,rstring a21,rstring a22;
composite UserBehaviorSource {
    graph
        stream<rstring message> UserBehavStream = KafkaConsumer()
        {
            param
                vmArg:"-Xmx4096m";
                topic: "seq_101_internet";
                threadsPerTopic:15;
                propertiesFile:"etc/consumer.properties";
                //CONCURRENCY_THREAD_NUM:10;
                //topic_partition:getChannel();
            config placement:host("pc-zjqbsp15");
        }
        /*
        () as Sink5 = FileSink(UserBehavStream)                                                                      
        {                                                                                              
          param                                                                                           
            file : "/opt/data/streamdtklad/data/kafkaUserBeav.dat";    
            closeMode : dynamic;
            append : true;   
            config placement:host("pc-zjqbsp16");                                                                       
        }
        *  */
        
        
        @parallel(width=30)
        () as Res = UserBeavSignalProccess(UserBehavStream){
            config placement:host(sqlPool3);
        }
        config hostPool : sqlPool3=[  "pc-zjqbsp05","pc-zjqbsp06", "pc-zjqbsp16", "pc-zjqbsp24", "pc-zjqbsp25"];
         
        
}
composite UserBeavSignalProccess(input UserBehavStream){
    graph
        
        (stream<timestamp tag1> currentTime1) as Beaconcuu = Beacon()
        {
            param
                period : 60.0 ;
                initDelay : 60.0 ;
            output
                currentTime1 : tag1 = getTimestamp() ;
        }
         
        (stream<ti_userBeav> ub_tmp) = Custom(UserBehavStream;currentTime1)
        {
            logic state :{
                mutable list<rstring> res;
                mutable rstring msisdn;
                mutable rstring dataTime;
                mutable rstring appId;
                mutable timestamp standrtime;
            }
            
            onTuple currentTime1:{
                standrtime=currentTime1.tag1;
            }
            
            onTuple UserBehavStream:{////
                res=tokenize(UserBehavStream.message,"    ",true);
                if(size(res) >= 22&&diffAsSecs(standrtime,toTimestamp(Sys.YYYY_MM_DD_hh_mm_ss ,res[2]))<3600.0){
                    
                    
                    submit({msisdn=res[1],dataTime=res[2],appId=res[21]},ub_tmp);
                    //submit({a0=res[0],a1=res[1],a2=res[2],a3=res[3],a4=res[4],a5=res[5],a6=res[6],a7=res[7],a8=res[8],a9=res[9],a10=res[10],a11=res[11],a12=res[12],a13=res[13],a14=res[14],a15=res[15],a16=res[16],a17=res[17],a18=res[18],a19=res[19],a20=res[20],a21=res[21],a22=res[22]},fullMseeageIfs);
                    
                    
                }
                
                
                
            }
            }
            
       
        () as SiRes = Export(ub_tmp)
        {
            param
                streamId : "KafkaUBSignal_"+(rstring)getChannel() ;
        }
               
}

stream彙總 及寫redis程式碼:

namespace com.asiainfo.position;
use application::*;	
composite BasicUerBehav {
	graph
	@parallel(width=30)
	stream<rstring minTime,rstring msisdn,rstring appId,int32 cnts> userBeaData = getBeaData(){}
	
	
	@parallel(width=20,partitionBy=[{port=userBeaData, attributes=[msisdn]}])
	() as redisRes = RedisSink(userBeaData)
    {
    }
    config hostPool : sqlPoollw=["pc-zjqbsp06", "pc-zjqbsp16", "pc-zjqbsp24" , "pc-zjqbsp25","pc-zjqbsp05"];    // "pc-zjqbsp04",,
     		   placement: host(sqlPoollw);
}



composite getBeaData(output addInfo){
	graph
		stream<rstring msisdn,rstring dataTime,rstring appId> Si_init = Import()
        {
        	param
        		applicationName : "com.asiainfo.position::UserBehaviorSource";
                streamId : "KafkaUBSignal_"+(rstring)getChannel() ;
        }
        
        
       	stream<rstring msisdn,rstring dataTime,rstring appId> Si_tmp = Filter(Si_init)
	    {
	    	
	    }
	    (stream<timestamp tag1> currentTime1) as Beacon_1 = Beacon()
		{
			param
				period : 60.0 ;
				//initDelay : 60.0 ;
			output
				currentTime1 : tag1 =  getTimestamp();//add(,-86400.0)
		}
	    stream<rstring msisdn,timestamp dataTime,rstring appId> userBeaData = Custom(Si_tmp;currentTime1){
			logic state:{
				
				mutable timestamp standrtime;
			}
			/*
			onTuple currentTime1:{
				standrtime=currentTime1.tag1;
			}
			
			*/
			onTuple Si_tmp:{//diffAsSecs(standrtime,toTimestamp(Sys.YYYY_MM_DD_hh_mm_ss ,Si_tmp.dataTime))<3600.0
				if(length(Si_tmp.msisdn)>0&&length(Si_tmp.dataTime)>0&&length(Si_tmp.appId)>0){
	    			//submit({msisdn=Si_tmp.msisdn,dataTime=Si_tmp.dataTime,appId=Si_tmp.appId,minTime=toString(standrtime, "%Y-%m-%d")+" "+toString(standrtime, "%H:%M"+":00")},userBeaData);
	    			submit({msisdn=Si_tmp.msisdn,dataTime=toTimestamp(Sys.YYYY_MM_DD_hh_mm_ss ,Si_tmp.dataTime),appId=Si_tmp.appId},userBeaData);
	    			//}
					
				}
			}
		}
		
		
		(stream<timestamp minTime,rstring msisdn,rstring appId,rstring dataTime> add_user_bea) as Join_Add_UserBea =
			Join(userBeaData as LS ; currentTime1 as RS)
		{
			window
				LS : sliding, time(180.000), partitioned ;
				RS : sliding, count(0) ;
			param
				algorithm : inner ;
				partitionByLHS : LS.appId ;
				//  partitionByLHS  : LS.msisdn;

				match : LS.dataTime <= RS.tag1 && diffAsSecs(RS.tag1, LS.dataTime)
					<= 60.000 ;
			output
				add_user_bea : minTime = RS.tag1, msisdn = LS.msisdn,appId=LS.appId,dataTime=toString(LS.dataTime, "%Y-%m-%d")+" "+toString(LS.dataTime, "%H:%M:%S")  ;//toString(So_all_region_grp.minTime, "%Y-%m-%d")+" "+toString(So_all_region_grp.minTime, "%H:%M:%S")     stayDura add by hezh at 20160711
		}
		
		
		(stream<timestamp minTime,rstring msisdn,rstring appId,int32 cnts> So_all_region_grp) as Aggregate_all =
			Aggregate(add_user_bea as inPort0Alias)
		{
			window
				inPort0Alias : tumbling, punct() ;
			param
				groupBy : minTime, msisdn ,appId;
			output
				So_all_region_grp : cnts = Sum(1);
		}
		
		stream<rstring minTime,rstring msisdn,rstring appId,int32 cnts> addInfo =Custom(So_all_region_grp){
			logic
			onTuple So_all_region_grp:{
				submit({minTime=toString(So_all_region_grp.minTime, "%Y-%m-%d")+" "+toString(So_all_region_grp.minTime, "%H:%M:00"),msisdn=So_all_region_grp.msisdn,appId=So_all_region_grp.appId,cnts=So_all_region_grp.cnts},addInfo);
			}
			
		}
		/*
		() as Sink5 = FileSink(addInfo)                                                                      
		{                                                                                              
		  param                                                                                           
		    file : "/opt/data/streamdtklad/data/kafkaUserBeavsnap"+(rstring)getChannel()+".dat";    
		    closeMode : dynamic;
		    append : true;   
		    config placement:host("pc-zjqbsp16");                                                                       
		}
		 */
		
}
jedis程式碼:

reidsCoonfig.java

package redis;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.io.InputStream;
import java.lang.reflect.Field;
public class RedisConfig {
	/******************Message*************************/	
	public static String MSG_SPLIT=",";
	public static int MSG_NO_MSISDN=0;
	public static int MSG_NO_LAC=2;
	public static int MSG_NO_CELL=3;
	public static int MSG_NO_EVENTTIME=4;
	
	/******************Redis*************************/
	//public static String REDIS_HOST = "redis-single";
	public static String REDIS_HOST = "10.70.134.140";
	public static int REDIS_PORT = 16379;
	public static int REDIS_CONN_TOTAL = 100;
	public static int REDIS_CONN_IDLE = 5;
	public static Long REDIS_CONN_WAIT = 30000l;
	public static boolean REDIS_TESTONBORROW = false;
	public static int REDIS_DB_TOTAL = 10;
	public static int REDIS_DB_NO = 0;
	
	/******************Kafka*************************/
	
	public static String KAFKA_ZK_CONN = "192.168.1.20:2181";
	public static String KAFKA_GRP_ID = "TEST";
	public static String KAFKA_TOPIC = "signal0000";
	public static String KAFKA_ZK_TMOUT = "4000";
	public static String KAFKA_ZK_SYNC = "200";
	public static String KAFKA_AUTO_COMM = "1000";
	public static String KAFKA_OFFSET = "smallest";	
	
	/*******************File****************************/
	public static int FILE_FLUSH_BATCH = 5000;
	public static String FILE_LOCUS_DIR = "F:\\Test\\Output\\";
	
	/**
	 * 
	 * @param file
	 */
	public static void overStaticInfo(File file){
		Properties prop = new Properties();
        InputStream fis = null;
		try {
			fis = new FileInputStream(file);
			prop.load(fis);
			Field[] field = RedisConfig.class.getDeclaredFields();
			for(int i=0;i<field.length;i++){
				String val = prop.getProperty(field[i].getName());
				if(val != null){
					if(field[i].getType() == boolean.class){
						field[i].set(null,Boolean.parseBoolean(val));						
					}else if(field[i].getType() == Long.class){
						field[i].set(null,Long.parseLong(val));		
					}else if(field[i].getType() == int.class){
						field[i].set(null,Integer.parseInt(val));
					}else{
						field[i].set(null,val);
					}
				}
			}
		} catch (Exception e) {
			System.out.println("ps:ERROR Init RedisConfig failed !");
			e.printStackTrace();
			System.exit(-1);
		}finally{
			try {
				if(fis != null){
					fis.close();					
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static void printInfo(){
		System.out.println("########----Config Info:----########");
		Field[] field = RedisConfig.class.getDeclaredFields();
		for(int i=0;i<field.length;i++){
			try {
				System.out.println("ps:INFO "+field[i].getName()+"="+field[i].get(null));
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		System.out.println("########----:Config Info----########");
	}

}

redis例項:
package redis;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.RedisConfig;


import redis.clients.jedis.Jedis;
import redis.clients.jedis.HostAndPort;
//import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;

public class RedisDataSource {
    //private Jedis jedis;//?..?.?瀹㈡.絝.??
    //private static JedisPool jedisPool;//?..?.??ユ?
    //private ShardedJedis shardedJedis;//?..棰..?風.榪..
    //private ShardedJedisPool shardedJedisPool;//?..榪..奼
    private static List<JedisPool> JedisPoolList = new ArrayList<>();
    private static JedisPool jedisPool;
    private static JedisCluster jedisCluster//= new JedisCluster(getJedisClusterNodes(), 10000,getCfg())
    ;
    
    public static Jedis getSource(int database){
    	if(JedisPoolList.size() == 0){
    		initJedisPoolList();
    	}
    	if(JedisPoolList.size()  < database){
    		return null;
    	}else{
    		return JedisPoolList.get(database).getResource();    		
    	}
    }
    
    public static JedisCluster getSource(){
    	/*if(jedisPool  == null){
    		initJedisPool();
    	}
    	return jedisPool.getResource();*/
    	if(jedisCluster==null){
    		initJedisPool();
    	}
    	return jedisCluster;
     }
    
    private static GenericObjectPoolConfig getCfg(){
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(RedisConfig.REDIS_CONN_TOTAL);
        config.setMaxIdle(RedisConfig.REDIS_CONN_IDLE);
        config.setMaxWaitMillis(RedisConfig.REDIS_CONN_WAIT);
        config.setTestOnBorrow(RedisConfig.REDIS_TESTONBORROW);
        return config;
    }
    
    private static Set<HostAndPort> getJedisClusterNodes(){
    	Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
    	//Jedis Cluster will attempt to discover cluster nodes automatically
    	jedisClusterNodes.add(new HostAndPort(RedisConfig.REDIS_HOST, RedisConfig.REDIS_PORT));
    	return jedisClusterNodes;
    }
    
    private static void initJedisPoolList(){
    	for(int i=0;i<RedisConfig.REDIS_DB_TOTAL;i++){
    		JedisPoolList.add(new JedisPool(getCfg(),RedisConfig.REDIS_HOST,RedisConfig.REDIS_PORT,10000,null,i));
    	}
    }
    
    private static void initJedisPool(){
    	 Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
    	//Jedis Cluster will attempt to discover cluster nodes automatically
    	jedisClusterNodes.add(new HostAndPort(RedisConfig.REDIS_HOST, RedisConfig.REDIS_PORT));
    	jedisCluster= new JedisCluster(jedisClusterNodes, 10000,getCfg());
    	//jedisCluster= new JedisCluster(jedisClusterNodes, 10000);
    	//singal instance redis
    	//jedisPool = new JedisPool(getCfg(),RedisConfig.REDIS_HOST,RedisConfig.REDIS_PORT,10000,null,RedisConfig.REDIS_DB_NO);
    }
    /*
    //?.??..?..奼
    private static void initialPool()
    {
        // 奼..?..緗
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
           config.setMaxTotal(1);
           config.setMaxIdle(5);
           config.setMaxWaitMillis(1000l);
           config.setTestOnBorrow(false);
        jedisPool = new JedisPool(config,"192.168.1.22",6379);
    }
    
    // ?.??..?.? 
    private void initialShardedPool() 
    { 
        // 奼..?..緗.
    	GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(5);
        config.setMaxWaitMillis(1000l); 
        config.setTestOnBorrow(false); 
        // slave?炬. 
        List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(); 
        shards.add(new JedisShardInfo("192.169.1.22", 6379, "master"));

        // ?..? 
        shardedJedisPool = new ShardedJedisPool(config, shards); 
    } 

    public void show() {     
        KeyOperate(); 
        StringOperate(); 
        ListOperate(); 
        SetOperate();
        SortedSetOperate();
        HashOperate();
    } 

      private void KeyOperate() {
         
      }

      private void StringOperate() {
  
      }

      private void ListOperate() {

      }

      private void SetOperate() {
     
      }

      private void SortedSetOperate() {
     
      }
    
      private void HashOperate() {
        
      }
      
      */
}

繼承stream的java類 呼叫獲取例項:
package application;


import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.log4j.Logger;

import redis.RedisDataSource;
import redis.clients.jedis.JedisCluster;

import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.StreamSchema;
import com.ibm.streams.operator.Type;
import com.ibm.streams.operator.StreamingData.Punctuation;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.model.InputPortSet;
import com.ibm.streams.operator.model.InputPortSet.WindowMode;
import com.ibm.streams.operator.model.InputPortSet.WindowPunctuationInputMode;
import com.ibm.streams.operator.model.InputPorts;
import com.ibm.streams.operator.model.Libraries;
import com.ibm.streams.operator.model.PrimitiveOperator;

/**
 * Class for an operator that consumes tuples and does not produce an output stream. 
 * This pattern supports a number of input streams and no output streams. 
 * <P>
 * The following event methods from the Operator interface can be called:
 * </p>
 * <ul>
 * <li><code>initialize()</code> to perform operator initialization</li>
 * <li>allPortsReady() notification indicates the operator's ports are ready to process and submit tuples</li> 
 * <li>process() handles a tuple arriving on an input port 
 * <li>processPuncuation() handles a punctuation mark arriving on an input port 
 * <li>shutdown() to shutdown the operator. A shutdown request may occur at any time, 
 * such as a request to stop a PE or cancel a job. 
 * Thus the shutdown() may occur while the operator is processing tuples, punctuation marks, 
 * or even during port ready notification.</li>
 * </ul>
 * <p>With the exception of operator initialization, all the other events may occur concurrently with each other, 
 * which lead to these methods being called concurrently by different threads.</p> 
 */
@PrimitiveOperator(name="RedisSink", namespace="application",
description="Java Operator RedisSink")
@InputPorts({@InputPortSet(description="Port that ingests tuples", cardinality=1, optional=false, windowingMode=WindowMode.NonWindowed, windowPunctuationInputMode=WindowPunctuationInputMode.Oblivious), @InputPortSet(description="Optional input ports", optional=true, windowingMode=WindowMode.NonWindowed, windowPunctuationInputMode=WindowPunctuationInputMode.Oblivious)})
@Libraries({"impl/lib/jedis-2.8.0.jar","impl/lib/commons-pool2-2.4.2.jar"})//
public class RedisSink extends AbstractOperator {
	protected JedisCluster jedisCluster;
	protected HashMap<Integer,Object> map=new HashMap<Integer,Object>(); 
	private String msisdn;
	private String cnts;
	private String appId;
	private String minTime;
	
	private void initSchema(StreamSchema ss){
//		trace.log(TraceLevel.INFO, "");
		for(int i=1;i<=ss.getAttributeCount();i++){
			map.put(i, ss.getAttribute(i-1).getType().getMetaType());
			//trace.info("log  "+ss.getAttribute(i-1).getType().getMetaType());
			//System.out.println("syso  "+ss.getAttribute(i-1).getType().getMetaType());
		}
	}
	
	private void setTupeValue(Tuple tuple) throws SQLException{
		for(java.util.Iterator<Entry<Integer, Object>> it=map.entrySet().iterator();it.hasNext();){
			Entry<Integer, Object> entry = it.next();
//			System.out.println("----------------"+entry.getValue());
			if(entry.getValue().equals(Type.MetaType.RSTRING)){
				//pstmt.setString(entry.getKey(),tuple.getString(entry.getKey()-1));
				if(entry.getKey().equals(1)){
					minTime=tuple.getString(entry.getKey()-1);
				}else if(entry.getKey().equals(2)){
					msisdn=tuple.getString(entry.getKey()-1);
				}else if(entry.getKey().equals(3)){
					appId=tuple.getString(entry.getKey()-1);
				}
		}if(entry.getValue().equals(Type.MetaType.INT32)){
			if(entry.getKey().equals(4)){
				cnts=tuple.getString(entry.getKey()-1);
			}
		}
		}
	}
	
    /**
     * Initialize this operator. Called once before any tuples are processed.
     * @param context OperatorContext for this operator.
     * @throws Exception Operator failure, will cause the enclosing PE to terminate.
     */
	@Override
	public synchronized void initialize(OperatorContext context)
			throws Exception {
    	// Must call super.initialize(context) to correctly setup an operator.
		super.initialize(context);
        //Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() );
		initSchema(getInput(0).getStreamSchema());
        jedisCluster = RedisDataSource.getSource();
        // TODO:
        // If needed, insert code to establish connections or resources to communicate an external system or data store.
        // The configuration information for this may come from parameters supplied to the operator invocation, 
        // or external configuration files or a combination of the two.
	}

    /**
     * Notification that initialization is complete and all input and output ports 
     * are connected and ready to receive and submit tuples.
     * @throws Exception Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public synchronized void allPortsReady() throws Exception {
    	// This method is commonly used by source operators. 
    	// Operators that process incoming tuples generally do not need this notification. 
        OperatorContext context = getOperatorContext();
        Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() );
    }

    /**
     * Process an incoming tuple that arrived on the specified port.
     * @param stream Port the tuple is arriving on.
     * @param tuple Object representing the incoming tuple.
     * @throws Exception Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public void process(StreamingInput<Tuple> stream, Tuple tuple)
            throws Exception {
        // TODO Insert code here to process the incoming tuple, 
    	// typically sending tuple data to an external system or data store.
    	// String value = tuple.getString("AttributeName");
    	//String minTime;
    	//System.out.println("Takes[" + (System.currentTimeMillis() - tmp) + "] ms");	
    	setTupeValue(tuple);
    	//System.out.println("minTime:"+minTime+"msisdn:"+msisdn+"appId:"+appId+"cnts:"+cnts);
    	//System.out.println(System.currentTimeMillis());
    	//long a=System.currentTimeMillis();
    	
    	if(appId!=null&&cnts!=null){
    		
    		jedisCluster.rpush(msisdn+","+minTime, appId+"|"+cnts);//如果是redis叢集 要用jedisCluster的方法 塞值 如果用jedis物件 會報錯。
    	    
        	//long b=System.currentTimeMillis();
        	//System.out.println("rpush...."+(b-a));
    		jedisCluster.expire(msisdn+","+minTime, 3600);
    		
    	}
    	
		//long c=System.currentTimeMillis();
		//System.out.println("expire...."+(c-b));
		
    }
    
    /**
     * Process an incoming punctuation that arrived on the specified port.
     * @param stream Port the punctuation is arriving on.
     * @param mark The punctuation mark
     * @throws Exception Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public void processPunctuation(StreamingInput<Tuple> stream,
    		Punctuation mark) throws Exception {
    	// TODO: If window punctuations are meaningful to the external system or data store, 
    	// insert code here to process the incoming punctuation.
    }

    /**
     * Shutdown this operator.
     * @throws Exception Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public synchronized void shutdown() throws Exception {
        OperatorContext context = getOperatorContext();
        Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " shutting down in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() );
        
        // TODO: If needed, close connections or release resources related to any external system or data store.
        
        jedisCluster.close();
        // Must call super.shutdown()
        super.shutdown();
    }
    
}


相關推薦

Redis--初識Redis

不同 其他 關系數據庫 請求 特性 客戶端 客戶 內存數據庫 之間 Redis 是一個遠程內存數據庫,它不僅性能強勁,而且還具有復制特性以及為解決問題而生的獨一無二的數據模型。Redis 提供了 5 種不同類型的數據結構,各式各樣的問題都可以很自然的映射到這些數據結構上。

Redis——初識Redis

Redis簡介 Redis的資料結構致力於幫助使用者解決問題,而不是像關係型資料庫那樣,要求使用者扭曲問題來適應資料庫。除此之外,通過複製、持久化和客戶端分片(client-side sharding)等特性,使用者可以很方便的將Redis擴充套件成一個能夠包含數百GB資料、每秒處理上百

redis 初識redis

key value型的記憶體資料庫。 沒有持久化情況下 看做是一塊記憶體,所有的資料要設定失效時間。或者 自己記得key 不然資料就會一直在裡面。 背景:     x省移動臨時提的一個需求 ,使用者上網日誌實時資料 取訪問開始時間 手機號 訪問app的編碼 實時分析每個使

初識Redis

使用 不能 初學 之間 alt 大量 處理海量數據 存在 非關系型 Redis作為NOSQL,本人初學Redis(以下是對其簡單的認識): 一.Redis(在數據庫部分有提到) 優點:1、海量數據的增刪改查,非常輕松應對    2、海量數據的維護非常輕松。 缺點:1、數

初識Redis系列之三:Redis支持的數據類型及使用

ted print 數據類型 eight 排序 sorted ring hang 無序 支持的數據類型有五種: string(字符串)、hash(哈希)、list(列表)、set(集合)及zset(sorted set:有序集合); 下面分別對這幾種類型進行簡單的Redis

01.redis初識

redis;Redis學習:redis是什麽?Redis 是一個開源(BSD許可)的,內存中的數據結構存儲系統,它可以用作數據庫、緩存和消息中間件。不過Redis在生產環境中使用最多的功能是緩存系統。至於其他作用比如數據庫和消息中間件,則不會展開。在大型的互聯網系統架構中,redis可以使用緩存技術減輕數據庫

初識Redis(安裝,持久化,數據類型)

BE 數據 rdb持久化 alt foreign 權重 分享 erro 行數據 [toc] 初識Redis(安裝,持久化,數據類型) 一、Redis介紹: [ ] Redis和Memcached類似,也屬於k-v數據存儲,但是功能和操作性要比Memcached好很多。 [

redis學習1--初識redisredis的安裝,啟動。。。

redis 3.0 啟動 一個 工作 持久 集群 端口號 daemonize Linux 環境下 下載redis wget http://download.redis.io/releases/redis-3.0.7.tar.gz 解壓 tar -zxvf redis-3.0

redis初識01

grep number oca 文件 yun 目錄 所有 redis tar 一、Redis簡介 1.1 Redis是什麽 REmote DIctionary Server(Redis) 是一個key-value存儲系統。 1.2 Redis優點 性能極高 – Redis能

初識redis-cluster

數據文件 argument version blank open root nal 啟用 acc 安裝redis 1 [root@localhost ~]# cd /datas/soft/ 2 [root@localhost soft]# ll redis-5.0.0

redis學習1:初識redis

redis Redis是一個開源的使用ANSI C語言編寫、支援網路、可基於記憶體亦可持久化的日誌型、Key-Value資料庫,並提供多種語言的API。簡而言之redis就是放在遠端網路上的一個key-value資料結構。 優點 redis豐富的資料結構——更加方便操作 re

Redis(1)-----初識Redis-----windows,linux系統下安裝Redis及其視覺化工具RedisDesktopManager配置

一,windows系統 1.1,安裝 要安裝Redis,首先要獲取安裝包。 Windows的Redis安裝包需要到以下GitHub連結找到。 連結:https://github.com/MSOpenTech/redis   開啟網站後,找到Release,點選前往下載頁面。  

Redis入門與實踐--Redis初識(一)

Redis 入門到實踐 甚贊Redis 高效能Key-Value伺服器 多中資料結構 豐富的功能 高可用分散式支援 Redis 目標 Redis 初始 API的理解和使用 Redis 客戶端的使用 Redis 持久化的

Redis(2)-----初識Redis-----基礎redis命令

hash型別(雜湊map)HMSET mymap1 name "qingruihappy" description "suning" age "20" sex "man" sex "man" sex "man" sex "man"HMSET mymap2 name "qingruihappy" descrip

Redis(3)-----初識Redis-----基礎redis+java程式碼

一,連結redis static Jedis jedis = new Jedis("127.0.01"); /** * https://blog.csdn.net/z23546498/article/details/73556260 測試連結 */ @Test

Redis(7)-----初識Redis-----客戶端對Redis叢集的使用方法

記得連結之前關閉防火牆,或者把本埠號新增到防火牆例外 [[email protected]0723 bin]# service iptables stop -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT -A INPUT -p

redis初識(一)

redis初識(一) 之前工作的地方技術用的相對陳舊,用的也都是企業那種授權,開源技術不多,Redis出來好長時間一直沒有具體體驗過,自己最近工作上原因要去研究試用一下。 Redis是什麼 Redis結構上可以說是一個大Map,也可以說是一個特殊的資料庫,作為快取來說也有他自己很

(一)redis初識與安裝(windows)

Redis是什麼? Redis是一個開源免費的,效能較高的key-value資料庫,我們常見的mysql叫關係型資料庫,redis屬於非關係型資料庫。 所謂效能高,讀的速度11000次/s,寫的速度是81000次/s,(直接在記憶體中)注意是理論速度,實測沒

初識---Redis

博主最近在做一個練手Web專案,接觸到了以前被我跳過的內容Redis,哈哈出來混遲早是要還的。於是呢在此也進行了一個簡單的入門學習,拒絕懵逼狀態。 在介紹Redis之前我們先談談什麼是NoSQL。對我來說談到NoSQL最先想到是的是啥呢,當然是Hbase,Hbase在我

第1章 初識Redis

推薦書籍 《Redis實戰》Josiah L.Carlson 著 Redis Redis是一個記憶體資料庫(或者說記憶體資料結構)伺服器 Redis是一個遠端記憶體資料庫,它不僅效能強勁,而且還具有複製特性以及為解決問題而生的獨一無二的資料模型。 Redis提供