1. 程式人生 > >fastDFS遇到的併發問題recv cmd: 0 is not correct, expect cmd: 100

fastDFS遇到的併發問題recv cmd: 0 is not correct, expect cmd: 100

此貼是我對一篇帖子FastDFS併發會有bug,其實我也不太信?進行的一些測試和我自己也發現的相關問題與解決方案。

最開始我也按照帖子進行了測試,結果發現確實存在併發問題(下面程式碼改為單執行緒是木有問題的)。以下列出我嘗試的幾種情況:

先列出我進行測試的有main函式的FastConcurrence類:

public class FastConcurrence {
	private static int poolSize=2;//定義執行緒個數
	public static void main(String[] args) throws InterruptedException {
		latchTest();
	}
	private static void latchTest() throws InterruptedException {
	       final CountDownLatch start = new CountDownLatch(1);
	       final CountDownLatch end = new CountDownLatch(poolSize);
	       ExecutorService exce = Executors.newFixedThreadPool(poolSize);
	       for (int i = 0; i < poolSize; i++) {
	           Runnable run = new Runnable() {
	               @Override
	               public void run() {
	                   try {
	                       start.await();
	                       testLoad();
	                   } catch (Exception e) {
	                       e.printStackTrace();
	                   } finally {
	                       end.countDown();
	                   }
	               }
	           };
	           exce.submit(run);
	       }
	       start.countDown();
	       end.await();
	       exce.shutdown();
	   }
	
private static void testLoad() throws Exception {
	 String filePath="D:\\fastDFS\\ques.png";
	 File content=new File(filePath);
	 TestFileManager test=new TestFileManager();
        FastDFSFile file =test.getFastFile(content,"png");
	for (int i=0;i<10;i++){                                                                                  FileManager.upload(file);//裡面封裝了我轉檔案為byte[]和上傳檔案方法storageClient.upload_file 
		   }
		   System.out.println("完成一個執行緒!");
	} 
}

①把trackerClient,trackerServer,storageServer,storageClient設為全域性變數。在類載入的時候,就進行了初始化,關鍵程式碼如下:

    private static TrackerClient  trackerClient;  
    private static TrackerServer  trackerServer;
    private static StorageServer  storageServer;
    private static StorageClient  storageClient;
  
    static { 
        try {
	     // 初始化檔案資源  
	     ClientGlobal.init("C:\\Users\\jianbo\\Downloads\\FastDFS\\conf\\client.conf");
            trackerClient = new TrackerClient();  
            trackerServer = trackerClient.getConnection();  
            //有併發問題,所以勿重用storageClient
            storageClient = new StorageClient(trackerServer, storageServer);
        } catch (Exception e) {  
            logger.error(logger,  e);
        }  
    } 
果不其然報錯了:
java.net.SocketException: socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:263)
	at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143)
	at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938)
	at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703)
	at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208)
	at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226)
	at fastdfs.FileManager.upload(FileManager.java:76)
	at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48)
	at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42)
	at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
java.io.IOException: recv cmd: 0 is not correct, expect cmd: 100
	at org.csource.fastdfs.ProtoCommon.recvHeader(ProtoCommon.java:219)
	at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:250)
	at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143)
	at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938)
	at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703)
	at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208)
	at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226)
	at fastdfs.FileManager.upload(FileManager.java:76)
	at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48)
	at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42)
	at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
②我在呼叫storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list)前一句再storageClient = new StorageClient(trackerServer, storageServer); 

關鍵程式碼如下:

public static String[] upload(FastDFSFile file) {
   try {
     storageClient = new StorageClient(trackerServer, storageServer);//新加的
     uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list);
   } catch (Exception e) {
     logger.error("Exception when uploadind the file:" + file.getName(), e);
   }
   //省略一部分程式碼
   return uploadResults;
}
好了,出現了和上面①一樣的問題。

我發現,其實這樣本質沒有變,還是去改了全域性storageClient 。應該new個新的,所以產生了③。

我在呼叫storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list)前一句再StorageClient storageClient = new StorageClient(trackerServer, storageServer); 就是原帖最後說明的解決辦法。

public static String[] upload(FastDFSFile file) {
   try {
     StorageClient storageClient = new StorageClient(trackerServer, storageServer);//新加的
     uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list);
   } catch (Exception e) {
     logger.error("Exception when uploadind the file:" + file.getName(), e);
   }
   //省略一部分程式碼
   return uploadResults;
}
我還是報錯,和前面一樣的異常。原因還沒有來得及找,不過我採用鎖是沒有問題的。

④在呼叫方法的時候採用鎖就能解決。例如這個方法我加了鎖就沒有問題了。

private synchronized static void testLoad() throws Exception {
		   String filePath="D:\\fastDFS\\ques.png";
		   File content=new File(filePath);
		   TestFileManager test=new TestFileManager();
		   FastDFSFile file =test.getFastFile(content,"png");
		   for (int i=0;i<10;i++){
			   FileManager.upload(file);
		   }
		   System.out.println("完成一個執行緒!");
		} 

一路順暢。不過③為什麼報錯了,我還要研究下。

補:對於③我找到問題了。

原始碼下載下來,然後我就查StorageClient類,發現在do_upload_file方法中,有一段程式碼:

bUploadSlave = ((group_name != null && group_name.length() > 0) && 
		                (master_filename != null && master_filename.length() > 0) &&
		                (prefix_name != null));
		if (bUploadSlave)
		{
			bNewConnection = this.newUpdatableStorageConnection(group_name, master_filename);
		}
		else
		{
			bNewConnection = this.newWritableStorageConnection(group_name);
		}
其中的
this.newWritableStorageConnection(group_name);
具體方法是:
protected boolean newWritableStorageConnection(String group_name) throws IOException, MyException
	{
		if (this.storageServer != null)
		{
			return false;
		}
		else
		{
			TrackerClient tracker = new TrackerClient();
  		this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
  		if (this.storageServer == null)
  		{
  			throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode());
  		}
  		return true;
		}
  }
而這個方法裡面的

  		this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
找進去發現每次trackerServer用完了,就關閉連線了。

trackerServer.close();

所以後面執行緒都被關閉了,就報IO異常,只要在方法③中new StorageClient前面加一句:

TrackerServer trackerServer = trackerClient.getConnection();
就解決問題了。



  		this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);