
Flume: Collecting Local Data into HDFS

Configuration:

agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink

agent1.sources.spooldirSource.type=spooldir
agent1.sources.spooldirSource.spoolDir=/opt/flume
agent1.sources.spooldirSource.channels=fileChannel

agent1.sinks.hdfsSink.type=hdfs
agent1.sinks.hdfsSink.hdfs.path=hdfs://192.168.200.45:8020/flume/cys/%y-%m-%d
agent1.sinks.hdfsSink.hdfs.filePrefix=cys
agent1.sinks.hdfsSink.hdfs.round = true
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 3600
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 128000000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 1000

#Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
agent1.sinks.hdfsSink.hdfs.roundValue = 1
agent1.sinks.hdfsSink.hdfs.roundUnit = minute
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfsSink.channel=fileChannel
agent1.sinks.hdfsSink.hdfs.fileType = DataStream


agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir=/usr/share/apache-flume-1.5.0-bin/checkpoint
agent1.channels.fileChannel.dataDirs=/usr/share/apache-flume-1.5.0-bin/dataDir
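
Before starting the agent, the directories this config refers to should exist and be writable by the user running Flume; in particular, the spooling directory source fails at startup if spoolDir is missing. A minimal sketch using the paths from the config above:

# directory the spooldir source watches for new files
mkdir -p /opt/flume
# file channel checkpoint and data directories
mkdir -p /usr/share/apache-flume-1.5.0-bin/checkpoint
mkdir -p /usr/share/apache-flume-1.5.0-bin/dataDir

Note that the sink writes into time-bucketed paths (%y-%m-%d); hdfs.useLocalTimeStamp = true makes it use the agent's local clock instead of requiring a timestamp header on every event.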

Run:

[root@sdzn-cdh01 conf.dist]# flume-ng agent -f test1 -n agent1 -Dflume.root.logger=INFO,console
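
Here -f (--conf-file) points at the agent configuration file and -n (--name) must match the agent name used in the property keys (agent1). If the agent cannot find its environment settings, passing the configuration directory explicitly with -c usually helps; a sketch, assuming the default CDH conf directory:

flume-ng agent -c /etc/flume-ng/conf -f test1 -n agent1 -Dflume.root.logger=INFO,console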

Exception 1:

HDFSEventSink.java:463)] HDFS IO error
java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "sdzn-cdh01.zhiyoubao.com/192.168.200.45"; destination host is: "sdzn-cdh01.zhiyoubao.com":9000;
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
        at org.apache.hadoop.ipc.Client.call(Client.java:1415)
        at org.apache.hadoop.ipc.Client.call(Client.java:1364)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
        at $Proxy19.create(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:287)
        at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at $Proxy20.create(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1645)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1618)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1543)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:396)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:392)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:392)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:336)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:775)
        at org.apache.flume.sink.hdfs.HDFSDataStream.doOpen(HDFSDataStream.java:86)
        at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:113)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:273)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:262)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:706)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:183)
        at org.apache.flume.sink.hdfs.BucketWriter.access$1400(BucketWriter.java:59)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:703)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
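
This particular protobuf message ("end-group tag did not match expected tag") commonly means the HDFS client is talking to the wrong port; note that the sink config above uses port 8020 while the error reports destination port 9000, so the NameNode RPC address is worth confirming first. A quick check on the cluster:

hdfs getconf -confKey fs.defaultFS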

In this case the cause was missing jar packages, as shown in the figure:

Put the jars into:

[root@sdzn-cdh01 jars]# pwd
/opt/cloudera/parcels/CDH-5.3.6-1.cdh5.3.6.p0.11/jars
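
The CDH parcel directory above is where the Hadoop client jars live; they need to end up on Flume's classpath. A minimal sketch, assuming the standalone Flume install referenced in the channel config; the exact jar set varies by CDH version, so treat the names as examples:

cd /opt/cloudera/parcels/CDH-5.3.6-1.cdh5.3.6.p0.11/jars
cp hadoop-common-*.jar hadoop-hdfs-*.jar hadoop-auth-*.jar \
   protobuf-java-*.jar /usr/share/apache-flume-1.5.0-bin/lib/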

Exception 2:

[ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source r1: { spoolDir: /home/hadoop/flumeSpool-2 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)

The input data's character encoding is the problem: the spooling directory source decodes files as UTF-8 by default, and MalformedInputException is thrown when it hits bytes that are not valid in that charset. Re-encode the files (or declare their real charset) and processing resumes.
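
If re-encoding the files is not practical, the source can be told the actual charset, or told to tolerate bad bytes, via its inputCharset and decodeErrorPolicy properties. A sketch; GBK here is only a guess at the files' real encoding:

# decode spooled files as GBK instead of the UTF-8 default
agent1.sources.spooldirSource.inputCharset = GBK
# REPLACE substitutes U+FFFD for undecodable bytes instead of
# killing the source thread (the default policy is FAIL)
agent1.sources.spooldirSource.decodeErrorPolicy = REPLACE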