1. 程式人生 > >【初學與研發之NETTY】netty4之檔案上傳

【初學與研發之NETTY】netty4之檔案上傳

客戶端:

public class UpLoadClient {
	private StringBuffer resultBuffer = new StringBuffer();
	private EventLoopGroup group = null;
	private HttpDataFactory factory = null;
	
	private Object waitObject = new Object();
	
	private ChannelFuture future = null;
	
	public UpLoadClient(String host, int port) throws Exception {
		this.group = new NioEventLoopGroup();
		this.factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
		
        Bootstrap b = new Bootstrap();
        b.option(ChannelOption.TCP_NODELAY, true);
        b.option(ChannelOption.SO_SNDBUF, 1048576*200);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        
		b.group(group).channel(NioSocketChannel.class);
        b.handler(new UpLoadClientIntializer());
        
        this.future = b.connect(host, port).sync();
	}
	
	public void uploadFile(String path) {
		if(path == null) {
			System.out.println("上傳檔案的路徑不能為null...");
        	return;
		}
		File file = new File(path);
		if (!file.canRead()) {
			System.out.println(file.getName() + "不可讀...");
        	return;
		}
		if (file.isHidden() || !file.isFile()) {
        	System.out.println(file.getName() + "不存在...");
        	return;
        }
		
        try {
            HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
            
        	HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, false);
        	
            bodyRequestEncoder.addBodyAttribute("getform", "POST");
            bodyRequestEncoder.addBodyFileUpload("myfile", file, "application/x-zip-compressed", false);
            
            List<InterfaceHttpData> bodylist = bodyRequestEncoder.getBodyListAttributes();
            if (bodylist == null) {
            	System.out.println("請求體不存在...");
				return;
			}
            
            HttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, file.getName());
            HttpPostRequestEncoder bodyRequestEncoder2 = new HttpPostRequestEncoder(factory, request2, true);
            
            bodyRequestEncoder2.setBodyHttpDatas(bodylist);
            bodyRequestEncoder2.finalizeRequest();
            
            Channel channel = this.future.channel();
            if(channel.isActive() && channel.isWritable()) {
                channel.writeAndFlush(request2);
                
                if (bodyRequestEncoder2.isChunked()) {
                    channel.writeAndFlush(bodyRequestEncoder2).awaitUninterruptibly();
                }
                
                bodyRequestEncoder2.cleanFiles();
            }
            channel.closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void shutdownClient() {
		// 等待資料的傳輸通道關閉
		group.shutdownGracefully();
		factory.cleanAllHttpDatas();
	}
	
	public boolean isCompleted() {
		while(waitObject != null) {
			//當通道處於開通和活動時,處於等待
		}
		if(resultBuffer.length() > 0) {
			if("200".equals(resultBuffer.toString())) {
				resultBuffer.setLength(0);
				return true;
			}
		}
		return false;
	}
	
	private class UpLoadClientIntializer extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			ChannelPipeline pipeline = ch.pipeline();
			
			pipeline.addLast("decoder", new HttpResponseDecoder());
			pipeline.addLast("encoder", new HttpRequestEncoder());  
			pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());  

			pipeline.addLast("dispatcher", new UpLoadClientHandler());
		}
	}
	
	private class UpLoadClientHandler extends SimpleChannelInboundHandler<HttpObject> {
		private boolean readingChunks = false;
		private int succCode = 200;
		
		protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
				throws Exception {
			if (msg instanceof HttpResponse) {
				HttpResponse response = (HttpResponse) msg;
				
				succCode = response.getStatus().code();
	            
	            if (succCode == 200 && HttpHeaders.isTransferEncodingChunked(response)) {
	                readingChunks = true;
	            }
			}
				
			if (msg instanceof HttpContent) {
	            HttpContent chunk = (HttpContent) msg;
	            System.out.println("【響應】"+succCode+">>"+chunk.content().toString(CharsetUtil.UTF_8));
	            if (chunk instanceof LastHttpContent) {
	                readingChunks = false;
	            }
	        }
			
			if (!readingChunks) {
				resultBuffer.append(succCode);
				ctx.channel().close();
			}
		}
		
		@Override
		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
			waitObject = null;
		}

		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
				throws Exception {
			
			resultBuffer.setLength(0);
			resultBuffer.append(500);
			System.out.println("管道異常:" + cause.getMessage());
			cause.printStackTrace();
	        ctx.channel().close();
		}
	}
}

服務端:

public class DBServer extends Thread {
	
	//單例項
	private static DBServer dbServer = null;
	
	//定時排程的週期例項
	private static Scheduler sched = null;
	
	private EventLoopGroup bossGroup = null;
    private EventLoopGroup workerGroup = null;
	//建立例項
	public static DBServer newBuild() {
		if(dbServer == null) {
			dbServer = new DBServer();
		}
		return dbServer;
	}
	
	public void run() {
		try {			
			startServer();
		} catch(Exception e) {
			System.out.println("資料服務啟動出現異常:"+e.toString());
			e.printStackTrace();
		}
	}
	
	private void startServer() throws Exception {
		bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap b = new ServerBootstrap();
            
            b.group(bossGroup, workerGroup);
            
			b.option(ChannelOption.TCP_NODELAY, true);
			b.option(ChannelOption.SO_TIMEOUT, 60000);
            b.option(ChannelOption.SO_SNDBUF, 1048576*200);
            
            b.option(ChannelOption.SO_KEEPALIVE, true);
			
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new DBServerInitializer());

            // 伺服器繫結埠監聽
            ChannelFuture f = b.bind(DBConfig.curHost.getIp(), DBConfig.curHost.getPort()).sync();
            
            System.out.println("資料服務:"+DBConfig.curHost.getServerHost()+"啟動完成...");
            // 監聽伺服器關閉監聽
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
	}
}
public class DBServerHandler extends SimpleChannelInboundHandler<HttpObject> {
	
	private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
	
	private String uri = null;
	
	private HttpRequest request = null;
	
	private HttpPostRequestDecoder decoder;
	
	//message、download、upload
	private String type = "message";
	
	public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
    public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
    public static final int HTTP_CACHE_SECONDS = 60;
    
    static {
        DiskFileUpload.baseDirectory = DBConfig.curHost.getZipPath();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    	if (msg instanceof HttpRequest) {
    		request = (HttpRequest) msg;
    		
    		uri = sanitizeUri(request.getUri());
    		
       		if (request.getMethod() == HttpMethod.POST) {
    			if (decoder != null) {
					decoder.cleanFiles();
					decoder = null;
				}
    			try {
	              decoder = new HttpPostRequestDecoder(factory, request);
	    		} catch (Exception e) {
	        		e.printStackTrace();
	        		writeResponse(ctx.channel(), HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString());
	            	ctx.channel().close();
	        		return;
	    		}
    		}
    	}
    	
    	if (decoder != null && msg instanceof HttpContent) {
        	HttpContent chunk = (HttpContent) msg;
        	
        	try {
        		decoder.offer(chunk);
        	} catch (Exception e) {
        		e.printStackTrace();
        		writeResponse(ctx.channel(), HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString());
            	ctx.channel().close();
                return;
        	}
        	
        	readHttpDataChunkByChunk();
        	
        	if (chunk instanceof LastHttpContent) {
        		writeResponse(ctx.channel(), HttpResponseStatus.OK, "");
                reset();
                return;
            }
        }
    }

	private String sanitizeUri(String uri) {
		try {
			uri = URLDecoder.decode(uri, "UTF-8");
		} catch(UnsupportedEncodingException e) {
			try {
				uri = URLDecoder.decode(uri, "ISO-8859-1");
			} catch(UnsupportedEncodingException e1) {
				throw new Error();
			}
		}

		return uri;
	}

    private void reset() {
        request = null;

        //銷燬decoder釋放所有的資源
        decoder.destroy();
        
        decoder = null;
    }

