基於flume1.7開發自定義Sink元件-一鍵打包
阿新 • • 發佈:2018-12-19
概要
開始
pom檔案
ide使用idea神器,工程組織使用maven,下面是工程的pom檔案:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.*.*</groupId>
<artifactId>your name </artifactId>
<version>1.1.0</version>
<name>your procet name</name>
<!-- Centralized version properties -->
<properties>
<project.build.sourceEncoding >UTF-8</project.build.sourceEncoding>
<flume.version>1.7.0</flume.version>
<netty-all.version>4.0.23.Final</netty-all.version>
</properties>
<!-- Profiles whose properties drive the one-step packaging configured further down -->
<profiles>
<profile>
<id >msg_store_test</id> <!-- profile name -->
<properties>
<conf.path>conf/msg/test_jd</conf.path> <!-- path properties defined by this profile -->
<bin.path>bin/msg</bin.path>
<package.name>StoreMsgAgent</package.name>
</properties>
</profile>
</profiles>
<!-- Dependency jars -->
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId> <!-- core dependency, required -->
<version>${flume.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId> <!-- configuration support, required -->
<version>${flume.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-node</artifactId> <!-- entry point of the final runnable package, required -->
<version>${flume.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <!-- pinned to work around a too-old netty-all version at runtime -->
<version>${netty-all.version}</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId> <!-- pinned to work around a too-old version at runtime -->
<version>1.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId> <!-- the assembly plugin builds the custom runnable distribution and compresses it in one step -->
<version>2.4</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<!-- see the profile and project properties above -->
<finalName>${package.name}_v${project.version}</finalName>
<descriptors> <!-- the custom packaging rules live in the XML descriptor at this path -->
<descriptor>*/*/*/bin.xml</descriptor>
</descriptors>
<tarLongFileMode>gnu</tarLongFileMode>
<!--<descriptorRefs>-->
<!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
<!--</descriptorRefs>-->
</configuration>
</execution>
</executions>
</plugin>
<!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration> <!-- JDK source/target level -->
<source>1.7</source>
<target>1.7</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
assembly檔案
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>bin</id> <!-- assembly id; by convention the same as the descriptor file name -->
<formats><!-- output formats produced by the one-step packaging -->
<format>dir</format>
<format>tar.gz</format>
</formats>
<!-- Root directory inside the generated archive (not the archive file name); built from the package.name and project.version properties -->
<baseDirectory>${package.name}_v${project.version}-bin</baseDirectory>
<dependencySets><!-- copy the dependencies into the output directory below -->
<dependencySet>
<!--<outputDirectory>plugins.d/${project.name}/lib</outputDirectory>-->
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
<useProjectArtifact>false</useProjectArtifact>
<!--<excludes>-->
<!--<exclude>commons-cli:commons-cli</exclude>-->
<!--<exclude>org.apache.commons:commons-compress</exclude>-->
<!--<exclude>org.apache.avro:avro</exclude>-->
<!--<exclude>org.mortbay.jetty:jetty</exclude>-->
<!--<exclude>org.mortbay.jetty:jetty-util</exclude>-->
<!--<exclude>com.thoughtworks.paranamer:paranamer</exclude>-->
<!--<exclude>com.google.protobuf:protobuf-java</exclude>-->
<!--<exclude>org.tukaani:xz</exclude>-->
<!--</excludes>-->
</dependencySet>
</dependencySets>
<!-- Configuration files and other scripts the service needs -->
<fileSets>
<fileSet>
<directory>${bin.path}</directory> <!-- source directory; the property comes from the pom profile -->
<includes>
<include>**</include>
</includes>
<fileMode>0755</fileMode>
<outputDirectory>bin</outputDirectory> <!-- output directory -->
</fileSet>
<fileSet>
<includes>
<include>lib/**</include>
</includes>
</fileSet>
<fileSet>
<directory>${conf.path}</directory> <!-- source directory; the property comes from the pom profile -->
<includes>
<include>**</include>
</includes>
<outputDirectory>conf</outputDirectory> <!-- output directory -->
</fileSet>
<!-- Put the jar built by this project itself into the directory below -->
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory>plugins.d/${project.name}/lib</outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>
原始碼實現
JpushHBaseSinkConfigurationConstants
package com.*.*;
import org.apache.hadoop.hbase.HConstants;
/**
* Constants used for configuration of HBaseSink and AsyncHBaseSink
*
*/
/**
 * Configuration keys and default values for the HBase sink.
 *
 * <p>String constants named {@code CONFIG_*}, {@code ZK_*}, {@code COL_NAME_CONFIG}
 * and {@code CHARSET_CONFIG} are property names looked up in the sink's Flume
 * context; constants named {@code DEFAULT_*} / {@code *_DEFAULT} are fallback values.
 *
 * <p>NOTE(review): {@code JpushAsyncHbaseSink.configure} also references
 * {@code UMP_METHOD_MONITOR}, {@code UMP_CUSTOM_MONITOR} and
 * {@code UMP_BUSINESS_MONITOR}, which are not declared in this class as shown.
 * They must be added with the correct property names before the sink compiles.
 */
public class JpushHBaseSinkConfigurationConstants {
    /**
     * The Hbase tableSpace which the sink should write to.
     */
    // The terminator above was previously " /" instead of " */", which extended
    // the comment over this declaration and left the constant undefined.
    public static final String CONFIG_TABLE_SPACE = "tableSpace";

    /**
     * The Hbase table which the sink should write to.
     */
    public static final String CONFIG_TABLE = "table";

    /**
     * The column family which the sink should use.
     */
    public static final String CONFIG_COLUMN_FAMILY = "columnFamily";

    /**
     * Maximum number of events the sink should take from the channel per
     * transaction, if available.
     */
    public static final String CONFIG_BATCHSIZE = "batchSize";

    /**
     * Property name of the flush interval setting. Note the key itself is the
     * singular {@code "flushSecond"}.
     */
    public static final String CONFIG_FLUSHSECONDS = "flushSecond";

    /**
     * The fully qualified class name of the serializer the sink should use.
     */
    public static final String CONFIG_SERIALIZER = "serializer";

    /**
     * Configuration to pass to the serializer.
     */
    public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER + ".";

    /** Property name of the timeout setting. */
    public static final String CONFIG_TIMEOUT = "timeout";

    /** Property name of the write-ahead-log switch. */
    public static final String CONFIG_ENABLE_WAL = "enableWal";

    /** Fallback for {@link #CONFIG_ENABLE_WAL}. */
    public static final boolean DEFAULT_ENABLE_WAL = true;

    /** Fallback for {@link #CONFIG_TIMEOUT}; presumably milliseconds (TODO confirm). */
    public static final long DEFAULT_TIMEOUT = 60000;

    /** Property name of the Kerberos keytab setting. */
    public static final String CONFIG_KEYTAB = "kerberosKeytab";

    /** Property name of the Kerberos principal setting. */
    public static final String CONFIG_PRINCIPAL = "kerberosPrincipal";

    /** Property name of the ZooKeeper quorum setting. */
    public static final String ZK_QUORUM = "zookeeperQuorum";

    /** Property name of the ZooKeeper parent znode setting. */
    public static final String ZK_ZNODE_PARENT = "znodeParent";

    /** Fallback for {@link #ZK_ZNODE_PARENT}, taken from HBase's own default. */
    public static final String DEFAULT_ZK_ZNODE_PARENT = HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;

    /** Property name of the increment-coalescing switch. */
    public static final String CONFIG_COALESCE_INCREMENTS = "coalesceIncrements";

    /** Fallback for {@link #CONFIG_COALESCE_INCREMENTS}. */
    public static final Boolean DEFAULT_COALESCE_INCREMENTS = false;

    /** Fallback for {@link #CONFIG_MAX_CONSECUTIVE_FAILS}. */
    public static final int DEFAULT_MAX_CONSECUTIVE_FAILS = 10;

    /** Property name of the maximum-consecutive-failures setting. */
    public static final String CONFIG_MAX_CONSECUTIVE_FAILS = "maxConsecutiveFails";

    /** Comma separated list of column names to place matching groups in. */
    public static final String COL_NAME_CONFIG = "colNames";

    /** Default column name. */
    public static final String COLUMN_NAME_DEFAULT = "payload";

    /** What charset to use when serializing into HBase's byte arrays */
    public static final String CHARSET_CONFIG = "charset";

    /** Fallback for {@link #CHARSET_CONFIG}. */
    public static final String CHARSET_DEFAULT = "UTF-8";
}
JpushAsyncHbaseSink
package com.jd.jpush.service;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.text.SimpleDateFormat;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.jd.jpush.utils.HBaseUtil;
import com.jd.ump.profiler.CallerInfo;
import com.jd.ump.profiler.proxy.Profiler;
import org.apache.flume.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
import com.alibaba.fastjson.*;
// Stores the message bodies generated by "access" into HBase.
public class JpushAsyncHbaseSink extends AbstractSink implements Configurable {
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(JpushAsyncHbaseSink.class);
// Value of the CONFIG_TABLE_SPACE setting, assigned in configure().
private String tableSpace;
// Value of the CONFIG_TABLE setting, assigned in configure().
private String tableName;
// CONFIG_COLUMN_FAMILY setting encoded as UTF-8 bytes, assigned in configure().
private byte[] columnFamily;
// Column names from the comma-separated COL_NAME_CONFIG setting, each encoded
// with the configured charset; filled in configure().
private List<byte[]> colNames = Lists.newArrayList();
// HBase buffered mutator; presumably created and used by the write path, which is
// not shown next to these declarations (TODO confirm).
private BufferedMutator bufferedMutator = null;
//private Connection connection = null;
// Presumably the timestamp of the most recent flush (TODO confirm against process()).
private long lastFlushTime = 0;
// CONFIG_BATCHSIZE setting; configure() falls back to 100 when it is absent.
private long batchSize;
// CONFIG_FLUSHSECONDS setting; configure() falls back to 10 when it is absent.
private long flushSeconds;
// Charset named by CHARSET_CONFIG (fallback CHARSET_DEFAULT); used to encode colNames.
private Charset charset;
// UMP monitoring settings read in configure(); only umpMethodMonitoring is
// null-checked there.
private String umpMethodMonitoring;
private String umpCustomMonitoring;
private String umpBusinessMonitoring;
// Flume sink counter, created in configure() with this sink's name if not yet set.
private SinkCounter sinkCounter;
// Label of the form "[tableSpace:tableName]: <space>:<table> " built in configure();
// presumably a prefix for log messages (TODO confirm).
private String tableSpaceName;
/**
 * Loads this sink's settings from the Flume context.
 *
 * <p>Reads the table space, table, column family, batch size (fallback 100),
 * flush interval (fallback 10), column names, UMP monitoring keys and charset,
 * then creates the {@link SinkCounter} if it does not exist yet.
 *
 * @param context the Flume context holding this sink's properties
 * @throws NullPointerException if the UMP method-monitoring key, the column
 *         family or the column-name list is missing from the configuration
 */
@Override
public void configure(Context context) {
    tableSpace = context.getString(JpushHBaseSinkConfigurationConstants.CONFIG_TABLE_SPACE);
    tableName = context.getString(JpushHBaseSinkConfigurationConstants.CONFIG_TABLE);
    String cf = context.getString(JpushHBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
    batchSize = context.getLong(JpushHBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, 100L);
    flushSeconds = context.getLong(JpushHBaseSinkConfigurationConstants.CONFIG_FLUSHSECONDS, 10L);
    String colNameStr = context.getString(JpushHBaseSinkConfigurationConstants.COL_NAME_CONFIG);
    umpMethodMonitoring = context.getString(JpushHBaseSinkConfigurationConstants.UMP_METHOD_MONITOR);
    umpCustomMonitoring = context.getString(JpushHBaseSinkConfigurationConstants.UMP_CUSTOM_MONITOR);
    umpBusinessMonitoring = context.getString(JpushHBaseSinkConfigurationConstants.UMP_BUSINESS_MONITOR);
    Preconditions.checkNotNull(umpMethodMonitoring, "umpMethodMonitoring cannot be empty, please specify in configuration file");
    // Both values are dereferenced below; fail with a message that names the
    // missing property instead of a bare NullPointerException.
    Preconditions.checkNotNull(cf, "columnFamily cannot be empty, please specify in configuration file");
    Preconditions.checkNotNull(colNameStr, "colNames cannot be empty, please specify in configuration file");
    tableSpaceName = "[tableSpace:tableName]: " + tableSpace + ":" + tableName + " ";
    charset = Charset.forName(context.getString(JpushHBaseSinkConfigurationConstants.CHARSET_CONFIG, JpushHBaseSinkConfigurationConstants.CHARSET_DEFAULT));
    // colNames is a field that outlives this call: reset it so that configuring
    // the sink again does not append the column names a second time.
    colNames.clear();
    for (String columnName : colNameStr.split(",")) {
        colNames.add(columnName.getBytes(charset));
    }
    columnFamily = cf.getBytes(Charsets.UTF_8);
    if (sinkCounter == null) {
        sinkCounter = new SinkCounter(getName());
    }
}
//@Override
public Status process() throws EventDeliveryException {
logger.debug("processing...");
Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel