redis快取和mysql資料庫同步
阿新 • • 發佈:2019-02-20
解決方案
一、對強一致要求比較高的,應採用實時同步方案,即查詢快取查詢不到再從DB查詢,儲存到快取;更新快取時,先更新資料庫,再將快取的設定過期(建議不要去更新快取內容,直接設定快取過期)。
二、對於併發程度較高的,可採用非同步佇列的方式同步,可採用kafka等訊息中介軟體處理訊息生產和消費。
三、使用阿里的同步工具canal,canal實現方式是模擬mysql slave和master的同步機制,監控DB bitlog的日誌更新來觸發快取的更新,此種方法可以解放程式設計師雙手,減少工作量,但在使用時有些侷限性。
四、採用UDF自定義函式的方式,面對mysql的API進行程式設計,利用觸發器進行快取同步,但UDF主要是c/c++語言實現,學習成本高。
實時同步
spring3+提供了註解的方式進行快取程式設計
@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
@CachePut(key = "caches[0].name + T(String).valueOf(#user.userId)")
@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" )
@Caching(evict = {@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" ),
@CacheEvict(key = "caches[0].name + #result.name" )})
@Cacheable:查詢時使用,注意Long型別需轉換為Sting型別,否則會拋異常
@CachePut:更新時使用,使用此註解,一定會從DB上查詢資料
@CacheEvict:刪除時使用;
@Caching:組合用法 具體註解的使用可參考官網
注意:註解方式雖然能使我們的程式碼簡潔,但是註解方式有侷限性:對key的獲取,以及巢狀使用時註解無效,如下所示
public class User { private Long userId; private String name; private Integer age; private String sex;private String addr;
//get set ..... }
service介面
1 2 3 4 5 6 7 |
public interface UserService
{
User
getUser(Long userId);
User
updateUser(User user);
User
getUserByName(String name);
int insertUser(User
user);
User
delete (Long userId);
}<br> //實現類<br>//假設有需求是由name查詢user的,一般我們是先由name->id,再由id->user,這樣會減少redis快取的冗餘資訊
|
@Service(value = "userSerivceImpl")
@CacheConfig(cacheNames = "user")
public class UserServiceImpl implements UserService {
private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
UserMapper userMapper;
@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
public User getUser(Long userId) {
User user = userMapper.selectByPrimaryKey(userId);
return user;
}
@Cacheable(key = "caches[0].name + #name")
public String getIdByName(String name){
Long userId = userMapper.getIdByName(name);
return String.valueOf(userId);
}
//使用getUserByName方式呼叫getIdByName 和getUser方法來實現查詢,但是如果用此方式在controller中直接呼叫
//getUserByName方法,快取效果是不起作用的,必須是直接呼叫getIdByName和getUser方法才能起作用
public User getUserByName(String name) {
//通過name 查詢到主鍵 再由主鍵查詢實體
return getUser(Long.valueOf(getIdByName(name)));
}
非註解方式實現
1.先定義一個RedisCacheConfig類用於生成RedisTemplate和對CacheManager的管理
@Configuration public class RedisCacheConfig extends CachingConfigurerSupport { /*定義快取資料 key 生成策略的bean *包名+類名+方法名+所有引數 */ @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); } }; } //@Bean public CacheManager cacheManager( @SuppressWarnings("rawtypes") RedisTemplate redisTemplate) { //RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate); //cacheManager.setDefaultExpiration(60);//設定快取保留時間(seconds) return cacheManager; } //1.專案啟動時此方法先被註冊成bean被spring管理 @Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值 Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); System.out.println("==============obj:"+Object.class.getName()); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer來序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } }
2.定義一個redisUtil類用於存取快取值
@Component public class RedisCacheUtil { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 儲存字串 * @param key string型別的key * @param value String型別的value */ public void set(String key, String value) { stringRedisTemplate.opsForValue().set(key, value); } /** * 儲存物件 * @param key String型別的key * @param value Object型別的value */ public void set(String key, Object value) { redisTemplate.opsForValue().set(key, value); } /** * 儲存物件 * @param key String型別的key * @param value Object型別的value */ public void set(String key, Object value,Long timeOut) { redisTemplate.opsForValue().set(key, value,timeOut, TimeUnit.SECONDS); } /** * 根據key獲取字串資料 * @param key * @return */ public String getValue(String key) { return stringRedisTemplate.opsForValue().get(key); } // public Object getValue(String key) { // return redisTemplate.opsForValue().get(key); // } /** * 根據key獲取物件 * @param key * @return */ public Object getValueOfObject(String key) { return redisTemplate.opsForValue().get(key); } /** * 根據key刪除快取資訊 * @param key */ public void delete(String key) { redisTemplate.delete(key); } /** * 查詢key是否存在 * @param key * @return */ @SuppressWarnings("unchecked") public boolean exists(String key) { return redisTemplate.hasKey(key); } }
3.實現類
/** * Created by yexin on 2017/9/8. * * 在Impl基礎上+ 防止快取雪崩和快取穿透功能 */ @Service(value = "userServiceImpl4") public class UserServiceImpl4 implements UserService { @Autowired UserMapper userMapper; @Autowired RedisCacheUtil redisCacheUtil; @Value("${timeOut}") private long timeOut; @Override public User getUser(Long userId) { String key = "user" + userId; User user = (User) redisCacheUtil.getValueOfObject(key); String keySign = key + "_sign"; String valueSign = redisCacheUtil.getValue(keySign); if(user == null){//防止第一次查詢時返回時空結果 //防止快取穿透 if(redisCacheUtil.exists(key)){ return null; } user = userMapper.selectByPrimaryKey(userId); redisCacheUtil.set(key,user); redisCacheUtil.set(keySign,"1",timeOut *(new Random().nextInt(10) + 1)); // redisCacheUtil.set(keySign,"1",0L); //過期時間不能設定為0,必須比0大的數 return user; } if(valueSign != null){ return user; }else { //設定標記的實效時間 Long tt = timeOut * (new Random().nextInt(10) + 1); System.out.println("tt:"+tt); redisCacheUtil.set(keySign,"1",tt); //非同步處理快取更新 應對與高併發的情況,會產生髒讀的情況 ThreadPoolUtil.getExecutorService().execute(new Runnable(){ public void run() { // System.out.println("-----執行非同步操作-----"); User user1 = userMapper.selectByPrimaryKey(userId); redisCacheUtil.set(key,user1); } }); // new Thread(){ // public void run() { //應對與高併發的情況,會產生髒讀的情況 // System.out.println("-----執行非同步操作-----"); // User user1 = userMapper.selectByPrimaryKey(userId); // redisCacheUtil.set(key,user1); // } // }.start(); } return user; } }
非同步實現
非同步實現通過kafka作為訊息佇列實現,非同步只針對更新操作,查詢無需非同步,實現類如下
1.pom檔案需依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.生產著程式碼
@EnableBinding(Source.class) public class SendService { @Autowired private Source source; public void sendMessage(String msg) { try{ source.output().send(MessageBuilder.withPayload(msg).build()); } catch (Exception e) { e.printStackTrace(); } } //接受的是一個實體類,具體配置在application.yml public void sendMessage(TransMsg msg) { try { //MessageBuilder.withPayload(msg).setHeader(KafkaHeaders.TOPIC,"111111").build(); source.output().send(MessageBuilder.withPayload(msg).build()); } catch (Exception e) { e.printStackTrace(); } } }
3.消費者程式碼
@EnableBinding(Sink.class) public class MsgSink { @Resource(name = "userSerivceImpl3") UserService userService; @StreamListener(Sink.INPUT) public void process(TransMsg<?> msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException { System.out.println("sink......"+msg); System.out.println("opt db strat ----"); userService.updateUser((User) msg.getParams()); System.out.println("執行db結束------"); } }
4.application.yml配置
spring: application: name: demo-provider redis: database: 0 host: 192.168.252.128 #host: localhost port: 6379 password: pool: max-active: 50 max-wait: -1 max-idle: 50 timeout: 0 #kafka cloud: stream: kafka: binder: brokers: 192.168.252.128:9092 zk-nodes: 192.168.252.128:2181 minPartitionCount: 1 autoCreateTopics: true autoAddPartitions: true bindings: input: destination: topic-02 # content-type: application/json content-type: application/x-java-object #此種類型配置在消費端接受到的為一個實體類 group: t1 consumer: concurrency: 1 partitioned: false output: destination: topic-02 content-type: application/x-java-object producer: partitionCount: 1 instance-count: 1 instance-index: 0
5.實現類
@Service(value = "userServiceImpl2") public class UserServiceImpl2 implements UserService{ @Autowired UserMapper userMapper; @Autowired RedisCacheUtil redisCacheUtil; private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class); @Autowired SendService sendService; public User updateUser(User user) { System.out.println(" impl2 active "); String key = "user"+ user.getUserId(); System.out.println("key:"+key); //是否存在key if(!redisCacheUtil.exists(key)){ return userMapper.updateByPrimaryKeySelective(user) == 1 ? user : null; } /* 更新key對應的value 更新佇列 */ User user1 = (User)redisCacheUtil.getValueOfObject(key); try { redisCacheUtil.set(key,user); TransMsg<User> msg = new TransMsg<User>(key,user,this.getClass().getName(),"updateUser",user); sendService.sendMessage(msg); }catch (Exception e){ redisCacheUtil.set(key,user1); } return user; } }
注意:kafka與zookeeper的配置在此不介紹
canal實現方式
先要安裝canal,配置canal的example檔案等,配置暫不介紹
package org.example.canal; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import org.example.canal.util.RedisUtil; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) { // 建立連結 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾資料 } } finally { connector.disconnect(); } } private static void printEntry( List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }