Flink如何做維表關聯?
阿新 • • 發佈:2020-11-25
使用 RichAsyncFunction 加 Guava 的 CacheBuilder(本地 LRU 快取)來實現維表關聯:
CacheBuilder.newBuilder()
//Hold at most 10000 entries
.maximumSize(10000)
//Entries expire 1 minute after they are written
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
public class LRU extends RichAsyncFunction<String,Order> { private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class); String table = "info"; Cache<String, String> cache = null; private HBaseClient client = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //建立hbase客戶端 client = new HBaseClient("127.0.0.1","7071"); cache = CacheBuilder.newBuilder() //最多儲存10000條 .maximumSize(10000) //過期時間為1分鐘 .expireAfterWrite(60, TimeUnit.SECONDS) .build(); } @Override public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception { JSONObject jsonObject = JSONObject.parseObject(input); Integer cityId = jsonObject.getInteger("city_id"); String userName = jsonObject.getString("user_name"); String items = jsonObject.getString("items"); //讀快取 String cacheCityName = cache.getIfPresent(cityId); //如果快取獲取失敗再從hbase獲取維度資料 if(cacheCityName != null){ Order order = new Order(); order.setCityId(cityId); order.setItems(items); order.setUserName(userName); order.setCityName(cacheCityName); resultFuture.complete(Collections.singleton(order)); }else { client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> { for (KeyValue kv : arg) { String value = new String(kv.value()); Order order = new Order(); order.setCityId(cityId); order.setItems(items); order.setUserName(userName); order.setCityName(value); resultFuture.complete(Collections.singleton(order)); cache.put(String.valueOf(cityId), value); } return null; }); } } }