vertx實例的fileSystem文件系統模塊
阿新 • • 發佈:2019-03-16
當前 exti exist asf add RKE rdo write 上下文
初始化
//根據OS區分實現 System.getProperty("os.name").toLowerCase(); Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this);
異步操作實現
FileSystemImpl 內部類 BlockingAction, 使用internalBlocking Pool 執行文件阻塞操作, xxxBlocking相關方法在當前主線程操作造成阻塞,盡量避免
protected abstract class BlockingAction<T> implements Action<T> {private final Handler<AsyncResult<T>> handler;//回調 handler protected final ContextImpl context;//上下文,作用:處理完成回調到同一線程 public BlockingAction(Handler<AsyncResult<T>> handler) { this.handler = handler; this.context = vertx.getOrCreateContext(); }/** * Run the blocking action using a thread from the worker pool. */ public void run() { context.executeBlocking(this, handler); //使用 internalBlocking Pool執行任務 } }
AsyncFileImpl
FileSystemImpl 的open方法返回的AsyncFileImpl,記住調用close方法,防止文件句柄泄露, linux可使用 lsof 工具查看當前file descriptor情況
AsyncFileImpl(VertxInternal vertx, String path, OpenOptions options, ContextImpl context) { if (!options.isRead() && !options.isWrite()) { throw new FileSystemException("Cannot open file for neither reading nor writing"); } this.vertx = vertx; Path file = Paths.get(path); /**定義open屬性*/ HashSet<OpenOption> opts = new HashSet<>(); if (options.isRead()) opts.add(StandardOpenOption.READ); if (options.isWrite()) opts.add(StandardOpenOption.WRITE); if (options.isCreate()) opts.add(StandardOpenOption.CREATE); if (options.isCreateNew()) opts.add(StandardOpenOption.CREATE_NEW); if (options.isSync()) opts.add(StandardOpenOption.SYNC); if (options.isDsync()) opts.add(StandardOpenOption.DSYNC); if (options.isDeleteOnClose()) opts.add(StandardOpenOption.DELETE_ON_CLOSE); if (options.isSparse()) opts.add(StandardOpenOption.SPARSE); if (options.isTruncateExisting()) opts.add(StandardOpenOption.TRUNCATE_EXISTING); try { //獲取文件權限字符 linux 下 ls -al 文件權限標識 rwx if (options.getPerms() != null) { //將權限字符轉化為FileAttribute屬性 FileAttribute<?> attrs = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(options.getPerms())); /**使用worker Pool,而不是用默認的Executors.newCachedThreadPool(threadFactory), 原則:vertx使用少量線程處理大並發,吞吐量優先*/ ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool(), attrs); } else { ch = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool()); } //屬性為"追加"獲取大小做為 write index if (options.isAppend()) writePos = ch.size(); } catch (IOException e) { throw new FileSystemException(e); } //上下文,作用:處理完成回調到同一線程 this.context = context; }
vertx實例的fileSystem文件系統模塊