Kafka原始碼深度解析-序列4 -Producer -network層核心原理
在上一篇我們分析了Java NIO的原理和使用方式,本篇將進一步分析Kafka client是如何基於NIO構建自己的network層。
network層的分層架構
下圖展示了從最上層的KafkaProducer到最底層的Java NIO的構建層次關係:
圖中淡紫色的方框表示介面或者抽象類,白色方框是具體實現。
整個架構圖也體現了“面向介面程式設計”的思想:最底層Java NIO往上層全部以介面形式暴露,上面的3層,也都定義了相應的介面,逐層往上暴露。
介面的例項化(包括KafkaClient, Selectable, ChannelBuilder),也都在最外層的容器類KafkaProducer的建構函式中完成,KafkaProducer也就充當了一個“工廠”的角色,裝配所有這些底層元件。
network層元件與NIO元件的對映關係
從上圖也可以看出:
KakfaChannel基本是對SocketChannel的封裝,只是這個中間多個一個間接層:TransportLayer,為了封裝普通和加密的Channel;
Send/NetworkReceive是對ByteBuffer的封裝,表示一次請求的資料包;
Kafka的Selector封裝了NIO的Selector,內含一個NIO Selector物件。
Kafka Selector實現思路
1.從上圖可以看出, Selector內部包含一個Map, 也就是它維護了所有連線的連線池。這些KafkaChannel都由ChannelBuilder介面建立。
private final Map<String, KafkaChannel> channels;
2.所有的io操作:connect, read, write其實都是在poll這1個函式裡面完成的。具體什麼意思呢?
NetworkClient的send()函式,呼叫了selector.send(Send send), 但這個時候資料並沒有真的傳送出去,只是暫存在了selector內部相對應的channel裡面。下面看程式碼:
//Selector
public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination()); //找到資料包相對應的connection
try {
channel.setSend(send); //暫存在這個connection(channel)裡面
} catch (CancelledKeyException e) {
this.failedSends.add(send.destination());
close(channel);
}
}
//KafkaChannel
public void setSend(Send send) {
if (this.send != null) //關鍵點:當前的沒有發出去之前,不能暫存下1個!!!關於這個,後面還要詳細分析
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send; //暫存這個資料包
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public class KafkaChannel {
private final String id;
private final TransportLayer transportLayer;
private final Authenticator authenticator;
private final int maxReceiveSize;
private NetworkReceive receive;
private Send send; //關鍵點:1個channel一次只能存放1個數據包,在當前的send資料包沒有完整發出去之前,不能存放下一個
...
}
暫存在channel中之後,poll函式進行處理,我們抽象出一個輸入-輸出模型如下:
輸入:暫存的send資料包
輸出:完成的sends, 完成的receive(針對上1次的send), 建立的連線, 斷掉的連線。
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear(); //關鍵點:每次poll之前,會清空“輸出”
if (hasStagedReceives())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0) {
Set<SelectionKey> keys = this.nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake */
if (key.isConnectable()) {
channel.finishConnect(); //把建立的連線,加入輸出結果集合
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
}
...
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send); //把完成的傳送,加入輸出結果集合
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id()); //把斷掉的連線,加入輸出結果集合
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id()); //把斷掉的連線,加入輸出結果集合
}
}
}
addToCompletedReceives(); //把完成的接收,加入輸出結果集合
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
核心原理之1 – 訊息的分包
在上面的程式碼中,為什麼會有addToStagedReceives? 什麼叫做staged receives呢? 這叫要從資料的分包說起:
在NetworkClient中,往下傳的是一個完整的ClientRequest,進到Selector,暫存到channel中的,也是一個完整的Send物件(1個數據包)。但這個Send物件,交由底層的channel.write(Bytebuffer b)的時候,並不一定一次可以完全傳送,可能要呼叫多次write,才能把一個Send物件完全發出去。這是因為write是非阻塞的,不是等到完全發出去,才會返回。所以才有上面的程式碼:
if (channel.ready() && key.isWritable()) {
Send send = channel.write(); //send不為空,表示完全傳送出去,返回發出去的這個Send物件。如果沒完全發出去,返回null
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
同樣,在接收的時候,channel.read(Bytebuffer b),一個response也可能要read多次,才能完全接收。所以就有了上面的while迴圈程式碼:
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) //迴圈接收,直到1個response完全接收到,才會從while迴圈退出
addToStagedReceives(channel, networkReceive);
}
核心原理之2 – 訊息的分界
從上面知道,底層資料的通訊,是在每一個channel上面,2個源源不斷的byte流,一個send流,一個receive流。
send的時候,還好說,傳送之前知道一個完整的訊息的大小;
那接收的時候,我怎麼知道一個msg response什麼時候結束,然後開始接收下一個response呢?
這就需要一個小技巧:在所有request,response頭部,首先是一個定長的,4位元組的頭,receive的時候,至少呼叫2次read,先讀取這4個位元組,獲取整個response的長度,接下來再讀取訊息體。
public class NetworkReceive implements Receive {
private final String source;
private final ByteBuffer size; //頭部4位元組的buffer
private final int maxSize;
private ByteBuffer buffer; //後面整個訊息response的buffer
public NetworkReceive(String source) {
this.source = source;
this.size = ByteBuffer.allocate(4); //先分配4位元組的頭部
this.buffer = null;
this.maxSize = UNLIMITED;
}
}
核心原理之3 - 訊息時序保證
在InFlightRequests中,存放了所有發出去,但是response還沒有回來的request。request發出去的時候,入對;response回來,就把相對應的request出對。
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
}
這個有個關鍵點:我們注意到request與response的配對,在這裡是用隊列表達的,而不是Map。用佇列的入隊,出隊,完成2者的匹配。要實現這個,伺服器就必須要保證訊息的時序:即在一個socket上面,假如發出去的reqeust是0, 1, 2,那返回的response的順序也必須是0, 1, 2。
但是伺服器是1 + N + M模型,所有的請求進入一個requestQueue,然後是多執行緒並行處理的。那它如何保證訊息的時序呢?
答案是mute/unmute機制:每當一個channel上面接收到一個request,這個channel就會被mute,然後等response返回之後,才會再unmute。這樣就保證了同1個連線上面,同時只會有1個請求被處理。
下面是服務端的程式碼:
selector.completedReceives.asScala.foreach { receive =>
try {
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
close(selector, receive.source)
}
selector.mute(receive.source) //收到請求,把這個請求對應的channel, mute
}
selector.completedSends.asScala.foreach { send =>
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
resp.request.updateRequestMetrics()
selector.unmute(send.destination) //傳送response之後,把這個responese對應的channel, unmute
}
NetworkClient實現思路
上面已經講到:
(1)Selector維護了所有連線的連線池,所有連線上,訊息的傳送、接收都是通過poll函式進行的
(2)一個channel一次只能暫存1個Send物件。
但如果這個Send物件,一次poll之後,沒有完全傳送出去怎麼辦呢?看上層NetworkClient怎麼處理的:
關鍵的client.ready函式
先從Sender的run()函式看起:
public void run(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) { //關鍵函式!!!
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests) //每個request分屬於不同的Node
client.send(request, now); //client的send就是直接呼叫了selector.send,訊息暫存在channel裡面,沒有傳送
this.client.poll(pollTimeout, now); //呼叫selector.poll,處理連線、傳送、接收
}
在上面的程式碼中,有一個關鍵函式:client.ready(Node n, ..), 這個函式內部會判斷這個node有沒有ready,如果沒有ready,就會從readNodes裡面移除,接下來就不會往這個Node傳送訊息。
那什麼叫ready呢? 我們看一下程式碼:
public boolean ready(Node node, long now) {
if (isReady(node, now))
return true;
if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now);
return false;
}
public boolean isReady(Node node, long now) {
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}
private boolean canSendRequest(String node) {
return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}
public boolean canSendMore(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
public boolean completed() {
return remaining <= 0 && !pending;
}
上面的程式碼封了好幾層,但總結下來,一個Node ready,可以向其傳送請求,需要符合以下幾個條件:
1. metadata正常,不需要update: !metadataUpdater.isUpdateDue(now)
2. 連線正常 connectionStates.isConnected(node)
3. channel是ready狀態:這個對於PlaintextChannel, 一直返回true
4. 當前該channel中,沒有in flight request,所有請求都處理完了
5. 當前該channel中,佇列尾部的request已經完全傳送出去, request.completed(),並且inflight request數目,沒有超過設定的最大值(預設為5,即允許在“天上飛”的request最多有5個,所謂在“天上飛”,就是發出去了,response還沒有回來)
而上面的第5個條件,正是解決了上面的問題:一個channel裡面的Send物件要是隻傳送了部分,下1次就不會處於ready狀態了。
client.poll函式
下面看一下client.poll,是如何封裝selector.poll的:
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
//上面說到,selector.poll函式,會把處理結果,放到一堆的狀態變數裡面(輸出結果集),現在就是處理這堆輸出結果的時候了。
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
//Selector中的那堆狀態變數,在每次poll之前,被clear情況掉,每次poll之後,填充。
//然後在client.poll裡面,這堆輸出結果被處理
public class Selector implements Selectable {
。。。
private final List<Send> completedSends;
private final List<NetworkReceive> completedReceives;
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final List<String> disconnected;
private final List<String> connected;
。。。
}
連線檢測 & 自動重連機制
在所有tcp長連結的程式設計中,都有一個基本問題要解決:如何判斷1個連線是否斷開?客戶端需要維護所有連線的狀態(connecting, connected, disconnected),然後根據連線狀態做不同邏輯。
但在NIO中,並沒有一個函式,可以直接告訴你一個連線是否斷開了;在NetworkClient裡面,也並沒有開一個執行緒,不斷髮送心跳訊息,來檢測連線。那它是如何處理的呢?
檢測連線斷開的手段
在networkClient的實現中,用了3種手段,來判斷一個連線是否斷開:
手段1:所有的IO函式,connect, finishConnect, read, write都會拋IOException,因此任何時候,呼叫這些函式的時候,只要拋異常,就認為連線已經斷開。
手段2:selectionKey.isValid()
手段3:inflightRequests,所有發出去的request,都設定有一個response返回的時間。在這個時間內,response沒有回來,就認為連線斷了。
前2種手段,都集中在Select.poll函式裡面:
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear();
if (hasStagedReceives())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0) {
Set<SelectionKey> keys = this.nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake */
if (key.isConnectable()) {
channel.finishConnect();
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
channel.prepare();
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
if (!key.isValid()) { //手段2
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) { //手段1:任何一個io函式,只要拋錯,就認為連線斷了
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
第3種手段,在NetworkClient裡面:
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow); //手段3:處理所有TimeOutRequests
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
connectionStates.disconnected(nodeId, now);
for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
if (!metadataUpdater.maybeHandleDisconnection(request)) //把MetaDataRequest排除在外,其它所有請求,只要超時,就認為連線斷開
responses.add(new ClientResponse(request, now, true, null));
}
}
除了上述的2個地方,還要一個地方,就是初始化的時候
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
this.connectionStates.connecting(nodeConnectionId, now);
selector.connect(nodeConnectionId,
new InetSocketAddress(node.host(), node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) { //檢測到連線斷開
connectionStates.disconnected(nodeConnectionId, now);
metadataUpdater.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
檢測時機
從上面程式碼我們可以看出,連線的檢測時機,有2個:
一個是初始建立連線的時候,一個就是每次poll迴圈,每poll一次,就收集到一個斷開的連線集合。
下面分別是Selector和NetworkClient中,關於連線狀態的資料結構:
//Selector中的連線狀態
public class Selector implements Selectable {
private final List<String> disconnected;
private final List<String> connected;
..
}
//NetworkClient中的連線狀態維護
public class NetworkClient implements KafkaClient {
private final ClusterConnectionStates connectionStates;
...
}
final class ClusterConnectionStates {
private final long reconnectBackoffMs; //重連的時間間隔
private final Map<String, NodeConnectionState> nodeState;
}
private static class NodeConnectionState {
ConnectionState state;
long lastConnectAttemptMs; //上1次發起重連的時間
...
}
public enum ConnectionState {
DISCONNECTED, CONNECTING, CONNECTED
}
總結:
1. Selector中的連線狀態,在每次poll之前,會呼叫clear清空;在poll之後,收集。
2. Selector中的連線狀態,會傳給上層NetworkClient,用於它更新自己的連線狀態
3. 出了來自Selctor,NetworkClient自己內部的inflightRequests,也就是上面的手段3, 也用於檢測連線狀態。
通過上面的機制,就保證了NetworkClient可以實時準確維護所有connection的狀態。
自動重連 - ready函式
狀態知道了,那剩下的就是自動重連了。這個發生在更上層的Send的run函式裡面:
//Sender
public void run(long now) {
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) { //關鍵的ready函式
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
public boolean ready(Node node, long now) {
if (isReady(node, now))
return true;
if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now); //發起重連
return false;
}
public boolean canConnect(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return true;
else
return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
}
從上面函式可以看出,每次Send發資料之前,會先呼叫client.ready(node)判斷該node的連線是否可用。
在ready內部,如果連線不是connected狀態,會再判斷是否可以發起自動重連,檢測條件有2個:
條件1: 它不能是connecting狀態,必須是disconnected
條件2: 重連不能太頻繁。當前時間距離上1次重連時間,要有一定的間隔。如果broker掛了,你太頻繁的重連也不起作用。
這裡有個關鍵點:因為都是非阻塞呼叫,本次雖然檢測到連線斷了,但只是發起連線,不會等到連線建立好了,再執行下面的程式碼。
會在poll之後,判斷連線是否建立;在下1次或者下幾次poll之前,可能連線才會建立好,ready才會返回true.
相關推薦
Kafka原始碼深度解析-序列4 -Producer -network層核心原理
在上一篇我們分析了Java NIO的原理和使用方式,本篇將進一步分析Kafka client是如何基於NIO構建自己的network層。 network層的分層架構 下圖展示了從最上層的KafkaProducer到最底層的Java NIO的構建層次關係:
Kafka原始碼深度解析-序列3 -Producer -Java NIO
在上一篇我們分析了Metadata的更新機制,其中涉及到一個問題,就是Sender如何跟伺服器通訊,也就是網路層。同很多Java專案一樣,Kafka client的網路層也是用的Java NIO,然後在上面做了一層封裝。 下面首先看一下,在Sender和伺服器
Kafka原始碼深度解析-序列5 -Producer -RecordAccumulator佇列分析
在Kafka原始碼分析-序列2中,我們提到了整個Producer client的架構圖,如下所示: 其它幾個元件我們在前面都講過了,今天講述最後一個元件RecordAccumulator. Batch傳送 在以前的kafka client中,每條訊
Kafka原始碼深度解析-序列9 -Consumer -SubscriptionState內部結構分析
在前面講了,KafkaConsumer的一個重要部件就是SubscriptionState,這個部件維護了Consumer的消費狀態,本篇對其內部結構進行分析。 2種訂閱策略 在第1篇講過,consumer可以自己指定要消費哪個partition,而不是
Kafka原始碼深度解析-系列1 -訊息佇列的策略與語義
-Kafka關鍵概念介紹 -訊息佇列的各種策略與語義 作為一個訊息佇列,Kafka在業界已經相當有名。相對傳統的RabbitMq/ActiveMq,Kafka天生就是分散式的,支援資料的分片、複製以及叢集的方便擴充套件。 與此同時,Kafka是高可靠的、持
SnapHelper原始碼深度解析
目錄介紹 01.SnapHelper簡單介紹 1.1 SnapHelper作用 1.2 SnapHelper類分析 1.3 LinearSnapHelper類分析 1.4 PagerSnapHelper類分析 02.SnapHelper原始碼分析
FeignClient原始碼深度解析
微信公眾號:吉姆餐廳ak 學習更多原始碼知識,歡迎關注。 全文共16984字左右。 概述 springCloud feign主要對netflix feign進行了增強和包裝,本篇從原始碼角度帶你過一遍裝配流程,揭開feign底層的神祕面紗。 主要包括feign整合r
《Spring原始碼深度解析》讀後感
大概三週看完《Spring原始碼深度解析》寫下一篇讀後感玩 首先高度概括:內容過於豐富 重點不突出 本書共分8個模組 1、XML解析部分非常全面, 各種配置方法, 解析步驟都有介紹,這裡其實就是些巢狀的呼叫,Spring原始碼肯定比自己寫的優美。
原始碼系列Spring,Mybatis,Springboot,Netty原始碼深度解析-Spring的整體架構與容器的基本實現-mybatis原始碼深度解析與最佳實踐
6套原始碼系列Spring,Mybatis,Springboot,Netty原始碼深度解析視訊課程 6套原始碼套餐課程介紹: 1、6套精品是掌櫃最近整理出的最新課程,都是當下最火的技術,最火的課程,也是全網課程的精品; 2、6套資源包含:全套完整
《Spring原始碼深度解析》學習筆記
《Spring原始碼深度解析》學習筆記——Spring的整體架構與容器的基本實現 spring框架是一個分層架構,它包含一系列的功能要素,並被分為大約20個模組,如下圖所示 這些模組被總結為以下幾個部分: Core Container Core Container
Mybatis攔截器原始碼深度解析
目錄: 一. 建立攔截器鏈 1. 建立物件 2. 建立配置檔案 3. 載入攔截器鏈 二. 方法呼叫解析 1. 對請求物件進行攔截器包裝 2. 執行呼叫 三. 小結 Mybatis攔截器 可以幫助我們在執行sql語句過程中增加外掛以實現一些通用的邏輯,比
Spring原始碼深度解析-1、Spring核心類簡單介紹
在更新JAVA基礎原始碼學習的同時,也有必要把Spring抓一抓,以前對於spring的程度僅在於使用,以及一點IOC/AOP的概念,具體深層的瞭解不是很深入,每次看了一點原始碼就看不下去,然後一轉眼都忘記看了啥。 所以這次專門買了書,來細細品味下Spring。 希望能從這一波學習中加強自己
Mybatis原始碼深度解析
前言: mybatis是我們常用的一種操作資料庫的框架。 我們在使用的mybatis有多種方式:原生mybatis、與Spring結合使用的mybatis、與SprinBoot結合使用的mybatis。 使
Spring原始碼深度解析,事務案例講解高階
Spring的整體架構Spring框架是一個分層架構,它包含一系列的功能要素,並被分為大約20個模組,如下圖所示 這些模組被總結為以下幾個部分: Core Container Core Container(核心容器)包含有Core、Beans、Context和Expression Lan
Springboot原始碼深度解析,方法解析,類載入解析,容器建立
springboot的啟動都是從main方法開始的,如下:@SpringBootApplicationpublic class Application { public static void main(String[] args) { SpringApplication.run(Application.cl
spring原始碼深度解析筆記(三)
之前提到在xmlBeanFactory建構函式中呼叫了XmlBeanDefinitionReader型別的reader屬性提供的方法this.reader.loadBeanDefinitions(resource),這就是載入整個資源載入的切入點。 當進入XmlBeanDe
spring原始碼深度解析筆記(四)
DTD與XSD的區別 DTD(Document Type Definition)即文件型別定義,是一種XML約束模式語言,是XML檔案的驗證機制,是屬於XML檔案組成的一部分。DTD是一種保證XML文件格式正確的有效方法,可以通過比較XML文件和DTD檔案來看
python3網路爬蟲-破解天眼查+企業工商資料-分散式爬蟲系統-原始碼深度解析
Python爬蟲-2018年-我破解天眼查和啟信寶企業資料爬蟲--破解反爬技術那些事情 最近在自己用python3+mongdb寫了一套分散式多執行緒的天眼查爬蟲系統,實現了對天眼查整個網站的全部資料各種維度的採集和儲存,主要是為了深入學習爬蟲技術使用,並且根據天眼查網頁的
RecyclerView用法和原始碼深度解析
目錄介紹 1.RecycleView的結構 2.Adapter 2.1 RecyclerView.Adapter扮演的角色 2.2 重寫的方法 2.3 notifyDataSetChanged()重新整理資料 2.4 資料變更通知之觀察者模式 a.首先看.
《Spring原始碼深度解析》pdf附網盤下載連結送給還在迷茫的你
技術書閱讀方法論 一.速讀一遍(最好在1~2天內完成) 人的大腦記憶力有限,在一天內快速看完一本書會在大腦裡留下深刻印象,對於之後複習以及總結都會有特別好的作用。 對於每一章的知識,先閱讀標題,弄懂大概講的是什麼主題,再去快速看一遍,不懂也沒有關係,但是一定要在不懂的