1. 程式人生 > >基於Flume做FTP文件實時同步到本地磁盤的windows服務。

基於Flume做FTP文件實時同步到本地磁盤的windows服務。

lis tin obj ont jar包 配置文件 程序 targe 服務

需求:做一個windows服務,實現從ftp服務器實時下載或者更新文件到本地磁盤。

功能挺簡單的。直接寫個ftp工具類用定時器跑就能搞定,那我為什麽不用呢?

別問,問就是我無聊啊,然後研究一下Flume打發時間。哈哈~

一、Flume部分

Source組件和Sink組件用的都是第三方。

source組件:https://github.com/keedio/flume-ftp-source

Sink組件用的誰的目前已經找不到了,網上搜到了一個升級版的。

sink組件:https://github.com/huyanping/flume-sinks-safe-roll-file-sink

因為一些個性化的需求,所以我對他們源代碼做了些變動。

具體代碼參考:https://gitee.com/syher/spring-boot-project/tree/master/spring-boot-flume

Ftp-Source組件的關鍵技術是Apache FtpClient,而TailDir-sink則用的RandomAccessFile。

Junit測試類我已經寫好了,如果不想安裝服務又有興趣了解的朋友,可以自己改下配置跑一下看看。

技術分享圖片
package com.syher.flume;

import com.google.common.collect.Lists;
import com.urey.flume.sink.taildir.SafeRollingFileSink;
import org.apache.flume.*; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.sink.DefaultSinkProcessor; import org.apache.flume.sink.RollingFileSink;
import org.apache.flume.source.PollableSourceRunner; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.keedio.flume.source.ftp.source.Source; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; //@RunWith(SpringRunner.class) //@SpringBootTest public class SpringBootFlumeApplicationTests { Context defaultContext = new Context(); @Before public void init() throws Exception { Map<String, String> prop = new HashMap<>(); prop.put("channel.capacity", "1000"); prop.put("channel.transactionCapacity", "1000"); prop.put("source.client.source", "ftp"); prop.put("source.name.server", "192.168.1.150"); prop.put("source.port", "21"); prop.put("source.user", "username"); prop.put("source.password", "secret"); prop.put("source.working.directory", "/ftp/source"); prop.put("source.filter.pattern", ".+\\.pdf"); // prop.put("source.folder", "/ftp"); prop.put("source.flushlines", "false"); prop.put("sink.sink.directory", "G:/ftp/target/rolling"); prop.put("sink.sink.moveFile", "false"); prop.put("sink.sink.targetDirectory", "G:/ftp/target/PDFfiles"); prop.put("sink.sink.useCopy", "true"); prop.put("sink.sink.copyDirectory", "G:/ftp/target/copy"); prop.put("sink.sink.useFileSuffix", "false"); prop.put("sink.sink.fileSuffix", ".log"); defaultContext.putAll(prop); } public MemoryChannel getChannel() { MemoryChannel channel = new MemoryChannel(); channel.setName("channel"); configure(channel, "channel."); return channel; } public Source getSource(Channel channel) { Source source = new Source(); source.setName("source"); ChannelSelector selector = new ReplicatingChannelSelector(); selector.setChannels(Lists.newArrayList(channel)); ChannelProcessor processor = new ChannelProcessor(selector); source.setChannelProcessor(processor); configure(source, "source."); return source; } public Sink getSink(Channel channel) { SafeRollingFileSink sink = new SafeRollingFileSink(); sink.setName("sink"); sink.setChannel(channel); configure(sink, "sink."); return sink; } public void configure(Object target, String prefixProperty) { Context context = new Context(); context.putAll(defaultContext.getSubProperties(prefixProperty)); Configurables.configure(target, context); } @Test public void contextLoads() throws Exception { MemoryChannel channel = getChannel(); Source source = getSource(channel); Sink sink = getSink(channel); PollableSourceRunner sourceRunner = new PollableSourceRunner(); sourceRunner.setSource(source); channel.start(); sourceRunner.start(); SinkProcessor sinkProcessor = new DefaultSinkProcessor(); sinkProcessor.setSinks(Arrays.<Sink>asList(sink)); SinkRunner sinkRunner = new SinkRunner(sinkProcessor); channel.start(); sourceRunner.start(); sinkRunner.start(); while (!Thread.interrupted()) { Thread.sleep(200); } } }
View Code

二、JSW服務部分

用的java service wrapper把java程序做成了windows服務。

工具包已經上傳在我上面提到的gitee碼雲項目上。flume-wrapper.zip。

解壓後在conf目錄可以看到兩個配置文件。一個是flume的,一個是jsw的。

bin目錄裏面是一些裝卸啟停的批命令。

lib目錄裏面有項目運行依賴的jar包。

lib.d目錄沒啥用,是我備份了從flume拷出來的一些無用的jar包。可刪。

具體的配置和用法可以看壓縮包裏的使用說明文檔。

註意,jsw的logfile的日誌級別最好指定ERROR級別的,不然聽說、可能會造成內存不足。

技術分享圖片

三、采集結果

技術分享圖片

可以看到,采集效率還是很穩的。一分鐘不到就搞定了。

基於Flume做FTP文件實時同步到本地磁盤的windows服務。