【從入門到放棄-Java】併發程式設計-NIO-Channel
簡介
channel即通道,可以用來讀、寫資料,它是全雙工的可以同時用來讀寫操作。這也是它與stream流的最大區別。
channel需要與buffer配合使用,channel通道的一端是buffer,一端是資料來源實體,如檔案、socket等。在nio中,通過channel的不同實現來處理 不同實體與資料buffer中的資料傳輸。
channel介面:
package java.nio.channels; import java.io.IOException; import java.io.Closeable; /** * A nexus for I/O operations. * * <p> A channel represents an open connection to an entity such as a hardware * device, a file, a network socket, or a program component that is capable of * performing one or more distinct I/O operations, for example reading or * writing. * * <p> A channel is either open or closed. A channel is open upon creation, * and once closed it remains closed. Once a channel is closed, any attempt to * invoke an I/O operation upon it will cause a {@link ClosedChannelException} * to be thrown. Whether or not a channel is open may be tested by invoking * its {@link #isOpen isOpen} method. * * <p> Channels are, in general, intended to be safe for multithreaded access * as described in the specifications of the interfaces and classes that extend * and implement this interface. * * * @author Mark Reinhold * @author JSR-51 Expert Group * @since 1.4 */ public interface Channel extends Closeable { /** * Tells whether or not this channel is open. * * @return <tt>true</tt> if, and only if, this channel is open */ public boolean isOpen(); /** * Closes this channel. * * <p> After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. * * <p> If this channel is already closed then invoking this method has no * effect. * * <p> This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without * effect. </p> * * @throws IOException If an I/O error occurs */ public void close() throws IOException; }
常見的channel實現有:
- FileChannel:檔案讀寫資料通道
- SocketChannel:TCP讀寫網路資料通道
- ServerSocketChannel:服務端網路資料讀寫通道,可以監聽TCP連線。對每一個新進來的連線都會建立一個SocketChannel。
- DatagramChannel:UDP讀寫網路資料通道
FileChannel
FileChannel是一個抽象類,它繼承了AbstractInterruptibleChannel類,並實現了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel介面。
具體的實現類主要是sun.nio.ch.FileChannelImpl。下面詳細分析下FileChannelImpl中每個方法的具體實現。
open
private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) { //主要記載作業系統維護的檔案描述符 this.fd = var1; //是否可讀 this.readable = var3; //是否可寫 this.writable = var4; //是否以追加的方式開啟 this.append = var5; this.parent = var6; this.path = var2; //底層使用native的read和write來處理檔案的 this.nd = new FileDispatcherImpl(var5); } //FileInputStream::getChannel 呼叫 FileChannelImpl.open(fd, path, true, false, this) 獲取只讀channel public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) { return new FileChannelImpl(var0, var1, var2, var3, false, var4); } //FileOutputStream::getChannel 呼叫 FileChannelImpl.open(fd, path, false, true, append, this) 獲取只寫channel public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) { return new FileChannelImpl(var0, var1, var2, var3, var4, var5); } private FileChannelImpl(FileDescriptor fd, String path, boolean readable, boolean writable, boolean direct, Object parent) { this.fd = fd; //是否可讀 this.readable = readable; //是否可寫 this.writable = writable; //對於從流建立的channel,在結束時要做不同的清理動作,(openJDK中才有,sun的jdk中沒有) this.parent = parent; //原始檔的path this.path = path; //是否使用DirectIO this.direct = direct; this.nd = new FileDispatcherImpl(); if (direct) { assert path != null; this.alignment = nd.setDirectIO(fd, path); } else { this.alignment = -1; } //當parent不存在時,則註冊一個cleaner,否則交由parent做清理動作。 // Register a cleaning action if and only if there is no parent // as the parent will take care of closing the file descriptor. // FileChannel is used by the LambdaMetaFactory so a lambda cannot // be used here hence we use a nested class instead. this.closer = parent != null ? null : CleanerFactory.cleaner().register(this, new Closer(fd)); } // Used by FileInputStream.getChannel(), FileOutputStream.getChannel // and RandomAccessFile.getChannel() public static FileChannel open(FileDescriptor fd, String path, boolean readable, boolean writable, boolean direct, Object parent) { return new FileChannelImpl(fd, path, readable, writable, direct, parent); }
- open方法主要是返回一個新new的FileChannelImpl物件,初始化時設定fileDescriptor、readable、writable、append、parent、path等屬性,看變數名很容易理解,在此不贅述變數含義。
read
//實現自SeekableByteChannel介面的方法,將檔案中的內容讀取到給定的byteBuffer中
public int read(ByteBuffer dst) throws IOException {
//保證讀寫時,channel處於開啟狀態
ensureOpen();
//判斷是否可讀
if (!readable)
throw new NonReadableChannelException();
synchronized (positionLock) {
if (direct)
Util.checkChannelPositionAligned(position(), alignment);
int n = 0;
int ti = -1;
try {
//開始阻塞,並註冊為Interruptible,可以被中斷
beginBlocking();
//將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
//NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
ti = threads.add();
if (!isOpen())
return 0;
do {
//當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中
n = IOUtil.read(fd, dst, -1, direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
//把當前執行緒從set中移出
threads.remove(ti);
//結束,釋放鎖
endBlocking(n > 0);
assert IOStatus.check(n);
}
}
}
//實現自ScatteringByteChannel介面的方法,將檔案中的內容依次讀取到給定的byteBuffer陣列中。
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
//保證讀寫時,channel處於開啟狀態
ensureOpen();
//判斷是否可讀
if (!readable)
throw new NonReadableChannelException();
synchronized (positionLock) {
if (direct)
Util.checkChannelPositionAligned(position(), alignment);
long n = 0;
int ti = -1;
try {
//開始阻塞,並註冊為Interruptible,可以被中斷
beginBlocking();
//將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
//NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
ti = threads.add();
if (!isOpen())
return 0;
do {
//當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中
n = IOUtil.read(fd, dsts, offset, length,
direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
//把當前執行緒從set中移出
threads.remove(ti);
//結束,釋放鎖
endBlocking(n > 0);
assert IOStatus.check(n);
}
}
}
write
//實現自SeekableByteChannel介面的方法,將byteBuffer中的內容寫入到檔案中
public int write(ByteBuffer src) throws IOException {
//保證寫時,channel處於開啟狀態
ensureOpen();
//判斷是否可寫
if (!writable)
throw new NonWritableChannelException();
synchronized (positionLock) {
if (direct)
Util.checkChannelPositionAligned(position(), alignment);
int n = 0;
int ti = -1;
try {
//開始阻塞,並註冊為Interruptible,可以被中斷
beginBlocking();
//將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
//NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
ti = threads.add();
if (!isOpen())
return 0;
do {
//當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到檔案中
n = IOUtil.write(fd, src, -1, direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
//把當前執行緒從set中移出
threads.remove(ti);
//結束,釋放鎖
assert IOStatus.check(n);
}
}
}
//實現自GatheringByteChannel介面的方法,將byteBuffer陣列中的內容依次寫入到檔案中
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
//保證寫時,channel處於開啟狀態
ensureOpen();
//判斷是否可寫
if (!writable)
throw new NonWritableChannelException();
synchronized (positionLock) {
if (direct)
Util.checkChannelPositionAligned(position(), alignment);
long n = 0;
int ti = -1;
try {
//開始阻塞,並註冊為Interruptible,可以被中斷
beginBlocking();
//將當前執行緒新增到NativeThreadSet中,並返回索引,方便後續操作。
//NativeThreadSet是一個執行緒安全的本地執行緒集合,方便管理,用來發送訊號
ti = threads.add();
if (!isOpen())
return 0;
do {
//當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到檔案中
n = IOUtil.write(fd, srcs, offset, length,
direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
//把當前執行緒從set中移出
threads.remove(ti);
//結束,釋放鎖
assert IOStatus.check(n);
}
}
}
position
//實現自SeekableByteChannel介面的方法,獲取當前channel的position
public long position() throws IOException {
ensureOpen();
synchronized (positionLock) {
long p = -1;
int ti = -1;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return 0;
boolean append = fdAccess.getAppend(fd);
do {
//append模式下,position在channel的末尾
// in append-mode then position is advanced to end before writing
p = (append) ? nd.size(fd) : nd.seek(fd, -1);
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(p);
} finally {
threads.remove(ti);
endBlocking(p > -1);
assert IOStatus.check(p);
}
}
}
//實現自SeekableByteChannel介面的方法,設定當前channel的position為newPosition
public FileChannel position(long newPosition) throws IOException {
ensureOpen();
if (newPosition < 0)
throw new IllegalArgumentException();
synchronized (positionLock) {
long p = -1;
int ti = -1;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return null;
do {
//設定當前position為newPosition
p = nd.seek(fd, newPosition);
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return this;
} finally {
threads.remove(ti);
endBlocking(p > -1);
assert IOStatus.check(p);
}
}
}
size
實現自SeekableByteChannel介面的方法,返回當前實體(檔案)的大小
truncate
實現自SeekableByteChannel介面的方法,用來擷取檔案至newSize大小
force
實現自SeekableByteChannel介面的方法,用來將channel中尚未寫入磁碟的資料強制落盤
transferTo
將fileChannel中的資料傳遞至另一個channel
transferFrom
從其它channel讀取資料至fileChannel
SocketChannel
open
/**
* Opens a socket channel.
*
* <p> The new channel is created by invoking the {@link
* java.nio.channels.spi.SelectorProvider#openSocketChannel
* openSocketChannel} method of the system-wide default {@link
* java.nio.channels.spi.SelectorProvider} object. </p>
*
* @return A new socket channel
*
* @throws IOException
* If an I/O error occurs
*/
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
open方法是呼叫SelectorProvider中實現了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法,底層實際是new SocketChannelImpl,呼叫native方法建立socket
connect
public boolean connect(SocketAddress sa) throws IOException {
//校驗Address是否合法
InetSocketAddress isa = Net.checkAddress(sa);
//獲取系統安全管理器
SecurityManager sm = System.getSecurityManager();
if (sm != null)
//校驗IP和埠是否被允許連線
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
InetAddress ia = isa.getAddress();
//如果是本機地址,則獲取本機的host
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
try {
//加讀鎖
readLock.lock();
try {
//加寫鎖
writeLock.lock();
try {
int n = 0;
//是否阻塞
boolean blocking = isBlocking();
try {
//開啟connect前的校驗並設定為ST_CONNECTIONPENDING,如果blocking是true 即阻塞模式,則記錄當前執行緒的ID,以便接收訊號處理。
beginConnect(blocking, isa);
do {
//呼叫native connect方法
n = Net.connect(fd, ia, isa.getPort());
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
//結束連線
endConnect(blocking, (n > 0));
}
assert IOStatus.check(n);
return n > 0;
} finally {
//釋放寫鎖
writeLock.unlock();
}
} finally {
//釋放讀鎖
readLock.unlock();
}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, isa);
}
}
configureBlocking
實現自SelectableChannel的介面方法,呼叫native方法設定socket的阻塞狀態
register
在AbstractSelectableChannel中定義,註冊要監聽的事件。
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {
if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// 向Selector中註冊事件
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
read
//實現自ReadableByteChannel介面的方法,從socket中讀取資料至ByteBuffer
@Override
public int read(ByteBuffer buf) throws IOException {
Objects.requireNonNull(buf);
readLock.lock();
try {
boolean blocking = isBlocking();
int n = 0;
try {
//檢查channel是否開啟並已經是connected的狀態。如果blocking是true 即阻塞模式,則記錄當前執行緒的ID,以便接收訊號處理。
beginRead(blocking);
// check if input is shutdown
if (isInputClosed)
return IOStatus.EOF;
//如果是阻塞模式,則一直讀取直到資料讀取完畢;非阻塞模式則直接呼叫native方法不需要等待。
if (blocking) {
do {
n = IOUtil.read(fd, buf, -1, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
n = IOUtil.read(fd, buf, -1, nd);
}
} finally {
endRead(blocking, n > 0);
if (n <= 0 && isInputClosed)
return IOStatus.EOF;
}
return IOStatus.normalize(n);
} finally {
readLock.unlock();
}
}
//實現自ScatteringByteChannel介面的方法,從socket中依次讀取資料至ByteBuffer陣列
@Override
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
Objects.checkFromIndexSize(offset, length, dsts.length);
readLock.lock();
try {
boolean blocking = isBlocking();
long n = 0;
try {
beginRead(blocking);
// check if input is shutdown
if (isInputClosed)
return IOStatus.EOF;
//如果是阻塞模式,則一直讀取直到資料讀取完畢;非阻塞模式則直接呼叫native方法不需要等待。
if (blocking) {
do {
n = IOUtil.read(fd, dsts, offset, length, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
n = IOUtil.read(fd, dsts, offset, length, nd);
}
} finally {
endRead(blocking, n > 0);
if (n <= 0 && isInputClosed)
return IOStatus.EOF;
}
return IOStatus.normalize(n);
} finally {
readLock.unlock();
}
}
write
//實現自ReadableByteChannel介面的方法,將ByteBuffer中的資料寫入socket
@Override
public int write(ByteBuffer buf) throws IOException {
Objects.requireNonNull(buf);
writeLock.lock();
try {
boolean blocking = isBlocking();
int n = 0;
try {
beginWrite(blocking);
//如果是阻塞模式,則一直讀取直到資料讀取完畢;非阻塞模式則直接呼叫native方法不需要等待。
if (blocking) {
do {
n = IOUtil.write(fd, buf, -1, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
n = IOUtil.write(fd, buf, -1, nd);
}
} finally {
endWrite(blocking, n > 0);
if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();
}
return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
Objects.checkFromIndexSize(offset, length, srcs.length);
writeLock.lock();
try {
boolean blocking = isBlocking();
long n = 0;
try {
beginWrite(blocking);
//如果是阻塞模式,則一直等待直到資料寫入完畢;非阻塞模式則直接呼叫native方法不需要等待。
if (blocking) {
do {
n = IOUtil.write(fd, srcs, offset, length, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
n = IOUtil.write(fd, srcs, offset, length, nd);
}
} finally {
endWrite(blocking, n > 0);
if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();
}
return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
//實現自ReadableByteChannel介面的方法,將ByteBuffer陣列中的資料依次寫入socket
/**
* Writes a byte of out of band data.
*/
int sendOutOfBandData(byte b) throws IOException {
writeLock.lock();
try {
boolean blocking = isBlocking();
int n = 0;
try {
beginWrite(blocking);
//如果是阻塞模式,則一直等待直到資料寫入完畢;非阻塞模式則直接呼叫native方法不需要等待。
if (blocking) {
do {
n = sendOutOfBandData(fd, b);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
n = sendOutOfBandData(fd, b);
}
} finally {
endWrite(blocking, n > 0);
if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();
}
return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
ServerSocketChannel
socket
@Override
public ServerSocket socket() {
synchronized (stateLock) {
if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
}
}
bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (stateLock) {
ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null)
? new InetSocketAddress(0)
: Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
//繫結前做一些前置處理,如將tcp socket檔案描述符轉換成SDP
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
//繫結IP和地址
Net.bind(fd, isa.getAddress(), isa.getPort());
//開始監聽,設定socket上最多可以掛起backlog個連線,若backlog小於1 則預設設定50個
Net.listen(fd, backlog < 1 ? 50 : backlog);
localAddress = Net.localAddress(fd);
}
return this;
}
accept
@Override
public SocketChannel accept() throws IOException {
acceptLock.lock();
try {
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
boolean blocking = isBlocking();
try {
begin(blocking);
do {
//阻塞等待接收客戶端連結
n = accept(this.fd, newfd, isaa);
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
end(blocking, n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
//新接收的socket初始設定為阻塞模式(因此非阻塞模式的每次需要顯示設定)
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//用新接收的socket建立SocketChannel
SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
// check permitted to accept connections from the remote address
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
} finally {
acceptLock.unlock();
}
}
ServerSocketChannel並沒有read和write方法,只是繼承了AbstractSelectableChannel,以便在selector中使用
DatagramChannel
open
public DatagramChannelImpl(SelectorProvider sp)
throws IOException
{
super(sp);
ResourceManager.beforeUdpCreate();
try {
//如果不支援IPv6則使用IPv4
this.family = Net.isIPv6Available()
? StandardProtocolFamily.INET6
: StandardProtocolFamily.INET;
//設定非流式的socket(tcp是流模式協議,udp是資料報模式協議)
this.fd = Net.socket(family, false);
this.fdVal = IOUtil.fdVal(fd);
} catch (IOException ioe) {
ResourceManager.afterUdpClose();
throw ioe;
}
}
receive
public SocketAddress receive(ByteBuffer dst) throws IOException {
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
readLock.lock();
try {
boolean blocking = isBlocking();
int n = 0;
ByteBuffer bb = null;
try {
SocketAddress remote = beginRead(blocking, false);
boolean connected = (remote != null);
SecurityManager sm = System.getSecurityManager();
if (connected || (sm == null)) {
// connected or no security manager
do {
n = receive(fd, dst, connected);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNAVAILABLE)
return null;
} else {
// Cannot receive into user's buffer when running with a
// security manager and not connected
bb = Util.getTemporaryDirectBuffer(dst.remaining());
for (;;) {
do {
n = receive(fd, bb, connected);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNAVAILABLE)
return null;
InetSocketAddress isa = (InetSocketAddress)sender;
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException se) {
// Ignore packet
bb.clear();
n = 0;
continue;
}
bb.flip();
dst.put(bb);
break;
}
}
//sender:傳送方地址, Set by receive0 (## ugh)
assert sender != null;
return sender;
} finally {
if (bb != null)
Util.releaseTemporaryDirectBuffer(bb);
endRead(blocking, n > 0);
assert IOStatus.check(n);
}
} finally {
readLock.unlock();
}
}
send
public int send(ByteBuffer src, SocketAddress target)
throws IOException
{
Objects.requireNonNull(src);
InetSocketAddress isa = Net.checkAddress(target, family);
writeLock.lock();
try {
boolean blocking = isBlocking();
int n = 0;
try {
//當connect後,remote會設定為連線的地址
SocketAddress remote = beginWrite(blocking, false);
if (remote != null) {
// connected
if (!target.equals(remote)) {
throw new AlreadyConnectedException();
}
do {
n = IOUtil.write(fd, src, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
} else {
// not connected
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
InetAddress ia = isa.getAddress();
if (ia.isMulticastAddress()) {
sm.checkMulticast(ia);
} else {
sm.checkConnect(ia.getHostAddress(), isa.getPort());
}
}
do {
n = send(fd, src, isa);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
}
} finally {
endWrite(blocking, n > 0);
assert IOStatus.check(n);
}
return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
connect
@Override
public DatagramChannel connect(SocketAddress sa) throws IOException {
InetSocketAddress isa = Net.checkAddress(sa, family);
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
InetAddress ia = isa.getAddress();
if (ia.isMulticastAddress()) {
sm.checkMulticast(ia);
} else {
sm.checkConnect(ia.getHostAddress(), isa.getPort());
sm.checkAccept(ia.getHostAddress(), isa.getPort());
}
}
readLock.lock();
try {
writeLock.lock();
try {
synchronized (stateLock) {
ensureOpen();
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
int n = Net.connect(family,
fd,
isa.getAddress(),
isa.getPort());
if (n <= 0)
throw new Error(); // Can't happen
// connected
remoteAddress = isa;
state = ST_CONNECTED;
// refresh local address
localAddress = Net.localAddress(fd);
// flush any packets already received.
boolean blocking = isBlocking();
if (blocking) {
IOUtil.configureBlocking(fd, false);
}
try {
ByteBuffer buf = ByteBuffer.allocate(100);
while (receive(buf) != null) {
buf.clear();
}
} finally {
if (blocking) {
IOUtil.configureBlocking(fd, true);
}
}
}
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
return this;
}
udp是資料報模式的協議,是沒有connect的。這裡的connect實際上是在底層忽略了與其他地址的資料傳輸。
在connect後,就可以像socketChannel似得使用read和write了
總結
本文學習了各種channel的實現,主要是對底層native方法的一些封裝,針對不同屬性的實體(檔案、socket),使用對應的channel與byteBuffer傳輸資料。再通過byteBuffer與byte資料進行轉換。
channel的實現中,封裝了大量的native方法,重要的底層實現全在native中,後續可以深入學習下。
本文中出現的byteBuffer和selector將在接下來的文章中,單獨分析。
作者:aloof_
原文連結
本文為雲棲社群原創內容,未經