1. 程式人生 > >redis快取和mysql資料庫同步

redis快取和mysql資料庫同步

解決方案

一、對強一致要求比較高的,應採用實時同步方案,即查詢快取查詢不到再從DB查詢,儲存到快取;更新快取時,先更新資料庫,再將快取的設定過期(建議不要去更新快取內容,直接設定快取過期)。

二、對於併發程度較高的,可採用非同步佇列的方式同步,可採用kafka等訊息中介軟體處理訊息生產和消費。

三、使用阿里的同步工具canal,canal實現方式是模擬mysql slave和master的同步機制,監控DB bitlog的日誌更新來觸發快取的更新,此種方法可以解放程式設計師雙手,減少工作量,但在使用時有些侷限性。

四、採用UDF自定義函式的方式,面對mysql的API進行程式設計,利用觸發器進行快取同步,但UDF主要是c/c++語言實現,學習成本高。

實時同步

spring3+提供了註解的方式進行快取程式設計

@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
@CachePut(key = "caches[0].name + T(String).valueOf(#user.userId)")
@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" )
@Caching(evict = {@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" ),
@CacheEvict(key = "caches[0].name + #result.name" )})
@Cacheable:查詢時使用,注意Long型別需轉換為Sting型別,否則會拋異常
@CachePut:更新時使用,使用此註解,一定會從DB上查詢資料
@CacheEvict:刪除時使用;
@Caching:組合用法      具體註解的使用可參考官網

注意:註解方式雖然能使我們的程式碼簡潔,但是註解方式有侷限性:對key的獲取,以及巢狀使用時註解無效,如下所示
複製程式碼
public class User {
    private Long userId;
    private String name;
    private Integer age;
    private String sex;
    
private String addr;
  //get set ..... }
複製程式碼

service介面

1 2 3 4 5 6 7 public interface UserService { User getUser(Long userId); User updateUser(User user); User getUserByName(String name); int insertUser(User user); User  delete (Long userId); }<br>//實現類<br>//假設有需求是由name查詢user的,一般我們是先由name->id,再由id->user,這樣會減少redis快取的冗餘資訊
@Service(value = "userSerivceImpl")
@CacheConfig(cacheNames = "user")
public class UserServiceImpl implements UserService {
private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
UserMapper userMapper;

@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
public User getUser(Long userId) {
User user = userMapper.selectByPrimaryKey(userId);
return user;
}
@Cacheable(key = "caches[0].name + #name")
public String getIdByName(String name){
Long userId = userMapper.getIdByName(name);
return String.valueOf(userId);
}

//使用getUserByName方式呼叫getIdByName 和getUser方法來實現查詢,但是如果用此方式在controller中直接呼叫
//getUserByName方法,快取效果是不起作用的,必須是直接呼叫getIdByName和getUser方法才能起作用
    public User getUserByName(String name) {
//通過name 查詢到主鍵 再由主鍵查詢實體
return getUser(Long.valueOf(getIdByName(name)));
}

非註解方式實現

1.先定義一個RedisCacheConfig類用於生成RedisTemplate和對CacheManager的管理

複製程式碼
@Configuration
public class RedisCacheConfig  extends CachingConfigurerSupport {

    /*定義快取資料 key 生成策略的bean
     *包名+類名+方法名+所有引數
    */
    @Bean
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };
    }

     //@Bean
     public CacheManager cacheManager(
             @SuppressWarnings("rawtypes") RedisTemplate redisTemplate) {
         //RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
           //cacheManager.setDefaultExpiration(60);//設定快取保留時間(seconds)
         return cacheManager;
     }

    //1.專案啟動時此方法先被註冊成bean被spring管理
    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {

        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值
        Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);

        System.out.println("==============obj:"+Object.class.getName());
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);

        template.setValueSerializer(serializer);
        //使用StringRedisSerializer來序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }
}
複製程式碼

2.定義一個redisUtil類用於存取快取值

複製程式碼
@Component
public class RedisCacheUtil {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 儲存字串
     * @param key string型別的key
     * @param value String型別的value
     */
    public void set(String key, String value) {
        stringRedisTemplate.opsForValue().set(key, value);
    }

    /**
     * 儲存物件
     * @param key String型別的key
     * @param value Object型別的value
     */
    public void set(String key, Object value) {
        redisTemplate.opsForValue().set(key, value);
    }

    /**
     * 儲存物件
     * @param key String型別的key
     * @param value Object型別的value
     */
    public void set(String key, Object value,Long timeOut) {
        redisTemplate.opsForValue().set(key, value,timeOut, TimeUnit.SECONDS);
    }

    /**
     * 根據key獲取字串資料
     * @param key
     * @return
     */
    public String getValue(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

//    public Object getValue(String key) {
//        return redisTemplate.opsForValue().get(key);
//    }
    /**
     * 根據key獲取物件
     * @param key
     * @return
     */
    public Object getValueOfObject(String key) {
        return redisTemplate.opsForValue().get(key);
    }
    /**
     * 根據key刪除快取資訊
     * @param key
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }
    /**
     * 查詢key是否存在
     * @param key
     * @return
     */
    @SuppressWarnings("unchecked")
    public boolean exists(String key) {
        return redisTemplate.hasKey(key);
    }
}
複製程式碼

3.實現類

複製程式碼
/**
 * Created by yexin on 2017/9/8.
 *
 * 在Impl基礎上+ 防止快取雪崩和快取穿透功能
 */
@Service(value = "userServiceImpl4")
public class UserServiceImpl4 implements UserService {

