Kafka系列之-Kafka Protocol例項分析
本文基於A Guide To The Kafka Protocol文件,以及Spark Streaming中實現的org.apache.spark.streaming.kafka.KafkaCluster
類。整理出Kafka中有關
- Metadata API
- Produce API
- Fetch API
- Offset API(Aka ListOffset)
- Offset Commit/Fetch API
- Group Membership API
- Administrative API
零、準備工作
需要執行以下部分的示例程式碼時,需要提前建好需要的topic
,寫入一些message
consumer
消費一下。
1、新建topic
[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --create --topic kafka_protocol_test --replication-factor 3 --partitions 4
Created topic "kafka_protocol_test".
[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --describe --topic kafka_protocol_test
Topic:kafka_protocol_test PartitionCount:4 ReplicationFactor:3 Configs:
Topic: kafka_protocol_test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: kafka_protocol_test Partition: 1 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4
Topic: kafka_protocol_test Partition: 2 Leader: 3 Replicas: 3,4,1 Isr: 3,4,1
Topic: kafka_protocol_test Partition: 3 Leader: 4 Replicas: 4,1,2 Isr: 4,1,2
2、produce message
使用Kafka系列之-自定義Producer中提到的KafkaProducerTool往指定kafka_protocol_test中傳送訊息,
public class ProducerTest2 {
public static void main(String[] args) throws InterruptedException {
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl("D:\\Files\\test\\kafkaconfig.properties");
int i = 1;
while(true) {
kafkaProducerTool.publishMessage("message" + (i++));
}
}
}
執行一段時間後停止寫入。執行一個console-consumer
從kafka_protocol_test
消費message
。
3、consume message
執行一個console-consumer
從kafka_protocol_test
消費。注意觀察該topic
每個partition
中的messages
數。
[[email protected] kafka]$ bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning
一、Metadata API
這個API是通過向Kafka叢集傳送一個TopicMetadaaRequest
請求,得到MetadataResponse
響應後從MetadataResponse
中解析出Metadata相關資訊。
TopicMetadataRequest
的結構和示例如下
case class TopicMetadataRequest(val versionId: Short,
val correlationId: Int,
val clientId: String,
val topics: Seq[String])
TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
得到的MetadataResponse
包含的資訊如下,可以從PartitionMetadata
中獲取到Partition
相關資訊,從TopicMetadata
中獲取到Topic
相關資訊,Broker
中記錄了Broker
的ip
和埠號等。
MetadataResponse => [Broker][TopicMetadata]
Broker => NodeId Host Port (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
Replicas => [int32]
Isr => [int32]
1、所包含的資訊
可以查詢指定Topic是否存在,
指定topic有多少個partition,
每個partition當前哪個broker處於leader狀態,
每個broker的host和port是什麼
如果設定了auto.create.topics.enable
引數,遇到不存在的topic
時,就會按預設replication
和partition
新建該不存在的topic
。
2、示例
生成一個TopicMetadataRequest
物件
// 封裝一個TopicMetadataRequest型別的請求物件
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
// 傳送該請求
val resp: TopicMetadataResponse = consumer.send(req)
// 其中consumer物件是SimpleConsumer型別的
new SimpleConsumer(host, port, config.socketTimeoutMs,
config.socketReceiveBufferBytes, config.clientId)
(1)查詢topic是否存在
由於在TopicMetadataRequest
中可以傳送一組Seq[String]
型別的topics
,所以獲取到的TopicMetadataResponse.topicsMetadata
是Set[TopicMetadata]
型別的。
對每個TopicMetadata
物件,如果其errorCode
不為ErrorMapping.NoError
即表示該Topic
不正常。
topicMetadatas.foreach { topic =>
if (topic.errorCode == ErrorMapping.NoError)
println(s"topic: ${topic.topic}存在")
else
println(s"topic: ${topic.topic}不存在")
}
(2)獲取Topic的Partition個數
首先將所有TopicMetadata
中正常的Topic
過濾出來,然後遍歷每一個TopicMetadata
物件,獲取其partitionsMetadata
資訊,其長度即Partition
的個數
val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
existsTopicMetadatas.foreach { topic =>
val numPartitions = topic.partitionsMetadata.length
println(s"topic: ${topic.topic} 有${numPartitions}個partition")
}
(3)獲取Partition具體情況
以下程式碼可以獲取到Topic
的每個Partition
中的Leader Partition
以及replication
節點的資訊。
existsTopicMetadatas.foreach { topic =>
println(s"topic:${topic.topic}的Partition資訊:")
topic.partitionsMetadata.foreach { pm =>
val leaderPartition = pm.leader
println(s"\tpartition: ${pm.partitionId}")
println(s"\tleader節點:$leaderPartition")
val replicas = pm.replicas
println(s"\treplicas節點:$replicas")
}
}
3、執行結果
傳入上面新建的kafka_protocol_test
以及一個不存在的topic kafka_protocol_test1
,以上程式碼的執行結果如下:
=============Topic相關資訊===========
topic: kafka_protocol_test存在
topic: kafka_protocol_test1不存在
topic: kafka_protocol_test 有4個partition
=============Partition相關資訊===========
topic:kafka_protocol_test的Partition資訊:
partition: 0
leader節點:Some(id:1,host:kafka001,port:9092)
replicas節點:Vector(id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092)
partition: 1
leader節點:Some(id:2,host:kafka002,port:9092)
replicas節點:Vector(id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092)
partition: 2
leader節點:Some(id:3,host:kafka003,port:9092)
replicas節點:Vector(id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092)
partition: 3
leader節點:Some(id:4,host:kafka004,port:9092)
replicas節點:Vector(id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092)
二、Produce API
三、Fetch API
四、Offset API(Aka ListOffset)
這個API通過向Kafka叢集傳送一個OffsetRequest
物件,從返回的OffsetResponse
物件中獲取Offset
相關資訊。
OffsetRequest
物件描述如下
OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64
MaxNumberOfOffsets => int32
上面Time
的作用是,獲取特定時間(單位為ms)之前的所有messages
。如果設定為-1
則獲取最新的offset
,即下一條messages
的offset
位置;如果設定為-2
則獲取第一條message
的offset
位置,即當前partition
中的offset
起始位置。
OffsetResponse
物件描述如下
OffsetResponse => [TopicName [PartitionOffsets]]
PartitionOffsets => Partition ErrorCode [Offset]
Partition => int32
ErrorCode => int16
Offset => int64
1、所包含的資訊
通過該API可以獲取指定topic-partition
集合的合法offset
的範圍,需要直接連線到Partition
的Leader
節點。
2、示例
獲取指定topic
下所有partition
的offset
範圍
封裝一個getLeaderOffsets
方法,在此方法的基礎上分別封裝一個getEarliestLeaderOffsets
方法用於獲取最小offset
,getLatestLeaderOffsets
用於獲取最大offset
。
分別傳入的關鍵引數是前面提到的Time
,
def getLatestLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // -1L
def getEarliestLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // -2L
在getLeaderOffsets中,查詢到當前partition的leader節點,
def findLeaders(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
// 獲取當前topicAndPartitions中的所有topic
val topics = topicAndPartitions.map(_.topic)
// 獲取topic對應的MetadataResp物件,之前已過濾不存在的topic,所以這裡無需進一步過濾
val topicMetadatas = getMetadataResp(topics.toSeq).left.get
val leaderMap = topicMetadatas.flatMap { topic =>
topic.partitionsMetadata.flatMap { pm =>
val tp = TopicAndPartition(topic.topic, pm.partitionId)
// 獲取對應PartitionMedatada的leader節點資訊
pm.leader.map { l =>
tp -> (l.host -> l.port)
}
}
}.toMap
Right(leaderMap)
}
然後在這些節點中,封裝一個OffsetRequest
物件,向Kafka叢集獲得OffsetResponse
物件。
val resp = consumer.getOffsetsBefore(req)
val respMap = resp.partitionErrorAndOffsets
最後從OffsetResponse
物件中獲取offset
範圍,
val resp = getMetadataResp(topics.toSeq)
// 如果獲取的resp是left,則處理返回的Set[TopicMetadata]
val topicAndPartitions = processRespInfo(resp) { resp =>
val topicMetadatas = resp.left.get.asInstanceOf[Set[TopicMetadata]]
val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
getPartitions(existsTopicMetadatas)
}.asInstanceOf[Set[TopicAndPartition]]
// 獲取指定topic-partition最早的offset
val offsetBegin = getEarliestLeaderOffsets(topicAndPartitions).right.get
// 獲取指定topic-partition最晚的offset
val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
print("=============Offset範圍資訊===========")
topicAndPartitions.foreach { tp =>
println(s"topic: ${tp.topic}, Partition: ${tp.partition} 的Offset範圍:")
println(s"\t${offsetBegin(tp).offset} ~ ${offsetEnd(tp).offset}")
}
3、執行結果
連線到kafka_protocol_test
,執行結果如下
topic: kafka_protocol_test, Partition: 0 的Offset範圍:
0 ~ 9000
topic: kafka_protocol_test, Partition: 1 的Offset範圍:
0 ~ 598134
topic: kafka_protocol_test, Partition: 2 的Offset範圍:
0 ~ 0
topic: kafka_protocol_test, Partition: 3 的Offset範圍:
0 ~ 91000
和第零節中圖片顯示結果一致。
五、Offset Commit/Fetch API
首先參考Offset Management文件中的描述,分析一下Kafka中有關Offset管理的文件。
在這篇文件中主要提供了OffsetFetch
和OffsetCommit
兩個API,其中
1、OffsetFetch API
這個API可以獲取一個Consumer
讀取message
的offset
資訊。傳送的請求是OffsetFetchRequest
型別的物件,接收到的是OffsetFetchResponse
型別的響應。具體offset
資訊可以從OffsetFetchResponse
物件中解析。
傳送的Request
請求為,需要指定consumer
所屬的group
,以及需要獲取offset
的所有TopicAndPartitions
。
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 0)
或得到的響應為OffsetFetchResponse型別的物件。
val resp = consumer.fetchOffsets(req)
其中consumer物件是SimpleConsumer型別的
new SimpleConsumer(host, port, config.socketTimeoutMs,
config.socketReceiveBufferBytes, config.clientId)
具體獲取offset的邏輯如下,
withBrokers(Random.shuffle(config.seedBrokers)) { consumer =>
// 連線consumer,傳送該OffsetFetchRequest請求
val resp = consumer.fetchOffsets(req)
val respMap = resp.requestInfo
// 從傳入的topicAndPartitions中取出不包含在result中的topicAndPartition
val needed = topicAndPartitions.diff(result.keySet)
// 遍歷每一個需要獲取offset的topic-partition
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
// 如果沒有錯誤
if (ome.error == ErrorMapping.NoError) {
result += tp -> ome
} else {
errs.append(ErrorMapping.exceptionFor(ome.error))
}
}
}
if (result.keys.size == topicAndPartitions.size) {
return Right(result)
}
}
2、OffsetCommit API
當最終呼叫commit()
方法,或者如果啟用了autocommit
引數時,這個API可以使consumer
儲存其消費的offset
資訊。
傳送的Request
請求為OffsetCommitRequest
型別。
OffsetCommitRequest
需要傳入的引數如下,
val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
val resetOffsets = offsetsFetch.right.get.map { offsetInfo =>
val plus10Offset = offsetInfo._2.offset + 10
offsetInfo._1 -> OffsetAndMetadata(if (offsetEnd(offsetInfo._1).offset >= plus10Offset) plus10Offset else offsetEnd(offsetInfo._1).offset)
}
// resetOffsets型別為Map[TopicAndPartition, OffsetAndMetadata]
val req = OffsetCommitRequest(groupId, resetOffsets, 0)
// 傳送該請求的方式如下
val resp = consumer.commitOffsets(req)
3、GroupCoordinator API
需要注意的是這個API在Kafka-0.9以後的版本中才提供。指定Consumer Group
的offsets
資料儲存在某個特定的Broker
中。
向Kafka
叢集傳送一個GroupCoordinatorRequest
型別的請求引數,該request
物件中只需要指定一個groupId
即可。如下所示,
val req = new GroupCoordinatorRequest(groupId)
val resp = consumer.send(req)
獲取到的Response
物件是GroupCoordinatorResponse
型別的,在resp.coordinatorOpt
中返回一個BrokerEndpoint
物件,可以獲取該Broker
對應的Id, Ip, Port
等資訊。
4、示例
(1) 執行OffsetFetch API
(a) 獲取kafka_protocol_test的consumer group消費狀態
啟動一個console-consumer
從kafka_protocol_test topic
消費messages
。需要指定一個特定的group.id
引數,如下所示,使用預設的consumer.properties
配置檔案即可。
bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning --consumer.config ./config/consumer.properties
執行後,將其停止,檢視當前console-consumer
的消費狀態
[[email protected] kafka]$ bin/kafka-consumer-offset-checker.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --group test-consumer-group
Group Topic Pid Offset logSize Lag Owner
test-consumer-group kafka_protocol_test 0 9000 9000 0 none
test-consumer-group kafka_protocol_test 1 26886 598134 571248 none
test-consumer-group kafka_protocol_test 2 0 0 0 none
test-consumer-group kafka_protocol_test 3 18296 91000 72704 none
(b) 執行OffsetFetch程式碼,檢視執行結果
執行時仍然傳入test-consumer-group
,執行結果如下
Topic: kafka_protocol_test, Partition: 0
Offset: 9000
Topic: kafka_protocol_test, Partition: 1
Offset: 26886
Topic: kafka_protocol_test, Partition: 2
Offset: 0
Topic: kafka_protocol_test, Partition: 3
Offset: 18296
對比後發現,兩個offset
資訊保持一致。
(2)執行OffsetCommit API
在這裡,將OffsetFetch
獲取到的每個TopicAndPartition
對應的Offset
加10
,如果加10
後超過其最大Offset
,則取最大Offset
。
在Commit
前後,兩次呼叫OffsetFetch API的程式碼,前後執行結果如下,
更新前的offset
:
Topic: kafka_protocol_test, Partition: 0
Offset: 9000
Topic: kafka_protocol_test, Partition: 1
Offset: 26886
Topic: kafka_protocol_test, Partition: 2
Offset: 0
Topic: kafka_protocol_test, Partition: 3
Offset: 18296
更新後的offset:(partition 0和partition 2沒有變化是由於加10後超過了該partition的offset範圍最大值)
Topic: kafka_protocol_test, Partition: 0
Offset: 9000
Topic: kafka_protocol_test, Partition: 1
Offset: 26896
Topic: kafka_protocol_test, Partition: 2
Offset: 0
Topic: kafka_protocol_test, Partition: 3
Offset: 18306
(3)執行Group Coordinator API
傳入一個consumer group
後,檢視其執行結果
Comsuner Group : test-consumer-group, coordinator broker is:
id: 1, host: kafka001, port: 9092
六、Group Membership API
這個API從Kafka-0.9.0.0版本開始出現。
在0.9以前的client api中,consumer是要依賴Zookeeper的。因為同一個consumer group中的所有consumer需要進行協同,進行下面所講的rebalance。但是因為zookeeper的“herd”與“split brain”,導致一個group裡面,不同的consumer擁有了同一個partition,進而會引起訊息的消費錯亂。為此,在0.9中,不再用zookeeper,而是Kafka叢集本身來進行consumer之間的同步。下面引自kafka設計的原文:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Failuredetectionprotocol
七、Administrative API
注意,這個API也是從Kafka-0.9之後的client版本中才提供。通過這個API可以對Kafka叢集進行一些管理方面的操作,比如獲取所有的Consumer Groups
資訊。想要獲取叢集中所有Consumer Groups
資訊,需要傳送一個ListGroupRequest
請求到所有的Brokers
節點。
還可以通過傳送一個DescribeGroupsRequest
型別的請求物件,獲取對特定Consumer Group
的描述。
在Kafka-0.9之後的client
中,提供了一個kafka.admin.AdminClient
類,呼叫createSimplePlaintext
方法,傳入一個broker list字val client = AdminClient.createSimplePlaintext(“kafka001:9092,kafka002:9092,kafka003:9092,kafka004:9092”)AdminClient`提供了很多方法,比如
def findCoordinator(groupId: String): Node
def findAllBrokers(): List[Node]
def listAllGroups(): Map[Node, List[GroupOverview]]
def listAllConsumerGroups(): Map[Node, List[GroupOverview]]
等等。
相關推薦
Kafka系列之-Kafka Protocol例項分析
本文基於A Guide To The Kafka Protocol文件,以及Spark Streaming中實現的org.apache.spark.streaming.kafka.KafkaCluster類。整理出Kafka中有關 Metadata AP
Kafka系列之-Kafka監控工具KafkaOffsetMonitor配置及使用
KafkaOffsetMonitor是一個可以用於監控Kafka的Topic及Consumer消費狀況的工具,其配置和使用特別的方便。源專案Github地址為:https://github.com/quantifind/KafkaOffsetMonitor。
apache kafka系列之kafka.common.ConsumerRebalanceFailedException異常解決辦法
kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf
apache kafka系列之原始碼分析走讀-kafka內部模組分析
apache kafka中國社群QQ群:162272557 kafka整體結構分析: kafka原始碼工程目錄結構如下圖: 下面只對core目錄結構作說明,其他都是測試類或java客戶端程式碼 admin --管理員模組,操作和管理topic,parit
apache kafka系列之效能測試報告(虛擬機器版)
測試方法 在其他虛擬機器上使用 Kafka 自帶 kafka-producer-perf-test.sh 指令碼進行測試 Kafka 寫入效能 嘗試使用 kafka-simple-consumer-p
kafka系列之broker重點配置解析(三)
kafka broker 配置 對應kafka安裝目錄config/server.properties檔案的配置 broker.id 每一個Kafka的broker都有一個整數的標識。我們設定broker.id來標識它。預設這個整數是0。這
kafka系列之初步認識(零)
Kafka是什麼 釋出/訂閱訊息中介軟體 也被稱為分散式流平臺 Kafka的誕生 最初是為了解決LinkedIn資料通道問題,最後捐獻給了Apache,是Apache的頂級專案。 Kafka適合的場景 使用者行為跟蹤,可
Kafka系列之-自定義Producer
前面已經講到了,在Kafka中,Message是由Producer產生的,Producer產生的Message會發送到Topic的指定Partition中。Producer可以有多種形式,也可以由使用者通過Java,C以及Python語言來自定義。 K
移動APP持續交付系列之雲構建價值分析
現狀企業A:小APP,Android、iOS研發各三人;大APP,Android、iOS研發各10人。CICD通過部署一臺jenkins伺服器 + 一臺Android 構建機器來完成Android的APP構建,iOS構建完全依賴開發本地環境。企業B:3個APP5個研發同學,5臺伺服器年成本一萬左右 ,程式碼規
給老闆減刑系列之hadoop 安全缺陷分析之一:kerberos 的缺陷
近一年來從事金融資料安全架構方面工作,對大資料平臺安全重要性有了一些新的思考。最近看了Steve Loughran先生寫的本書《Hadoop and Kerberos: The Madness Beyond the Gate》,寫作風格幽默風趣,但是國內對大資料平臺的安全考慮的文
Spring Security系列之核心過濾器原始碼分析(四)
文章來源 前面的部分,我們關注了Spring Security是如何完成認證工作的,但是另外一部分核心的內容:過濾器,一直沒有提到,我們已經知道Spring Security使用了springSecurityFillterChian作為了安全過濾的入口,這一節主要分析一下這個過濾器鏈都包含了哪
玩轉大資料系列之二:資料分析與處理
經過了資料採集和同步之後,就可以在阿里雲上進行資料分析和處理,來玩轉您的資料了。本文向您介紹在阿里雲大資料各產品中,以及各產品之間怎樣來完成您的資料處理和資料分析。 MaxCompute 基於MaxCompute的大資料計算(MaxCompute + RDS) 使用MaxCompute分析IP
Apache Kafka系列(五) Kafka Connect及FileConnector示例
一. Kafka Connect簡介 Kafka是一個使用越來越廣的訊息系統,尤其是在大資料開發中(實時資料處理和分析)。為何整合其他系統和解耦應用,經常使用Producer來發送訊息到Broker,並使用Consumer來消費Broker中的訊息。Kafka Connect是到0.9版本才提供的並極大
一看就懂系列之 Mysql主從延遲分析
前言 在進入正題之前,需要先明白一個東西,DML和DDL是什麼東西? DDL 資料定義語言,create 、alter 、drop、truncate、create、drop 當執行DDL語句時,在每一條語句前後,都將提交當前的事務。如果使用者使用
iOS開發系列之iOS SDK例項教程
(原文地址) iOS SDK是開發iPhone和iPad 應用程式過程中必不可少的軟體開發包,提供了從建立程式,到編譯,除錯,執行,測試等一些列開發過程中所需要的工具,也提供了許多豐富的框架和相關API,供開發者在寫程式的時候使用。 iOS SDK內容眾多,一篇
java基礎系列之ConcurrentHashMap源碼分析(基於jdk1.8)
threshold 主存 類比 tile num method 過程 參數 nsf 1、前提 在閱讀這篇博客之前,希望你對HashMap已經是有所理解的;另外你對java的cas操作也是有一定了解的,因為在這個類中大量使用到了cas相關的操作來保證線程安全的。
Apache Kafka學習之Kafka基本原理
Kafka是一個使用Scala編寫的訊息系統,原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營資料處理管道(Pipeline)的基礎。現在它已被多家不同型別的公司作為多種型別的資料管道和訊息系統使用。
asyncio系列之抽絲剝繭分析事件排程的核心原理
先來看一下一個簡單的例子 例1: async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): print('ent
Java入門系列之集合HashMap原始碼分析(十四)
前言 我們知道在Java 8中對於HashMap引入了紅黑樹從而提高操作效能,由於在上一節我們已經通過圖解方式分析了紅黑樹原理,所以在接下來我們將更多精力投入到解析原理而不是演算法本身,HashMap在Java中是使用比較頻繁的鍵值對資料型別,所以我們非常有必要詳細去分析背後的具體實現原理,無論是C#還是J
JVM系列之:從彙編角度分析Volatile
[toc] # 簡介 Volatile關鍵字對熟悉java多執行緒的朋友來說,應該很熟悉了。Volatile是JMM(Java Memory Model)的一個非常重要的關鍵詞。通過是用Volatile可以實現禁止重排序和變數值執行緒之間可見兩個主要特性。 今天我們從彙編的角度來分析一下Volatile