1. 程式人生 > >Flink實戰| Flink+Redis實時防刷介面作弊

Flink實戰| Flink+Redis實時防刷介面作弊

隨著人口紅利的慢慢削減,網際網路產品的廝殺愈加激烈,大家開始看好下沉市場的潛力,拼多多,趣頭條等廠商通過拉新獎勵,購物優惠等政策率先搶佔使用者,壯大起來。其他各廠商也緊隨其後,紛紛推出自己產品的極速版,如今日頭條極速版,騰訊新聞極速版等,也通過拉新獎勵,閱讀獎勵等政策來吸引使用者。

對於這類APP,實時風控是必不可少的,一個比較常見的實時風控場景就是防刷介面作弊。刷介面是黑產的一種作弊手段,APP上的各種操作,一般都會對應後臺的某個介面,使用者操作APP資料就會通過介面上報到後臺,但如果黑產通過破解獲取到了APP的新增使用者介面,那他們就能跳過登陸APP步驟直接調後臺介面構造虛假資料牟利了。對於這類業務,我們可以通過Flink + Redis來實現實時防刷介面的功能。資料流圖如下所示:


刷介面作弊一般是越過登陸APP操作,直接調Server端的介面發資料,這些使用者在APP的上報日誌裡面就不存在,那我們可以通過Flink將APP實時上報上來的新增使用者寫入Redis中,然後Server端將介面上報上來的使用者與Redis裡的使用者進行比對,如果不在Redis裡面則判為刷介面使用者。

對於這個需求,得要求實時計算引擎能達到毫秒級延遲,否則會造成使用者的誤判和影響使用者體驗。為此我們選擇了Flink作為實時計算引擎。

主要程式碼邏輯如下:

//配置flink執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//val env = StreamExecutionEnvironment.createLocalEnvironment()
env.enableCheckpointing(1000 * 60 * 5)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 60 * 3)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(new FsStateBackend(checkPointPath))
env.getConfig.setLatencyTrackingInterval(1000)
env.getConfig.registerTypeWithKryoSerializer(classOf[Log], classOf[ProtobufSerializer])
env.setStreamTimeCharacteristic(EventTime)
env.setParallelism(parallel)
env.getConfig.setLatencyTrackingInterval(1000)

//kafka source,實時消費kafka中日誌解析出使用者id
val stream = env.addSource(new FlinkKafkaConsumer010[Array[Log]](topic, new LogDeserializationSchema(), properties))
val data = stream.flatMap(x => x)
  .map(log =>{
    val userid = log.getUid.getUuid
    val current_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
    (userid,current_time)
  }).filter(record=>{
  val userid = record._1
  var flag = false
  if(userid != null && !"".equals(userid)){
    flag = true
  }
  flag
})

//redis sink,將APP上報日誌的使用者id寫入redis供server端匹配
data.addSink(new RedisSink[(String, String)](getJedisClusterConfig, new RedisSinkMapper))
env.execute("newsinfo_active_userid_to_redis")

其中比較重要的幾點:

1 構造kafka source

val stream = env.addSource(new FlinkKafkaConsumer010[Array[Log]](topic, new LogDeserializationSchema(), properties))

一般APP上報的都是序列化的資料,我們需要定義反序列化方法,LogDeserializationSchema 是一個protobuf型別的反序列化方法。

//將kafka中的資料解析為google protobuf 的Log,一個message可能包含多條Log
class LogDeserializationSchema extends AbstractDeserializationSchema[Array[Log]] {
  override def deserialize(message: Array[Byte]): Array[Log] = {
    val data = ArrayBuffer[Log]()
    val input = new ByteArrayInputStream(message)
    while (input.available() > 0) {
      try {
        data += Log.parseDelimitedFrom(input)
      } catch {
        case _: Throwable =>
      }
    }
    input.close()
    data.toArray
  }
}

2 redis sink

這裡用的是網上開源的flink-connector-redis依賴庫。
更多相關內容見:http://bahir.apache.org/docs/flink/current/flink-streaming-redis

Maven依賴如下

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

Redis Sink 提供用於向Redis傳送資料的介面的類。接收器可以使用三種不同的方法與不同型別的Redis環境進行通訊:

  • 單Redis伺服器
  • Redis叢集
  • Redis Sentinel

Redis Sink 核心類是 RedisMappe 是一個介面,使用時我們要編寫自己的redis操作類實現這個介面中的三個方法,如下所示:

class RedisExampleMapper extends RedisMapper[(String, String)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
  }

  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}
val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

使用RedisCommand設定資料結構型別時和redis結構對應關係。

以上我們利用 Flink + Redis 實時了一個基本的實時防刷介面模型。關注微信公眾號《大資料技術進階》,觀看更多大資料實戰文章