    @Autowired
    UserMapper userMapper;

    @Autowired
    RedisCacheUtil redisCacheUtil;

    @Value("${timeOut}")
    private long timeOut;

    @Override
    public User getUser(Long userId) {

        String key = "user" + userId;
        User user = (User) redisCacheUtil.getValueOfObject(key);
        String keySign = key + "_sign";
        String valueSign = redisCacheUtil.getValue(keySign);
        if(user == null){//防止第一次查詢時返回時空結果
            //防止快取穿透
            if(redisCacheUtil.exists(key)){
                return  null;
            }
            user = userMapper.selectByPrimaryKey(userId);

            redisCacheUtil.set(key,user);
            redisCacheUtil.set(keySign,"1",timeOut *(new Random().nextInt(10) + 1));
//            redisCacheUtil.set(keySign,"1",0L);  //過期時間不能設定為0,必須比0大的數
            return user;
        }

        if(valueSign != null){
            return user;
        }else {
            //設定標記的實效時間
            Long tt = timeOut * (new Random().nextInt(10) + 1);
            System.out.println("tt:"+tt);
            redisCacheUtil.set(keySign,"1",tt);
            //非同步處理快取更新  應對與高併發的情況,會產生髒讀的情況
            ThreadPoolUtil.getExecutorService().execute(new Runnable(){
                public void run() { //
                    System.out.println("-----執行非同步操作-----");
                    User user1 = userMapper.selectByPrimaryKey(userId);
                    redisCacheUtil.set(key,user1);
                }
            });

//            new Thread(){
//                public void run() { //應對與高併發的情況,會產生髒讀的情況
//                    System.out.println("-----執行非同步操作-----");
//                    User user1 = userMapper.selectByPrimaryKey(userId);
//                    redisCacheUtil.set(key,user1);
//                }
//            }.start();
        }
        return user;
    }
}
複製程式碼

非同步實現

非同步實現通過kafka作為訊息佇列實現,非同步只針對更新操作,查詢無需非同步,實現類如下

1.pom檔案需依賴

<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

2.生產著程式碼

複製程式碼
@EnableBinding(Source.class)
public class SendService {
    @Autowired
    private Source source;
    public void sendMessage(String msg) {
        try{
            source.output().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
//接受的是一個實體類,具體配置在application.yml
    public void sendMessage(TransMsg msg) {
        try {
            //MessageBuilder.withPayload(msg).setHeader(KafkaHeaders.TOPIC,"111111").build();
            source.output().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
複製程式碼

3.消費者程式碼

複製程式碼
@EnableBinding(Sink.class)
public class MsgSink {
    @Resource(name = "userSerivceImpl3")
    UserService userService;
    @StreamListener(Sink.INPUT)
    public void process(TransMsg<?> msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
        System.out.println("sink......"+msg);
        System.out.println("opt db strat ----");
        userService.updateUser((User) msg.getParams());
        System.out.println("執行db結束------");
    }
}
複製程式碼

4.application.yml配置

複製程式碼
spring:
  application:
    name: demo-provider
  redis:
    database: 0
    host: 192.168.252.128
    #host: localhost
    port: 6379
    password:
    pool:
      max-active: 50
      max-wait: -1
      max-idle: 50
    timeout: 0
#kafka
  cloud:
      stream:
        kafka:
          binder:
            brokers: 192.168.252.128:9092
            zk-nodes: 192.168.252.128:2181
            minPartitionCount: 1
            autoCreateTopics: true
            autoAddPartitions: true
        bindings:
          input:
            destination: topic-02
#            content-type: application/json
            content-type: application/x-java-object   #此種類型配置在消費端接受到的為一個實體類
            group: t1
            consumer:
              concurrency: 1
              partitioned: false
          output:
            destination: topic-02
            content-type: application/x-java-object             
            producer:
              partitionCount: 1
        instance-count: 1
        instance-index: 0
複製程式碼

5.實現類

複製程式碼
@Service(value = "userServiceImpl2")
public class UserServiceImpl2  implements UserService{
    @Autowired
    UserMapper userMapper;
    @Autowired
    RedisCacheUtil redisCacheUtil;
    private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
    @Autowired
    SendService sendService;
    public User updateUser(User user) {
        System.out.println("   impl2   active   ");
        String key = "user"+ user.getUserId();
        System.out.println("key:"+key);
        //是否存在key
        if(!redisCacheUtil.exists(key)){
         return userMapper.updateByPrimaryKeySelective(user) == 1 ? user : null;
        }
        /*  更新key對應的value
            更新佇列
         */
        User user1 = (User)redisCacheUtil.getValueOfObject(key);
        try {
            redisCacheUtil.set(key,user);
            TransMsg<User> msg = new TransMsg<User>(key,user,this.getClass().getName(),"updateUser",user);
            sendService.sendMessage(msg);

        }catch (Exception e){
            redisCacheUtil.set(key,user1);
        }
        return user;
    }
}
複製程式碼

注意:kafka與zookeeper的配置在此不介紹

canal實現方式

先要安裝canal,配置canal的example檔案等,配置暫不介紹

複製程式碼
package org.example.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import org.example.canal.util.RedisUtil;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String[] args) {
        // 建立連結
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;

        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾資料
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry( List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }