Flume-ng: Limiting the Data Send Rate

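In principle, the send rate should be limited at the sink. However, flume-ng provides a very convenient interceptor mechanism, so this article simply implements the rate limit on the source side.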

package com.xxx.flume.core.interceptor;

import java.util.List;
import org.slf4j.Logger;
import org.apache.flume.Event;
import org.slf4j.LoggerFactory;
import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;


public class LimitInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(LimitInterceptor.class);

    private static final long KB = 1024L;

    private long lastEventSentTick = System.nanoTime();

    private long pastSentLength = 0L;
    private long max;
    private long timeCostPerCheck = 1000000000L;

    private long headerSize = 0L;

    private boolean flag = true;

    private int num = 0;

    public LimitInterceptor(long limitRate, long headerSize) {
        this.max = (limitRate * KB);
        this.headerSize = headerSize;
    }

    public void initialize() {
    }

    public Event intercept(Event event) {
        this.num += 1;
        // Throttle only once the bytes counted since the last reset exceed the per-second budget.
        if (this.pastSentLength > this.max) {
            long nowTick = System.nanoTime();
            // Number of whole one-second budgets already consumed.
            long multiple = this.pastSentLength / this.max;
            // Time those bytes should have taken, minus the time that actually elapsed.
            long missedTime = multiple * this.timeCostPerCheck - (nowTick - this.lastEventSentTick);
            if (missedTime > 0L) {
                try {
                    System.out.printf("Limit source send rate, headerLength:%d,pastSentLength:%d,lastEventSentTick:%d,sleepTime:%d, num:%d\n",
                        headerSize, pastSentLength, lastEventSentTick, missedTime / 1000000, num);
                    Thread.sleep(missedTime / 1000000L, (int) (missedTime % 1000000L));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the source thread can still shut down.
                    Thread.currentThread().interrupt();
                }
            }
            this.num = 0;
            this.pastSentLength = 0L;
            this.lastEventSentTick = (nowTick + (missedTime > 0L ? missedTime : 0L));
        }
        // Count this event: configured header size plus body length, in bytes.
        this.pastSentLength += this.headerSize + event.getBody().length;

        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        private long limitRate;
        private long headerSize;

        public Interceptor build() {
            return new LimitInterceptor(this.limitRate, this.headerSize);
        }

        public void configure(Context context) {
            this.limitRate = context.getLong(Constants.LIMIT_RATE, Long.valueOf(Constants.DEFAULT_RATE)).longValue();
            this.headerSize = context.getLong(Constants.HEADER_SIZE, Long.valueOf(Constants.DEFAULT_SIZE)).longValue();
        }

        public static class Constants {
            public static final long DEFAULT_RATE = 500L;
            public static final long DEFAULT_SIZE = 16L;
            public static final String LIMIT_RATE = "limitRate";
            public static final String HEADER_SIZE = "headerSize";
        }
    }
}
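
The sleep-time calculation inside intercept(Event) can be looked at in isolation. The following is a minimal sketch, not part of the interceptor itself: the class name ThrottleMath and the method nanosToSleep are hypothetical, and the 71 ms elapsed time is an assumed value chosen only for illustration.

```java
// Hypothetical helper that mirrors the arithmetic used in LimitInterceptor.intercept(Event).
final class ThrottleMath {
    static final long NANOS_PER_SECOND = 1_000_000_000L;

    /**
     * Returns how many nanoseconds the sender still has to wait,
     * or 0 if it is within budget or has already waited long enough.
     */
    static long nanosToSleep(long sentBytes, long maxBytesPerSecond, long elapsedNanos) {
        if (sentBytes <= maxBytesPerSecond) {
            return 0L; // budget not exceeded yet, no throttling
        }
        // Whole one-second budgets consumed (integer division, as in the interceptor).
        long windows = sentBytes / maxBytesPerSecond;
        long missed = windows * NANOS_PER_SECOND - elapsedNanos;
        return Math.max(missed, 0L);
    }

    public static void main(String[] args) {
        long max = 500L * 1024L;       // limitRate = 500, i.e. 512000 bytes per second
        long elapsed = 71_000_000L;    // assumed: 71 ms spent sending
        long sleep = nanosToSleep(512_113L, max, elapsed);
        System.out.println(sleep / 1_000_000L + " ms"); // prints "929 ms"
    }
}
```

Because the division is truncated, any bytes beyond a whole multiple of the budget are not charged; with small events the overshoot per window stays small.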

After packaging, drop the JAR into flume-ng's lib directory, then add the rate-limiting interceptor to the flume-conf configuration file:

agent.sources.seqGenSrc.interceptors = limitrate
agent.sources.seqGenSrc.interceptors.limitrate.type = com.xxx.flume.core.interceptor.LimitInterceptor$Builder
agent.sources.seqGenSrc.interceptors.limitrate.limitRate = 500
agent.sources.seqGenSrc.interceptors.limitrate.headerSize = 8

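This limits the send rate to 500 KB/s (the interceptor multiplies limitRate by 1024, giving 512,000 bytes per second); different sources can be given different rates. Each event header is assumed to be about 8 bytes.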
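
As an illustration of per-source limits, a configuration fragment might look like the sketch below. The source names srcA and srcB and the rate values are hypothetical; only the interceptor keys (type, limitRate, headerSize) come from the class above.

```properties
# Hypothetical sources srcA and srcB, each with its own interceptor instance and rate
agent.sources.srcA.interceptors = limitrate
agent.sources.srcA.interceptors.limitrate.type = com.xxx.flume.core.interceptor.LimitInterceptor$Builder
agent.sources.srcA.interceptors.limitrate.limitRate = 500
agent.sources.srcA.interceptors.limitrate.headerSize = 8

agent.sources.srcB.interceptors = limitrate
agent.sources.srcB.interceptors.limitrate.type = com.xxx.flume.core.interceptor.LimitInterceptor$Builder
agent.sources.srcB.interceptors.limitrate.limitRate = 100
agent.sources.srcB.interceptors.limitrate.headerSize = 8
```

If limitRate or headerSize is omitted, the Builder falls back to its defaults of 500 and 16.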

Output on the sending side:

Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649436601991827,sleepTime:929, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649437601991827,sleepTime:929, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649438601991827,sleepTime:929, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649439601991827,sleepTime:927, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649440601991827,sleepTime:928, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649441601991827,sleepTime:928, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649442601991827,sleepTime:928, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649443601991827,sleepTime:925, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649444601991827,sleepTime:929, num:3437
Limit source send rate, headerLength:8,pastSentLength:512025,lastEventSentTick:79649445601991827,sleepTime:889, num:3421
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649446601991827,sleepTime:924, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649447601991827,sleepTime:927, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649448601991827,sleepTime:928, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649449601991827,sleepTime:925, num:3437
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649450601991827,sleepTime:928, num:3437
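
The figures in this output can be cross-checked with a little arithmetic. The sketch below uses the hypothetical class LogCheck and takes its inputs from the first two log lines: consecutive lastEventSentTick values are exactly one second apart, so each line accounts for one window of about 500 KB.

```java
// Hypothetical helper for sanity-checking the logged values; not part of the interceptor.
final class LogCheck {
    /** Throughput in KiB per second for the bytes counted between two nanoTime ticks. */
    static double kibPerSecond(long bytes, long startTickNanos, long endTickNanos) {
        double seconds = (endTickNanos - startTickNanos) / 1_000_000_000.0;
        return (bytes / 1024.0) / seconds;
    }

    /** Average counted size per event (header allowance plus body), truncated to whole bytes. */
    static long averageEventBytes(long bytes, long eventCount) {
        return bytes / eventCount;
    }

    public static void main(String[] args) {
        long bytes = 512_113L;                     // pastSentLength from the log
        long events = 3_437L;                      // num from the log
        long t0 = 79_649_436_601_991_827L;         // lastEventSentTick, first line
        long t1 = 79_649_437_601_991_827L;         // lastEventSentTick, second line
        System.out.println("KiB/s: " + kibPerSecond(bytes, t0, t1));
        System.out.println("bytes/event: " + averageEventBytes(bytes, events));
    }
}
```

For these inputs the rate comes out at roughly 500.1 KiB/s, and the average counted size is 149 bytes per event, that is, the 8-byte header allowance plus about 141 bytes of body.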