A FastDFS concurrency problem: recv cmd: 0 is not correct, expect cmd: 100
阿新 • Published: 2019-01-31
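This post records the tests I ran in response to another post, "FastDFS has a concurrency bug? I don't quite believe it either" (FastDFS併發會有bug,其實我也不太信?), together with the related problems I ran into myself and how I solved them.
At first I simply repeated the test from that post, and the concurrency problem really does exist (the code below works fine when changed to a single thread). These are the cases I tried:
First, here is the FastConcurrence class, with the main function, that I used for the tests: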
    import java.io.File;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FastConcurrence {
        private static int poolSize = 2; // number of threads

        public static void main(String[] args) throws InterruptedException {
            latchTest();
        }

        private static void latchTest() throws InterruptedException {
            // "start" releases all workers at once; "end" waits for all of them to finish
            final CountDownLatch start = new CountDownLatch(1);
            final CountDownLatch end = new CountDownLatch(poolSize);
            ExecutorService exce = Executors.newFixedThreadPool(poolSize);
            for (int i = 0; i < poolSize; i++) {
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            start.await();
                            testLoad();
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            end.countDown();
                        }
                    }
                };
                exce.submit(run);
            }
            start.countDown();
            end.await();
            exce.shutdown();
        }

        private static void testLoad() throws Exception {
            String filePath = "D:\\fastDFS\\ques.png";
            File content = new File(filePath);
            TestFileManager test = new TestFileManager();
            FastDFSFile file = test.getFastFile(content, "png");
            for (int i = 0; i < 10; i++) {
                // wraps my file-to-byte[] conversion and the upload call storageClient.upload_file
                FileManager.upload(file);
            }
            System.out.println("One thread finished!");
        }
    }
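The start-gate pattern used in latchTest can also be pulled into a small helper that counts how many threads failed, instead of only printing stack traces. The following is a minimal sketch under assumptions: ConcurrentRunner and countFailures are hypothetical names (they are not part of FastDFS or of the test class above), and the task can be any Callable, for example one that calls FileManager.upload.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of FastDFS or the original test.
class ConcurrentRunner {

    /** Runs the task once on each thread, released together, and returns how many runs threw. */
    static int countFailures(int threads, Callable<?> task) {
        final CountDownLatch start = new CountDownLatch(1);     // gate that releases all workers
        final CountDownLatch end = new CountDownLatch(threads); // counts finished workers
        final AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < threads; i++) {
                pool.submit(() -> {
                    try {
                        start.await();
                        task.call();
                    } catch (Exception e) {
                        failures.incrementAndGet();
                    } finally {
                        end.countDown();
                    }
                });
            }
            start.countDown();
            end.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdown();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        // A stand-in task that always fails, in place of a real upload.
        int failed = countFailures(2, () -> {
            throw new IOException("simulated upload failure");
        });
        System.out.println("failed threads: " + failed);
    }
}
```

Returning a count makes it easy to compare the cases below: a run is clean only when the count is zero.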
① Make trackerClient, trackerServer, storageServer and storageClient global (static) variables, so they are initialized when the class is loaded. The key code is as follows:
    private static TrackerClient trackerClient;
    private static TrackerServer trackerServer;
    private static StorageServer storageServer;
    private static StorageClient storageClient;

    static {
        try {
            // Initialize the client from the config file
            ClientGlobal.init("C:\\Users\\jianbo\\Downloads\\FastDFS\\conf\\client.conf");
            trackerClient = new TrackerClient();
            trackerServer = trackerClient.getConnection();
            // Has concurrency problems, so do not reuse storageClient
            storageClient = new StorageClient(trackerServer, storageServer);
        } catch (Exception e) {
            logger.error("Failed to initialize the FastDFS client", e);
        }
    }

Sure enough, it failed:

    java.net.SocketException: socket closed
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:263)
        at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143)
        at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938)
        at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703)
        at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208)
        at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226)
        at fastdfs.FileManager.upload(FileManager.java:76)
        at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48)
        at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42)
        at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
    java.io.IOException: recv cmd: 0 is not correct, expect cmd: 100
        at org.csource.fastdfs.ProtoCommon.recvHeader(ProtoCommon.java:219)
        at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:250)
        at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143)
        at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938)
        at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703)
        at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208)
        at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226)
        at fastdfs.FileManager.upload(FileManager.java:76)
        at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48)
        at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42)
        at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

② Immediately before the call to storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list), I added storageClient = new StorageClient(trackerServer, storageServer);
The key code is as follows:
    public static String[] upload(FastDFSFile file) {
        try {
            storageClient = new StorageClient(trackerServer, storageServer); // newly added
            uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list);
        } catch (Exception e) {
            logger.error("Exception when uploading the file:" + file.getName(), e);
        }
        // some code omitted
        return uploadResults;
    }
And there it was: the same problem as in ① above.
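To make the error message itself less mysterious, here is a small sketch of one way a "recv cmd: 0" could arise when two threads read replies from the same connection. This is an illustration under assumptions, not the library's code: it assumes a reply header of an 8-byte body length followed by a 1-byte command and a 1-byte status, and a zero-filled body. HeaderDemo and all of its methods are invented for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Illustrative stand-in for a reply-header check; not the library's actual code.
class HeaderDemo {
    static final int RESP_CMD = 100;  // expected command value, as named in the error message
    static final int HEADER_LEN = 10; // assumed: 8-byte body length + 1-byte cmd + 1-byte status
    static final int BODY_LEN = 40;   // arbitrary body size chosen for this sketch

    /** Builds one reply: a header followed by a zero-filled body (the zero fill is an assumption). */
    static byte[] reply(int bodyLen) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN + bodyLen);
        buf.putLong(bodyLen).put((byte) RESP_CMD).put((byte) 0);
        return buf.array();
    }

    /** Reads one header from the stream and returns the announced body length. */
    static long recvHeader(ByteBuffer stream) throws IOException {
        long bodyLen = stream.getLong();
        int cmd = stream.get() & 0xFF;
        stream.get(); // status byte, ignored in this sketch
        if (cmd != RESP_CMD) {
            throw new IOException("recv cmd: " + cmd + " is not correct, expect cmd: " + RESP_CMD);
        }
        return bodyLen;
    }

    /** Two replies queued back to back, as on a connection shared by two callers. */
    static ByteBuffer twoReplies() {
        ByteBuffer stream = ByteBuffer.allocate(2 * (HEADER_LEN + BODY_LEN));
        stream.put(reply(BODY_LEN)).put(reply(BODY_LEN));
        stream.flip();
        return stream;
    }

    /** Each reader consumes its whole reply before the next one starts. */
    static String sequentialRead() {
        ByteBuffer stream = twoReplies();
        try {
            long len = recvHeader(stream);
            stream.position(stream.position() + (int) len); // consume the first body
            recvHeader(stream);
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    /** The second reader starts before the first has consumed its body. */
    static String interleavedRead() {
        ByteBuffer stream = twoReplies();
        try {
            recvHeader(stream); // reader A: header read, body still unread
            recvHeader(stream); // reader B: parses A's body bytes as a header
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(sequentialRead());
        System.out.println(interleavedRead());
    }
}
```

In the interleaved case the second reader treats body bytes as a header, so the command byte it sees is whatever happens to sit at that offset. Whether this is exactly what happens inside the client in the failing runs is not confirmed here.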
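I then realized that this does not really change anything: it still reassigns the global storageClient. A new local instance should be created instead, which leads to ③.
③ Immediately before the call to storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list), I added StorageClient storageClient = new StorageClient(trackerServer, storageServer); This is the fix given at the end of the original post.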
    public static String[] upload(FastDFSFile file) {
        try {
            StorageClient storageClient = new StorageClient(trackerServer, storageServer); // newly added
            uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list);
        } catch (Exception e) {
            logger.error("Exception when uploading the file:" + file.getName(), e);
        }
        // some code omitted
        return uploadResults;
    }
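I still got errors, with the same exceptions as before. I had not yet had time to track down the cause, but using a lock worked without problems.
④ Taking a lock around the call solves it. For example, once I made this method synchronized, the problem went away.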
    private synchronized static void testLoad() throws Exception {
        String filePath = "D:\\fastDFS\\ques.png";
        File content = new File(filePath);
        TestFileManager test = new TestFileManager();
        FastDFSFile file = test.getFastFile(content, "png");
        for (int i = 0; i < 10; i++) {
            FileManager.upload(file);
        }
        System.out.println("One thread finished!");
    }
Everything ran smoothly. Still, I wanted to look into why ③ failed.
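Synchronizing the whole of testLoad also serializes the file reading and the loop, not just the upload call. A narrower variant would guard only the call on the shared client. The sketch below illustrates that idea with stand-ins: FragileClient is a hypothetical object that throws when two threads enter it at once, and LockedUploader is a hypothetical wrapper. Neither is a FastDFS class.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client that must not be used by two threads at once.
class FragileClient {
    private final AtomicBoolean busy = new AtomicBoolean();
    final AtomicInteger uploads = new AtomicInteger();

    void upload() {
        if (!busy.compareAndSet(false, true)) {
            throw new IllegalStateException("concurrent use detected");
        }
        try {
            Thread.yield(); // widen the window in which a second caller could collide
            uploads.incrementAndGet();
        } finally {
            busy.set(false);
        }
    }
}

// Hypothetical wrapper: only the call on the shared client is inside the lock.
class LockedUploader {
    private final Object lock = new Object();
    private final FragileClient client = new FragileClient();

    void upload() {
        synchronized (lock) {
            client.upload();
        }
    }

    /** Starts the workers, waits for them, and returns the number of successful uploads. */
    static int runDemo(int threads, int uploadsPerThread) {
        LockedUploader uploader = new LockedUploader();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < uploadsPerThread; i++) {
                    uploader.upload();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return uploader.client.uploads.get();
    }

    public static void main(String[] args) {
        System.out.println("successful uploads: " + runDemo(4, 1000));
    }
}
```

Either way the uploads still run one at a time, so a lock trades throughput for safety; it does not remove the underlying sharing.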
Update: I found the problem with ③.
I downloaded the source code and looked through the StorageClient class. In the do_upload_file method there is this piece of code:
    bUploadSlave = ((group_name != null && group_name.length() > 0) &&
                    (master_filename != null && master_filename.length() > 0) &&
                    (prefix_name != null));
    if (bUploadSlave)
    {
        bNewConnection = this.newUpdatableStorageConnection(group_name, master_filename);
    }
    else
    {
        bNewConnection = this.newWritableStorageConnection(group_name);
    }
The call of interest is this.newWritableStorageConnection(group_name);
The method itself is:
    protected boolean newWritableStorageConnection(String group_name) throws IOException, MyException
    {
        if (this.storageServer != null)
        {
            return false;
        }
        else
        {
            TrackerClient tracker = new TrackerClient();
            this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
            if (this.storageServer == null)
            {
                throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode());
            }
            return true;
        }
    }
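Inside this method is the line this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
Stepping into it, I found that the trackerServer connection is closed each time once it has been used:
    trackerServer.close();
So for the threads that come afterwards the connection is already closed, and they get the IO exceptions. All that is needed is one more line in ③, before new StorageClient:
    TrackerServer trackerServer = trackerClient.getConnection();
and the problem is solved.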
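The effect of that one-line change can be modelled without a FastDFS server. The sketch below assumes the behaviour described above, namely that a call closes the tracker connection it was given once it is done. FakeTrackerConnection and FreshConnectionDemo are hypothetical stand-ins. In real code the per-call connection would come from trackerClient.getConnection(), as in the line above.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in that mimics a tracker connection; not a FastDFS class.
class FakeTrackerConnection implements Closeable {
    private boolean closed;

    String queryStorage() throws IOException {
        if (closed) {
            throw new IOException("socket closed");
        }
        return "storage-1"; // made-up value for illustration
    }

    @Override
    public void close() {
        closed = true;
    }
}

class FreshConnectionDemo {
    /** Mimics a call that closes the connection it was given when it is done. */
    static String uploadWith(FakeTrackerConnection conn) throws IOException {
        try {
            return conn.queryStorage();
        } finally {
            conn.close();
        }
    }

    /** One connection shared by every call, like the static trackerServer field. */
    static int sharedConnectionFailures(int calls) {
        FakeTrackerConnection shared = new FakeTrackerConnection();
        int failures = 0;
        for (int i = 0; i < calls; i++) {
            try {
                uploadWith(shared);
            } catch (IOException e) {
                failures++;
            }
        }
        return failures;
    }

    /** A new connection for every call, like the added getConnection() line. */
    static int freshConnectionFailures(int calls) {
        int failures = 0;
        for (int i = 0; i < calls; i++) {
            try {
                uploadWith(new FakeTrackerConnection());
            } catch (IOException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("shared: " + sharedConnectionFailures(5) + " failures");
        System.out.println("fresh:  " + freshConnectionFailures(5) + " failures");
    }
}
```

With a shared connection only the first call succeeds; with a connection per call, none fail. The cost is one extra tracker connection per upload, which a connection pool could reduce if that becomes a concern.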