    /**
     * 通過chunk讀取request,獲取chunk資料
     * @throws IOException 
     */
    private void readHttpDataChunkByChunk() throws IOException {
    	try {
	    	while (decoder.hasNext()) {
	        	
	            InterfaceHttpData data = decoder.next();
	            if (data != null) {
	                try {
	                    writeHttpData(data);
	                } finally {
	                    data.release();
	                }
	            }
	        }
    	} catch (EndOfDataDecoderException e1) {
    		System.out.println("end chunk");
    	}
    }

    private void writeHttpData(InterfaceHttpData data) throws IOException {
        if (data.getHttpDataType() == HttpDataType.FileUpload) {
            FileUpload fileUpload = (FileUpload) data;
            if (fileUpload.isCompleted()) {
            	
				StringBuffer fileNameBuf = new StringBuffer();
				fileNameBuf.append(DiskFileUpload.baseDirectory)
		                   .append(uri);

				fileUpload.renameTo(new File(fileNameBuf.toString()));
            }
        } else if (data.getHttpDataType() == HttpDataType.Attribute) {
        	Attribute attribute = (Attribute) data;
        	if(CommonParam.DOWNLOAD_COLLECTION.equals(attribute.getName())) {
        		SynchMessageWatcher.newBuild().getMsgQueue().add(attribute.getValue());
        	}
        }
    }
    
    private void writeDownLoadResponse(ChannelHandlerContext ctx, RandomAccessFile raf, File file) throws Exception {
    	long fileLength = raf.length();
    	
    	//判斷是否關閉請求響應連線
        boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.headers().get(CONNECTION))
                || request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
                && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.headers().get(CONNECTION));
        
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpHeaders.setContentLength(response, fileLength);
        
        setContentHeader(response, file);
        
        if (!close) {
            response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
        }
		
        ctx.write(response);
        System.out.println("讀取大小:"+fileLength);
        
        final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, 1000);
		ChannelFuture writeFuture = ctx.write(region, ctx.newProgressivePromise());
		writeFuture.addListener(new ChannelProgressiveFutureListener() {
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                if (total < 0) {
                    System.err.println(future.channel() + " Transfer progress: " + progress);
                } else {
                    System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
                }
            }

            public void operationComplete(ChannelProgressiveFuture future) {
            }
        });
		
	    ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if(close) {
        	raf.close();
        	lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
	}
    
    private static void setContentHeader(HttpResponse response, File file) {
        MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
        response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
        
        SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
        dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

        // Date header
        Calendar time = new GregorianCalendar();
        response.headers().set(DATE, dateFormatter.format(time.getTime()));

        // Add cache headers
        time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
        response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
        response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
        response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(file.lastModified())));
    }
    
    private void writeResponse(Channel channel, HttpResponseStatus httpResponseStatus, String returnMsg) {
    	String resultStr = "節點【"+DBConfig.curHost.getServerHost()+"】";
    	if(httpResponseStatus.code() == HttpResponseStatus.OK.code()) {
    		resultStr += "正常接收";
    		if("message".equals(type)) {
    			resultStr += "字串。";
    		} else if("upload".equals(type)) {
    			resultStr += "上傳檔案。";
    		} else if("download".equals(type)) {
    			resultStr += "下載檔名。";
    		}
    	} else if(httpResponseStatus.code() == HttpResponseStatus.INTERNAL_SERVER_ERROR.code()) {
    		resultStr += "接收";
    		if("message".equals(type)) {
    			resultStr += "字串";
    		} else if("upload".equals(type)) {
    			resultStr += "上傳檔案";
    		} else if("download".equals(type)) {
    			resultStr += "下載檔名";
    		}
    		resultStr += "的過程中出現異常:"+returnMsg;
    	}
        //將請求響應的內容轉換成ChannelBuffer.
        ByteBuf buf = copiedBuffer(resultStr, CharsetUtil.UTF_8);

        //判斷是否關閉請求響應連線
        boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.headers().get(CONNECTION))
                || request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
                && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.headers().get(CONNECTION));
        
        //構建請求響應物件
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, httpResponseStatus, buf);
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");

        if (!close) {
            //若該請求響應是最後的響應,則在響應頭中沒有必要新增'Content-Length'
            response.headers().set(CONTENT_LENGTH, buf.readableBytes());
        }
        
        //傳送請求響應
        ChannelFuture future = channel.writeAndFlush(response);
        //傳送請求響應操作結束後關閉連線
        if (close) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    	cause.getCause().printStackTrace();
    	writeResponse(ctx.channel(), HttpResponseStatus.INTERNAL_SERVER_ERROR, "資料檔案通過過程中出現異常:"+cause.getMessage().toString());
        ctx.channel().close();
    }
}
public class DBServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        
        pipeline.addLast("decoder", new HttpRequestDecoder());
		pipeline.addLast("encoder", new HttpResponseEncoder());
		
		pipeline.addLast("deflater", new HttpContentCompressor());

        pipeline.addLast("handler", new DBServerHandler());
    }
}

