分散式專案(四)Mapping Server 資料對映
阿新 • • 發佈:2019-04-21
上回說道CoAp client和server的實現,資料也按照既定格式傳送到了kafka中,接下來就是Mapping server的實現,物理裝置資料對映到抽象裝置上,並賦予資料業務含義。
iot-mapping
構建iot-mapping模組,引入kafka公共模組
SourceListener
SourceListener監聽Coap server 傳送的原始資料,並從redis中取出web manage快取的產品物模型和裝置資料,由於redis公用的比較多,所以這也構建一個iot-redis模組,用與redis操作。
邏輯說明
- 獲取kafka中的資料轉成KafkaSourceVO
- 根據imei獲取redis中的RedisDeviceVO
- 根據productId獲取RedisProductVO
- 根據RedisProductVO中的format,確定是byte處理還是json處理
@Autowired private BaseRedisUtil redisUtil; @KafkaListener(topics = SOURCE_TOPIC) public void iotListener(String msg){ System.out.println("-----------"+msg); KafkaSourceVO sourceVO = JSONObject.parseObject(msg, KafkaSourceVO.class); //裝置資訊 RedisDeviceVO deviceVO = redisUtil.get(sourceVO.getImei()); //產品資訊 RedisProductVO productVO = redisUtil.get(deviceVO.getProductId()); if (EDataFormat.BYTE.getFormat().equals(productVO.getFormat())){ analysisByte(sourceVO,productVO,deviceVO); }else if (EDataFormat.JSON.getFormat().equals( productVO.getFormat())){ analysisJson(sourceVO,productVO,deviceVO); } }
byte 解析
- 獲取KafkaSourceVO中的原始資料data,並轉成char[]
- 獲取產品物模型中的屬性模型RedisPropertyVO
- 獲取屬性模型,按照屬性模型中定義的ofset,在原始資料char[]中獲取對應的值
- 把解析出來的資料封裝為KafkaDownVO
/** * 解析 */ public void analysisByte(KafkaSourceVO sourceVO,RedisProductVO productVO,RedisDeviceVO deviceVO){ char[] chars = sourceVO.getData().toCharArray(); List<RedisPropertyVO> propertys = productVO.getPropertys(); List<KafkaDownVO> downVOList = new ArrayList<>(propertys.size()); propertys.forEach(property->{ String[] str = property.getOfset().split("-"); int begin = Integer.valueOf(str[0]); int end = Integer.valueOf(str[1]); KafkaDownVO vo = new KafkaDownVO(); vo.setDeviceId(deviceVO.getId()); vo.setPropertyId(property.getId()); StringBuilder sb = new StringBuilder(); for (int i = begin;i <= end; i++){ sb.append(chars[i]); } vo.setData(sb.toString()); downVOList.add(vo); }); System.out.println("byte---"+downVOList); }
json 解析
- json定義中,物理裝置傳送的key與屬性模型中的identifier一一對應,而在封裝RedisPropertyVO的時候,為了與byte保持統一把identifier賦值給了ofset,所以這裡獲取屬性模型,並轉成Map<ofset,id> 格式。
- KafkaSourceVO中的原始資料也序列化成map<key,value>
- 便利屬性模型對應的map(propertyMap),並從原始資料map(dataMap)取出對應的資料
- 為了資料的統一格式,這裡也把資料封裝成KafkaDownVO
public void analysisJson(KafkaSourceVO sourceVO,RedisProductVO
productVO,RedisDeviceVO deviceVO){
Map<String,Long> propertyMap = productVO.getPropertys().stream().
collect(Collectors.toMap(RedisPropertyVO :: getOfset,
RedisPropertyVO::getId));
Map<String,String> dataMap = JSONObject.parseObject(
sourceVO.getData(), HashMap.class);
List<KafkaDownVO> downVOList = new ArrayList<>(dataMap.size());
dataMap.forEach((key,val)->{
KafkaDownVO vo = new KafkaDownVO();
vo.setPropertyId(propertyMap.get(key));
vo.setDeviceId(deviceVO.getId());
vo.setData(val);
vo.setCollTime(sourceVO.getCollTime());
downVOList.add(vo);
});
System.out.println("json---"+downVOList);
}
啟動專案,檢驗一下資料是否封裝正確
按iot-pt架構設計,現在需要把對映好的資料,再次寫入kakfa中,供訂閱費服務使用
kafka 寫入
在iot-kafka模組中新增對Mapping 資料的寫入
[@Component](https://my.oschina.net/u/3907912)
public class KafkaMapping {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(List<KafkaDownVO> list){
String json = JSONArray.toJSONString(list);
kafkaTemplate.send(DOWN_TOPIC,json);
}
}
修改analysisByte()和analysisJson()
public void analysisByte(KafkaSourceVO sourceVO,
RedisProductVO productVO,RedisDeviceVO deviceVO){
char[] chars = sourceVO.getData().toCharArray();
List<RedisPropertyVO> propertys = productVO.getPropertys();
List<KafkaDownVO> downVOList = new ArrayList<>(propertys.size());
propertys.forEach(property->{
String[] str = property.getOfset().split("-");
int begin = Integer.valueOf(str[0]);
int end = Integer.valueOf(str[1]);
KafkaDownVO vo = new KafkaDownVO();
vo.setDeviceId(deviceVO.getId());
vo.setPropertyId(property.getId());
StringBuilder sb = new StringBuilder();
for (int i = begin;i <= end; i++){
sb.append(chars[i]);
}
vo.setData(sb.toString());
vo.setCollTime(sourceVO.getCollTime());
downVOList.add(vo);
});
kafkaMapping.send(downVOList);
}
public void analysisJson(KafkaSourceVO sourceVO,
RedisProductVO productVO,RedisDeviceVO deviceVO){
Map<String,Long> propertyMap = productVO.getPropertys().stream().
collect(Collectors.toMap(
RedisPropertyVO :: getOfset,RedisPropertyVO::getId));
Map<String,String> dataMap = JSONObject.parseObject(sourceVO.getData(), HashMap.class);
List<KafkaDownVO> downVOList = new ArrayList<>(dataMap.size());
dataMap.forEach((key,val)->{
KafkaDownVO vo = new KafkaDownVO();
vo.setPropertyId(propertyMap.get(key));
vo.setDeviceId(deviceVO.getId());
vo.setData(val);
vo.setCollTime(sourceVO.getCollTime());
downVOList.add(vo);
});
kafkaMapping.send(downVOList);
}
再次啟動專案,教研Mapping資料是否成寫入kakfa
結束語
接下來就是訂閱服務的實現了,請聽下回分解,具體的程式碼細節在git