1. 程式人生 > 實用技巧 >Flink如何做維表關聯?

Flink如何做維表關聯?

使用

RichAsyncFunction 加 CacheBuilder


CacheBuilder.newBuilder()
        //最多儲存10000條
        .maximumSize(10000)
        //過期時間為1分鐘
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .build();
public class LRU extends RichAsyncFunction<String,Order> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
    String table = "info";
    Cache<String, String> cache = null;
    private HBaseClient client = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //建立hbase客戶端
        client = new HBaseClient("127.0.0.1","7071");
        cache = CacheBuilder.newBuilder()
                //最多儲存10000條
                .maximumSize(10000)
                //過期時間為1分鐘
                .expireAfterWrite(60, TimeUnit.SECONDS)
                .build();
    }
    @Override
    public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(input);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //讀快取
        String cacheCityName = cache.getIfPresent(cityId);
        //如果快取獲取失敗再從hbase獲取維度資料
        if(cacheCityName != null){
            Order order = new Order();
            order.setCityId(cityId);
            order.setItems(items);
            order.setUserName(userName);
            order.setCityName(cacheCityName);
            resultFuture.complete(Collections.singleton(order));
        }else {
            client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                for (KeyValue kv : arg) {
                    String value = new String(kv.value());
                    Order order = new Order();
                    order.setCityId(cityId);
                    order.setItems(items);
                    order.setUserName(userName);
                    order.setCityName(value);
                    resultFuture.complete(Collections.singleton(order));
                    cache.put(String.valueOf(cityId), value);
                }
                return null;
            });
        }
    }
}