相關推薦

初學研發NETTYnetty4檔案

客戶端: public class UpLoadClient { private StringBuffer resultBuffer = new StringBuffer(); private EventLoopGroup group = null; private

初學研發NETTYnetty4物件、位元組傳輸

netty4中的物件、位元組傳輸做了封裝,writeAndFlush中提供的引數Object可以直接的寫入物件、byte[]。不管是位元組陣列還是物件,關鍵部分的程式碼如下: new ChannelInitializer<SocketChannel>() {

初學研發NETTYnetty3檔案

客戶端: package netty3.socket.client; import static org.jboss.netty.channel.Channels.pipeline; import java.io.File; import java.net.InetSo

Jsp使用AjaxFileUploaderjspsmartupload完成不重新整理的Ajax檔案系統

這個不重新整理的Ajax檔案上傳系統同樣可以用來做預覽圖 雖然預覽圖完全可以通過不上傳圖片就完成,但是不重新整理的Ajax檔案上傳系統可以做到上傳完圖片立即返回上傳結果給使用者的結果 上次在《【Jsp】使用jspsmartupload完成簡單的檔案上傳系統》(點選開啟連結)

SSH網上商城專案實戰13Struts2實現檔案功能

  轉自:https://blog.csdn.net/eson_15/article/details/51366384 上一節我們做完了新增和更新商品的功能,這兩個部分裡有涉及到商品圖片的上傳,並沒有詳細解說。為此,這篇文章詳細介紹一下Struts2實現檔案上傳的功能。 1

IOS學習http非同步檔案和下載以及進度指示

2016-02-12 13:05:07.330 network-demo[16708:1254465] =================request redirectResponse================= 2016-02-12 13:05:07.331 network-demo[16708:

HTML5拖放功能(多檔案和元素拖放)

在Web應用中,良好的使用者體驗是設計師們一直的追求,拖拽體驗就是其中之一。在HTML5之前,已經可以使用事件mousedown、mousemove和mouseup巧妙地實現了頁面內的拖放操作,但是拖放的操作範圍還只是侷限在瀏覽器內部。HTML5提供的拖放API,不但能直接

php文件的下載

下載 類型 臨時文件 沒有 ipa pic 文件數量 上傳與下載 文件夾 一、 生活中常見的地方:  a) 例如郵箱、空間、文庫、百度雲、微愛等地方,都可以看到文件的上傳和下載的應用,因此,上傳和下載的功能非常重要!二、 PHP當中的文件上傳和下載  a) 我們需要進行一些

滲透課程第六篇-漏洞解析漏洞

文件的 配置文件 密碼 3.1 安裝目錄 ppa xxx 表單 圖片 上傳漏洞,我們為什麽要上傳?因為我們說過。在網站服務器上 肯定有一個Web容器,它裏面裝的就是Web應用程序 。某些腳本具有一定的執行和修改權限。這些權限可以說是服務器給客戶端訪問時提供的服務的同時提供的

滲透課程第七篇-漏洞繞過漏洞

ng- 相關 都是 http itl 了解 利用 存在 上傳 前一篇我們已經講過了上傳漏洞的解析漏洞,在某些時候,知道網站存在哪個解析漏洞,那麽我們就可以更好的利用上傳漏洞 這個知識點在我們前面就有粗略說過了(http://www.yuntest.org/index.php

滲透課程第八篇-漏洞文本編輯器

fckeditor -m ive conf ima xheditor ger .org 圖片上傳 Oday 常見的文本編輯器有CKEditor,Ewebeditor,UEditor,KindEditor,XHeditor等。其包含的功能類似,比如,圖片上傳、視頻上傳、遠程下

Python學習yieldsend方法

下一條 lis 區別 但是 查找 接受 python 方法 完全 yield作用   簡單地講,yield 的作用就是把一個函數變成一個 generator,帶有 yield 的函數不再是一個普通函數,Python 解釋器會將其視為一個 generator。下面以斐波拉契數

NMSIOU代碼

以及 索引 [] [1] while 更新 HERE append pre # -*- coding: utf-8 -*- import numpy as np def IOU1(A,B): #左上右下坐標(x1,y1,x2,y2) w=max(0,min

java代碼---guavaImmutable(不可變)集合

實例 bubuko mage string 工具類 clas tle wrapper 系列 Immutable(不可變)集合 一、概述 guava是google的一個庫,彌補了java語言的很多方面的不足,很多在java8中已有實現,暫時不展開。Col

牛客網刷題Python3functools.cmp_to_key()學習

牛客網原題: 連結:https://www.nowcoder.com/questionTerminal/a6a656249f404eb498d16b2f8eaa2c60來源:牛客網設有n個正整數,將他們連線成一排,組成一個最大的多位整數。 如:n=3時,3個整數13,312,343,連成的最大整數為3433

BZOJ 4390 [Usaco2015 dec] Max FlowLCA樹上差分

樹上差分板子: #include <cmath> #include <cstdio> #include <cstring> #include <iostream> #include <algorithm> #define db

Android架構基於MVP模式的Retrofit2+RXjava封裝檔案(三)

最近手頭事比較多,抽個空把之前系列也補充一下。 先回顧下之前的 【Android架構】基於MVP模式的Retrofit2+RXjava封裝(一) 【Android架構】基於MVP模式的Retrofit2+RXjava封裝之檔案下載(二) 今天要說的是檔案上傳 1.單圖上

HTTPHTTPS的區別

超文字傳輸協議,即HTTP協議被用於在Web瀏覽器和網站伺服器之間傳遞資訊. HTTP協議以明文方式傳送內容,不提供任何方式的資料加密. 如果攻擊者截取了Web瀏覽器和網站伺服器之間的傳輸報文,就可以直接讀懂其中的資訊. 因此,HTTP協議不適合傳輸一些敏感資訊,比如:信用卡號、密碼等支付資

從壹開始前後端分離 .NET Core2.0 +Vue2.0 框架十二 || 三種跨域方式比較,DTOs(資料傳輸物件)初探

更新反饋 1、博友@童鞋說到了,Nginx反向代理實現跨域,因為我目前還沒有使用到,給忽略了,這次記錄下,為下次補充。 程式碼已上傳Github+Gitee,文末有地址   今天忙著給小夥伴們提出的問題解答,時間上沒把握好,都快下班了,趕緊釋出:書說上文《從壹開始前

演算法資料結構專場BitMap演算法介紹

我們先來看個簡單的問題。假如給你20億個非負數的int型整數,然後再給你一個非負數的int型整數 t ,讓你判斷t是否存在於這20億數中,你會怎麼做呢?有人可能會用一個int陣列,然後把20億個數給存進去,然後再迴圈遍歷一下就可以了。想一下,這樣的話,時間複雜度是O(n),所需要的記憶體空間4byte *