1. 程式人生 > >分散式專案(四)Mapping Server 資料對映

分散式專案(四)Mapping Server 資料對映

上回說道CoAp client和server的實現,資料也按照既定格式傳送到了kafka中,接下來就是Mapping server的實現,物理裝置資料對映到抽象裝置上,並賦予資料業務含義。

iot-mapping

構建iot-mapping模組,引入kafka公共模組

SourceListener

SourceListener監聽Coap server 傳送的原始資料,並從redis中取出web manage快取的產品物模型和裝置資料,由於redis公用的比較多,所以這也構建一個iot-redis模組,用與redis操作。

邏輯說明

  • 獲取kafka中的資料轉成KafkaSourceVO
  • 根據imei獲取redis中的RedisDeviceVO
  • 根據productId獲取RedisProductVO
  • 根據RedisProductVO中的format,確定是byte處理還是json處理
@Autowired
    private BaseRedisUtil redisUtil;
    @KafkaListener(topics = SOURCE_TOPIC)
    public void iotListener(String msg){
        System.out.println("-----------"+msg);
        KafkaSourceVO sourceVO = JSONObject.parseObject(msg,
		KafkaSourceVO.class);
        //裝置資訊
        RedisDeviceVO deviceVO = redisUtil.get(sourceVO.getImei());
        //產品資訊
        RedisProductVO productVO = redisUtil.get(deviceVO.getProductId());
        if (EDataFormat.BYTE.getFormat().equals(productVO.getFormat())){
            analysisByte(sourceVO,productVO,deviceVO);
        }else if (EDataFormat.JSON.getFormat().equals(
		productVO.getFormat())){
            analysisJson(sourceVO,productVO,deviceVO);
        }
    }

byte 解析

  • 獲取KafkaSourceVO中的原始資料data,並轉成char[]
  • 獲取產品物模型中的屬性模型RedisPropertyVO
  • 獲取屬性模型,按照屬性模型中定義的ofset,在原始資料char[]中獲取對應的值
  • 把解析出來的資料封裝為KafkaDownVO
    /**
     * 解析
     */
    public void analysisByte(KafkaSourceVO sourceVO,RedisProductVO
	productVO,RedisDeviceVO deviceVO){
        char[] chars = sourceVO.getData().toCharArray();
        List<RedisPropertyVO> propertys = productVO.getPropertys();
        List<KafkaDownVO> downVOList = new ArrayList<>(propertys.size());
        propertys.forEach(property->{
            String[] str = property.getOfset().split("-");
            int begin = Integer.valueOf(str[0]);
            int end = Integer.valueOf(str[1]);
            KafkaDownVO vo = new KafkaDownVO();
            vo.setDeviceId(deviceVO.getId());
            vo.setPropertyId(property.getId());
            StringBuilder sb = new StringBuilder();
            for (int i = begin;i <= end; i++){
                sb.append(chars[i]);
            }
            vo.setData(sb.toString());
            downVOList.add(vo);
        });
	System.out.println("byte---"+downVOList);
    }

json 解析

  • json定義中,物理裝置傳送的key與屬性模型中的identifier一一對應,而在封裝RedisPropertyVO的時候,為了與byte保持統一把identifier賦值給了ofset,所以這裡獲取屬性模型,並轉成Map<ofset,id> 格式。
  • KafkaSourceVO中的原始資料也序列化成map<key,value>
  • 便利屬性模型對應的map(propertyMap),並從原始資料map(dataMap)取出對應的資料
  • 為了資料的統一格式,這裡也把資料封裝成KafkaDownVO
public void analysisJson(KafkaSourceVO sourceVO,RedisProductVO
		productVO,RedisDeviceVO deviceVO){
        Map<String,Long> propertyMap = productVO.getPropertys().stream().
		collect(Collectors.toMap(RedisPropertyVO :: getOfset,
		RedisPropertyVO::getId));
        Map<String,String> dataMap = JSONObject.parseObject(
		sourceVO.getData(), HashMap.class);
        List<KafkaDownVO> downVOList = new ArrayList<>(dataMap.size());
        dataMap.forEach((key,val)->{
            KafkaDownVO vo = new KafkaDownVO();
            vo.setPropertyId(propertyMap.get(key));
            vo.setDeviceId(deviceVO.getId());
            vo.setData(val);
            vo.setCollTime(sourceVO.getCollTime());
            downVOList.add(vo);
        });
	System.out.println("json---"+downVOList);
    }

啟動專案,檢驗一下資料是否封裝正確

按iot-pt架構設計,現在需要把對映好的資料,再次寫入kakfa中,供訂閱費服務使用

kafka 寫入

在iot-kafka模組中新增對Mapping 資料的寫入

[@Component](https://my.oschina.net/u/3907912)
public class KafkaMapping {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(List<KafkaDownVO> list){
        String json = JSONArray.toJSONString(list);
        kafkaTemplate.send(DOWN_TOPIC,json);
    }

}

修改analysisByte()和analysisJson()

public void analysisByte(KafkaSourceVO sourceVO,
		RedisProductVO productVO,RedisDeviceVO deviceVO){
        char[] chars = sourceVO.getData().toCharArray();
        List<RedisPropertyVO> propertys = productVO.getPropertys();
        List<KafkaDownVO> downVOList = new ArrayList<>(propertys.size());
        propertys.forEach(property->{
            String[] str = property.getOfset().split("-");
            int begin = Integer.valueOf(str[0]);
            int end = Integer.valueOf(str[1]);
            KafkaDownVO vo = new KafkaDownVO();
            vo.setDeviceId(deviceVO.getId());
            vo.setPropertyId(property.getId());
            StringBuilder sb = new StringBuilder();
            for (int i = begin;i <= end; i++){
                sb.append(chars[i]);
            }
            vo.setData(sb.toString());
            vo.setCollTime(sourceVO.getCollTime());
            downVOList.add(vo);
        });
        kafkaMapping.send(downVOList);
    }
public void analysisJson(KafkaSourceVO sourceVO,
		RedisProductVO productVO,RedisDeviceVO deviceVO){
        Map<String,Long> propertyMap = productVO.getPropertys().stream().
		collect(Collectors.toMap(
		RedisPropertyVO :: getOfset,RedisPropertyVO::getId));
        Map<String,String> dataMap = JSONObject.parseObject(sourceVO.getData(), HashMap.class);
        List<KafkaDownVO> downVOList = new ArrayList<>(dataMap.size());
        dataMap.forEach((key,val)->{
            KafkaDownVO vo = new KafkaDownVO();
            vo.setPropertyId(propertyMap.get(key));
            vo.setDeviceId(deviceVO.getId());
            vo.setData(val);
            vo.setCollTime(sourceVO.getCollTime());
            downVOList.add(vo);
        });
        kafkaMapping.send(downVOList);
    }

再次啟動專案,教研Mapping資料是否成寫入kakfa

結束語

接下來就是訂閱服務的實現了,請聽下回分解,具體的程式碼細節在git

https://gitee.com/d