1. 程式人生 > >使用Java(Springboot)操作Redis

使用Java(Springboot)操作Redis

1、 redis簡介 
redis是Nosql資料庫中使用較為廣泛的非關係型記憶體資料庫,redis內部是一個key-value儲存系統。它支援儲存的value型別相對更多,包括string(字串)、list(連結串列)、set(集合)、zset(sorted set –有序集合)和hash(雜湊型別,類似於Java中的map)。Redis基於記憶體執行並支援持久化的NoSQL資料庫,是當前最熱門的NoSql資料庫之一,也被人們稱為資料結構伺服器。 
2、 網際網路時代背景下大機遇,什麼要使用Nosql? 
1) 當資料量的總大小一個機器放不下時。 
2) 資料索引一個機器的記憶體放不下時。 
3) 訪問量(讀寫混合)一個例項放不下時。

單機時代模型 


如果每次儲存成千上萬條資料,這樣很會導致mysql的效能很差,儲存以及讀取速度很慢,然後就演變成快取+mysql+垂直拆分的方式。 

Cache作為中間快取 
將所有的資料先儲存到快取中,然後再存入mysql中,減小資料庫壓力,提高效率。 
但是當資料再次增加到又一個量級,上面的方式也不能滿足需求,由於資料庫的寫入壓力增加,Memcached只能緩解資料庫的讀取壓力。讀寫集中在一個數據庫上讓資料庫不堪重負,大部分網站開始使用主從複製技術來達到讀寫分離,以提高讀寫效能和讀庫的可擴充套件性。Mysql的master-slave模式成為這個時候的網站標配了。 


主從分離模式 


在redis的快取記憶體,MySQL的主從複製,讀寫分離的基礎之上,這時MySQL主庫的寫壓力開始出現瓶頸,而資料量的持續猛增,由於MyISAM使用表鎖,在高併發下會出現嚴重的鎖問題,大量的高併發MySQL應用開始使用InnoDB引擎代替MyISAM。


分表分庫模式 
將變化小的、業務相關的放在一個數據庫,變化多的,不相關的資料放在一個數據庫。 
3、 nosql資料庫的優勢 
1)易擴充套件 
這些型別的資料儲存不需要固定的模式,無需多餘的操作就可以進行橫向的擴充套件。相對於關係型資料庫可以減少表和欄位特別多的情況。也無型之間在架構的層面上帶來了可擴充套件的能力 
2)大資料量提高效能 
3)多樣靈活的資料模型 


在nosql中不僅可以儲存String,hash,set、Zset等資料型別,還可以儲存javaBean以及多種複雜的資料型別。 
4、 NoSql的應用 
1) 大資料時代淘寶、微信、以及微博等都廣泛的使用了redis資料庫,將一些固定不變的資料例如學校,區域等固定的資訊儲存在關係型資料庫中。然後對於經常變化的資料例如淘寶每個節日都會有比較熱門的搜尋顯示在搜尋框,當節日過去關鍵字自動刪除,為了便於管理,可以將這些資料儲存在redis資料庫中,並設定過期時間,到達時間就自動刪除。 
2)為了緩解資料庫壓力,微博首先將傳送的微博儲存到redis資料庫,自己可以立即檢視到,然後將記憶體中的資料同步到關係型資料庫。

下面通過例子,介紹利用Java操作Redis的例子。在springboot中操作Redis可以通過Jedis或者通過RedisTemplate來實現,因為springboot會自動掃描生成RedisTemplate物件,並且使用RedisTemplate操作Redis單節點或者Redis叢集都可以使用同樣的介面,這裡使用RedisTemplate來講解。

1. 首先訪問https://start.spring.io,生成springboot專案,依賴選擇web和redis,選擇Generate project。


2. 我們這裡整合了swagger-ui,可以方便你測試後臺restful api介面。在pom檔案中新增如下依賴

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${swagger.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${swagger.version}</version>
        </dependency>
修改application.properties檔案
#for single node
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
spring.redis.pool.min-idle=5
spring.redis.pool.max-idle=10

#for redis cluster
#spring.redis.cluster.nodes[0]=172.17.0.61:6379
#spring.redis.cluster.nodes[1]=172.17.0.61:6380
#spring.redis.cluster.nodes[2]=172.17.0.61:6381
#spring.redis.cluster.nodes[3]=172.17.0.61:6382
#spring.redis.cluster.nodes[4]=172.17.0.61:6383
#spring.redis.cluster.nodes[5]=172.17.0.61:6384
#spring.redis.cluster.nodes[6]=172.17.0.61:6385
#spring.redis.cluster.nodes[7]=172.17.0.61:6386
#spring.redis.cluster.nodes[8]=172.17.0.61:6387
#spring.redis.cluster.nodes[9]=172.17.0.61:6388
#spring.redis.cluster.nodes[10]=172.17.0.61:6389
#spring.redis.cluster.nodes[11]=172.17.0.61:6390
#spring.redis.cluster.nodes[12]=172.17.0.61:6391
#spring.redis.cluster.nodes[13]=172.17.0.61:6392
#spring.redis.cluster.nodes[14]=172.17.0.61:6393
3. RedisTemplate介紹

spring 封裝了 RedisTemplate 物件來進行對redis的各種操作,它支援所有的 redis 原生的 api。

RedisTemplate中定義了5中資料結構的操作。

redisTemplate.opsForValue();//操作字串
redisTemplate.opsForHash();//操作hash
redisTemplate.opsForList();//操作list
redisTemplate.opsForSet();//操作set
redisTemplate.opsForZSet();//操作有序set

spring-data-redis的序列化類有下面這幾個:

  • GenericToStringSerializer: 可以將任何物件泛化為字串並序列化
  • Jackson2JsonRedisSerializer: 跟JacksonJsonRedisSerializer實際上是一樣的
  • JacksonJsonRedisSerializer: 序列化object物件為json字串
  • JdkSerializationRedisSerializer: 序列化java物件
  • StringRedisSerializer: 簡單的字串序列化
具體可以參考這篇文章,關於Spring Data redis幾種物件序列化的比較

我們這裡選擇Key使用StringRedisSerialize,Value使用Jackson2JsonRedisSerializer的方式。避免Key被序列化後無法通過Redis客戶端訪問的問題。

@Component
public class RedisUtils {
@Autowired
@Qualifier("redisTemplate")
RedisTemplate template;


@Autowired
ApplicationContextProvider provider;


private Logger log = LoggerFactory.getLogger(this.getClass());


@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {


RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// container.addMessageListener(listenerAdapter, new PatternTopic("chat"));


return container;
}


@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}


@Bean
Receiver receiver(CountDownLatch latch) {
return new Receiver(latch);
}


@Bean
CountDownLatch latch() {
return new CountDownLatch(1);
}


@Bean
StringRedisTemplate stringTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}


@PostConstruct
public void init() {
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setKeySerializer(stringSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(stringSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
}
}
4. 為了避免每個service都去生成RedisTemplate,我包裝了一個RedisUtils物件,在物件裡面實現了所有Redis相關操作。

4.1 設定key/value值,並且可以設定超時時間。

	public void setValue(String key, Object val) {
		template.opsForValue().set(key, val);
	}

	public void setValue(String key, Object val, int time, TimeUnit unit) {
		template.opsForValue().set(key, val, time, unit);
	}
4.2 獲取key/value值
	public Object getValue(String key) {
		return template.opsForValue().get(key);
	}
4.3 同時設定多個key/value
	public void multiSet(Map<String, Object> map) {
		template.opsForValue().multiSet(map);
	}
4.4 同時獲取多個key/value
	public List<Object> multiGet(Collection<String> keys) {
		return template.opsForValue().multiGet(keys);
	}
4.5 操作取號器
	public long incr(String key, long delta) {
		return template.opsForValue().increment(key, delta);
	}
4.6 操作佇列,在高併發的情況下,如果同時上千個執行緒同時操作資料庫,資料庫很可能會因此而宕機。這個時候我們可以利用redis佇列,進行排隊依次寫入資料庫。利用lpush和rpop,可以形成一個queue。
	public void lpush(String key, String value) {
		template.opsForList().leftPush(key, value);
	}

	public List<Object> range(String key, int start, int end) {
		return template.opsForList().range(key, start, end);
	}

	public Object rpop(String key) {
		return template.opsForList().rightPop(key);
	}
4.7 有時候我們不光希望儲存key/value,也希望快取某個物件,比如使用者所有資料到redis。我們可以使用Redis的Hash資料結構來快取。
	public void setHash(String key, Map<String, Object> map) {
		template.opsForHash().putAll(key, map);
	}

	public Object getHash(String key, String prop) {
		return template.opsForHash().get(key, prop);
	}

	public Map getHashAll(String key) {
		Map map = new HashMap();
		map.put("keys", template.opsForHash().keys(key));
		map.put("vals", template.opsForHash().values(key));
		return map;
	}
4.8 如果我們需要一個小型的訊息中介軟體,可以選擇redis的訂閱/釋出來實現。
	public void subscribe(String channel) {
		RedisMessageListenerContainer container = provider.getBean(RedisMessageListenerContainer.class);
		MessageListenerAdapter listenerAdapter = provider.getBean(MessageListenerAdapter.class);
		container.addMessageListener(listenerAdapter, new PatternTopic(channel));
	}

	public void publish(String channel, String message) throws InterruptedException {
		CountDownLatch latch = provider.getBean(CountDownLatch.class);

		log.info("Sending message...");
		template.convertAndSend(channel, message);

		latch.await();
	}

5. 生成一個TemplateController作為我們的啟動Controller,用它來呼叫RedisUtils裡面實現的功能。
@Controller
public class TemplateController {
	private Logger log = LoggerFactory.getLogger(this.getClass());
	
	@Autowired
	RedisUtils utils;
	
    @ApiOperation(value = "操作字串/整數/浮點數", notes = "字串", httpMethod = "GET")
    @GetMapping(value = "/setString")
    @ResponseBody
    public String setString(@ApiParam(value = "存入key", required = true) @RequestParam String key,
    		 @ApiParam(value = "存入value", required = true) @RequestParam String value) {
    	utils.setValue(key,value,100,TimeUnit.SECONDS);
    	String ret = utils.getValue(key).toString();
    	return ret;
    }

    @ApiOperation(value = "一次性設定多組資料,字串/整數/浮點數", notes = "多組資料", httpMethod = "POST")
    @PostMapping(value = "/multiSet")
    @ResponseBody
    public List<Object> multiSet(@ApiParam(value = "存入key/value", required = true) @RequestBody Map keys) {
    	utils.multiSet(keys);
    	List<Object> ret = utils.multiGet(keys.keySet());
    	return ret;
    }

    @ApiOperation(value = "取號器 增量", notes = "取號器", httpMethod = "GET")
    @GetMapping(value = "/incr")
    @ResponseBody
    public long incr(@ApiParam(value = "key", required = true) @RequestParam String key,
    						@ApiParam(value = "增量 整數", required = true) @RequestParam long delta) {
    	return utils.incr(key, delta);
    }

    @ApiOperation(value = "插入佇列", notes = "插入佇列", httpMethod = "GET")
    @GetMapping(value = "/lpush")
    @ResponseBody
    public List<Object> lpush(@ApiParam(value = "key", required = true) @RequestParam String key,
    						@ApiParam(value = "內容", required = true) @RequestParam String content) {
    	utils.lpush(key, content);
    	return utils.range(key, 0, -1);
    }

    @ApiOperation(value = "推出佇列", notes = "推出佇列", httpMethod = "GET")
    @GetMapping(value = "/rpop")
    @ResponseBody
    public Object rpop(@ApiParam(value = "key", required = true) @RequestParam String key) {
    	return utils.rpop(key);
    }

    @ApiOperation(value = "Hashset", notes = "Hashset", httpMethod = "GET")
    @GetMapping(value = "/setHash")
    @ResponseBody
    public void setHash(@ApiParam(value = "key", required = true) @RequestParam String key,
    		@ApiParam(value = "Json字串", required = true) @RequestParam String json) {
    	Map map = (Map)JSON.parse(json);
    	utils.setHash(key, map);
    }

    @ApiOperation(value = "HashGetAll", notes = "HashGetAll", httpMethod = "GET")
    @GetMapping(value = "/getHashAll")
    @ResponseBody
    public Map getHashAll(@ApiParam(value = "key", required = true) @RequestParam String key) {
    	return utils.getHashAll(key);
    }

    @ApiOperation(value = "HashGet", notes = "HashGet", httpMethod = "GET")
    @GetMapping(value = "/getHash")
    @ResponseBody
    public Object getHash(@ApiParam(value = "key", required = true) @RequestParam String key,
    					@ApiParam(value = "prop", required = true) @RequestParam String prop) {
    	
    	Object ret = utils.getHash(key, prop);
    	log.info("key:" + key + " prop:" + prop + " val:" + ret.toString());
    	return ret;
    }

    @ApiOperation(value = "訂閱", notes = "subscribe", httpMethod = "GET")
    @GetMapping(value = "/subscribe")
    @ResponseBody
    public void subscribe(@ApiParam(value = "渠道", required = true) @RequestParam String channel) throws InterruptedException {   	
    	utils.subscribe(channel);
    }

    @ApiOperation(value = "釋出", notes = "publish", httpMethod = "GET")
    @GetMapping(value = "/publish")
    @ResponseBody
    public void publish(@ApiParam(value = "渠道", required = true) @RequestParam String channel,
    		@ApiParam(value = "訊息", required = true) @RequestParam String message) throws InterruptedException {   	
    	utils.publish(channel, message);
    }
}
6. 執行springboot專案,訪問http://localhost:8080/swagger-ui.html#/,測試所有完成的功能。

未解決問題:在redis叢集環境,訂閱的時候要報連線redis node失敗。