基於Flume做FTP文件實時同步到本地磁盤的windows服務。
需求:做一個windows服務,實現從ftp服務器實時下載或者更新文件到本地磁盤。
功能挺簡單的。直接寫個ftp工具類用定時器跑就能搞定,那我為什麽不用呢?
別問,問就是我無聊啊,然後研究一下Flume打發時間。哈哈~
一、Flume部分
Source組件和Sink組件用的都是第三方。
source組件:https://github.com/keedio/flume-ftp-source
Sink組件用的誰的目前已經找不到了,網上搜到了一個升級版的。
sink組件:https://github.com/huyanping/flume-sinks-safe-roll-file-sink
因為一些個性化的需求,所以我對他們源代碼做了些變動。
具體代碼參考:https://gitee.com/syher/spring-boot-project/tree/master/spring-boot-flume
Ftp-Source組件的關鍵技術是Apache FtpClient,而TailDir-sink則用的RandomAccessFile。
Junit測試類我已經寫好了,如果不想安裝服務又有興趣了解的朋友,可以自己改下配置跑一下看看。
package com.syher.flume; import com.google.common.collect.Lists; import com.urey.flume.sink.taildir.SafeRollingFileSink;View Codeimport org.apache.flume.*; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.sink.DefaultSinkProcessor; import org.apache.flume.sink.RollingFileSink;import org.apache.flume.source.PollableSourceRunner; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.keedio.flume.source.ftp.source.Source; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; //@RunWith(SpringRunner.class) //@SpringBootTest public class SpringBootFlumeApplicationTests { Context defaultContext = new Context(); @Before public void init() throws Exception { Map<String, String> prop = new HashMap<>(); prop.put("channel.capacity", "1000"); prop.put("channel.transactionCapacity", "1000"); prop.put("source.client.source", "ftp"); prop.put("source.name.server", "192.168.1.150"); prop.put("source.port", "21"); prop.put("source.user", "username"); prop.put("source.password", "secret"); prop.put("source.working.directory", "/ftp/source"); prop.put("source.filter.pattern", ".+\\.pdf"); // prop.put("source.folder", "/ftp"); prop.put("source.flushlines", "false"); prop.put("sink.sink.directory", "G:/ftp/target/rolling"); prop.put("sink.sink.moveFile", "false"); prop.put("sink.sink.targetDirectory", "G:/ftp/target/PDFfiles"); prop.put("sink.sink.useCopy", "true"); prop.put("sink.sink.copyDirectory", "G:/ftp/target/copy"); prop.put("sink.sink.useFileSuffix", "false"); prop.put("sink.sink.fileSuffix", ".log"); defaultContext.putAll(prop); } public MemoryChannel getChannel() { MemoryChannel channel = new MemoryChannel(); channel.setName("channel"); configure(channel, "channel."); return channel; } public Source getSource(Channel channel) { Source source = new Source(); source.setName("source"); ChannelSelector selector = new ReplicatingChannelSelector(); selector.setChannels(Lists.newArrayList(channel)); ChannelProcessor processor = new ChannelProcessor(selector); source.setChannelProcessor(processor); configure(source, "source."); return source; } public Sink getSink(Channel channel) { SafeRollingFileSink sink = new SafeRollingFileSink(); sink.setName("sink"); sink.setChannel(channel); configure(sink, "sink."); return sink; } public void configure(Object target, String prefixProperty) { Context context = new Context(); context.putAll(defaultContext.getSubProperties(prefixProperty)); Configurables.configure(target, context); } @Test public void contextLoads() throws Exception { MemoryChannel channel = getChannel(); Source source = getSource(channel); Sink sink = getSink(channel); PollableSourceRunner sourceRunner = new PollableSourceRunner(); sourceRunner.setSource(source); channel.start(); sourceRunner.start(); SinkProcessor sinkProcessor = new DefaultSinkProcessor(); sinkProcessor.setSinks(Arrays.<Sink>asList(sink)); SinkRunner sinkRunner = new SinkRunner(sinkProcessor); channel.start(); sourceRunner.start(); sinkRunner.start(); while (!Thread.interrupted()) { Thread.sleep(200); } } }
二、JSW服務部分
用的java service wrapper把java程序做成了windows服務。
工具包已經上傳在我上面提到的gitee碼雲項目上。flume-wrapper.zip。
解壓後在conf目錄可以看到兩個配置文件。一個是flume的,一個是jsw的。
bin目錄裏面是一些裝卸啟停的批命令。
lib目錄裏面有項目運行依賴的jar包。
lib.d目錄沒啥用,是我備份了從flume拷出來的一些無用的jar包。可刪。
具體的配置和用法可以看壓縮包裏的使用說明文檔。
註意,jsw的logfile的日誌級別最好指定ERROR級別的,不然聽說、可能會造成內存不足。
三、采集結果
可以看到,采集效率還是很穩的。一分鐘不到就搞定了。
基於Flume做FTP文件實時同步到本地磁盤的windows服務。