1. 程式人生 > >Kafka系列之-Kafka Protocol例項分析

Kafka系列之-Kafka Protocol例項分析

  本文基於A Guide To The Kafka Protocol文件,以及Spark Streaming中實現的org.apache.spark.streaming.kafka.KafkaCluster類。整理出Kafka中有關

  • Metadata API
  • Produce API
  • Fetch API
  • Offset API(Aka ListOffset)
  • Offset Commit/Fetch API
  • Group Membership API
  • Administrative API
      

零、準備工作

  需要執行以下部分的示例程式碼時,需要提前建好需要的topic,寫入一些message

,再用consumer消費一下。

1、新建topic

[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --create --topic kafka_protocol_test --replication-factor 3 --partitions 4
Created topic "kafka_protocol_test".
[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --describe
--topic kafka_protocol_test Topic:kafka_protocol_test PartitionCount:4 ReplicationFactor:3 Configs: Topic: kafka_protocol_test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: kafka_protocol_test Partition: 1 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4 Topic: kafka_protocol_test
Partition: 2 Leader: 3 Replicas: 3,4,1 Isr: 3,4,1 Topic: kafka_protocol_test Partition: 3 Leader: 4 Replicas: 4,1,2 Isr: 4,1,2

2、produce message

  使用Kafka系列之-自定義Producer中提到的KafkaProducerTool往指定kafka_protocol_test中傳送訊息,

public class ProducerTest2 {
    public static void main(String[] args) throws InterruptedException {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl("D:\\Files\\test\\kafkaconfig.properties");
        int i = 1;
        while(true) {
            kafkaProducerTool.publishMessage("message" + (i++));
        }
    }
}

  執行一段時間後停止寫入。執行一個console-consumerkafka_protocol_test消費message

3、consume message

  執行一個console-consumerkafka_protocol_test消費。注意觀察該topic每個partition中的messages數。

[[email protected] kafka]$ bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning

  這裡寫圖片描述

一、Metadata API

  這個API是通過向Kafka叢集傳送一個TopicMetadaaRequest請求,得到MetadataResponse響應後從MetadataResponse中解析出Metadata相關資訊。
  TopicMetadataRequest的結構和示例如下
  

case class TopicMetadataRequest(val versionId: Short,
                                val correlationId: Int,
                                val clientId: String,
                                val topics: Seq[String])

TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)

  得到的MetadataResponse包含的資訊如下,可以從PartitionMetadata中獲取到Partition相關資訊,從TopicMetadata中獲取到Topic相關資訊,Broker中記錄了Brokerip和埠號等。

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]

1、所包含的資訊

可以查詢指定Topic是否存在,
指定topic有多少個partition,
每個partition當前哪個broker處於leader狀態,
每個broker的host和port是什麼

  如果設定了auto.create.topics.enable引數,遇到不存在的topic時,就會按預設replicationpartition新建該不存在的topic
  

2、示例

  生成一個TopicMetadataRequest物件

// 封裝一個TopicMetadataRequest型別的請求物件
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
// 傳送該請求
val resp: TopicMetadataResponse = consumer.send(req)
// 其中consumer物件是SimpleConsumer型別的
new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

(1)查詢topic是否存在
  由於在TopicMetadataRequest中可以傳送一組Seq[String]型別的topics,所以獲取到的TopicMetadataResponse.topicsMetadataSet[TopicMetadata]型別的。
  對每個TopicMetadata物件,如果其errorCode不為ErrorMapping.NoError即表示該Topic不正常。
  

topicMetadatas.foreach { topic =>
  if (topic.errorCode == ErrorMapping.NoError)
    println(s"topic: ${topic.topic}存在")
  else
    println(s"topic: ${topic.topic}不存在")
}

(2)獲取Topic的Partition個數
  首先將所有TopicMetadata中正常的Topic過濾出來,然後遍歷每一個TopicMetadata物件,獲取其partitionsMetadata資訊,其長度即Partition的個數

val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
existsTopicMetadatas.foreach { topic =>
   val numPartitions = topic.partitionsMetadata.length
   println(s"topic: ${topic.topic} 有${numPartitions}個partition")
}

(3)獲取Partition具體情況
  以下程式碼可以獲取到Topic的每個Partition中的Leader Partition以及replication節點的資訊。

existsTopicMetadatas.foreach { topic =>
    println(s"topic:${topic.topic}的Partition資訊:")
    topic.partitionsMetadata.foreach { pm =>
    val leaderPartition = pm.leader
    println(s"\tpartition: ${pm.partitionId}")
    println(s"\tleader節點:$leaderPartition")
    val replicas = pm.replicas
    println(s"\treplicas節點:$replicas")
  }
}

3、執行結果

  傳入上面新建的kafka_protocol_test以及一個不存在的topic kafka_protocol_test1,以上程式碼的執行結果如下:

=============Topic相關資訊===========
topic: kafka_protocol_test存在
topic: kafka_protocol_test1不存在
topic: kafka_protocol_test 有4個partition
=============Partition相關資訊===========
topic:kafka_protocol_test的Partition資訊:
    partition: 0
    leader節點:Some(id:1,host:kafka001,port:9092)
    replicas節點:Vector(id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092)
    partition: 1
    leader節點:Some(id:2,host:kafka002,port:9092)
    replicas節點:Vector(id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092)
    partition: 2
    leader節點:Some(id:3,host:kafka003,port:9092)
    replicas節點:Vector(id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092)
    partition: 3
    leader節點:Some(id:4,host:kafka004,port:9092)
    replicas節點:Vector(id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092)

二、Produce API

三、Fetch API

四、Offset API(Aka ListOffset)

  這個API通過向Kafka叢集傳送一個OffsetRequest物件,從返回的OffsetResponse物件中獲取Offset相關資訊。
  OffsetRequest物件描述如下

OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32

  上面Time的作用是,獲取特定時間(單位為ms)之前的所有messages。如果設定為-1則獲取最新的offset,即下一條messagesoffset位置;如果設定為-2則獲取第一條messageoffset位置,即當前partition中的offset起始位置。

  OffsetResponse物件描述如下

OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64

1、所包含的資訊

  通過該API可以獲取指定topic-partition集合的合法offset的範圍,需要直接連線到PartitionLeader節點。

2、示例

  獲取指定topic下所有partitionoffset範圍
  封裝一個getLeaderOffsets方法,在此方法的基礎上分別封裝一個getEarliestLeaderOffsets方法用於獲取最小offsetgetLatestLeaderOffsets用於獲取最大offset
  分別傳入的關鍵引數是前面提到的Time

def getLatestLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition]
       ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // -1L
def getEarliestLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition]
       ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // -2L

  在getLeaderOffsets中,查詢到當前partition的leader節點,
  def findLeaders(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
    // 獲取當前topicAndPartitions中的所有topic
    val topics = topicAndPartitions.map(_.topic)
    // 獲取topic對應的MetadataResp物件,之前已過濾不存在的topic,所以這裡無需進一步過濾
    val topicMetadatas = getMetadataResp(topics.toSeq).left.get

    val leaderMap = topicMetadatas.flatMap { topic =>
      topic.partitionsMetadata.flatMap { pm =>
        val tp = TopicAndPartition(topic.topic, pm.partitionId)
        // 獲取對應PartitionMedatada的leader節點資訊
        pm.leader.map { l =>
          tp -> (l.host -> l.port)
        }
      }
    }.toMap
    Right(leaderMap)
  }

  然後在這些節點中,封裝一個OffsetRequest物件,向Kafka叢集獲得OffsetResponse物件。

val resp = consumer.getOffsetsBefore(req)
val respMap = resp.partitionErrorAndOffsets

  最後從OffsetResponse物件中獲取offset範圍,

val resp = getMetadataResp(topics.toSeq)
    // 如果獲取的resp是left,則處理返回的Set[TopicMetadata]
val topicAndPartitions = processRespInfo(resp) { resp =>
val topicMetadatas = resp.left.get.asInstanceOf[Set[TopicMetadata]]
val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
   getPartitions(existsTopicMetadatas)
}.asInstanceOf[Set[TopicAndPartition]]

// 獲取指定topic-partition最早的offset
val offsetBegin = getEarliestLeaderOffsets(topicAndPartitions).right.get
// 獲取指定topic-partition最晚的offset
val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get

print("=============Offset範圍資訊===========")
topicAndPartitions.foreach { tp =>
   println(s"topic: ${tp.topic}, Partition: ${tp.partition} 的Offset範圍:")
   println(s"\t${offsetBegin(tp).offset} ~ ${offsetEnd(tp).offset}")
}

3、執行結果

  連線到kafka_protocol_test,執行結果如下

topic: kafka_protocol_test, Partition: 0Offset範圍:
    0 ~ 9000
topic: kafka_protocol_test, Partition: 1Offset範圍:
    0 ~ 598134
topic: kafka_protocol_test, Partition: 2Offset範圍:
    0 ~ 0
topic: kafka_protocol_test, Partition: 3Offset範圍:
    0 ~ 91000

  和第零節中圖片顯示結果一致。

五、Offset Commit/Fetch API

  首先參考Offset Management文件中的描述,分析一下Kafka中有關Offset管理的文件。
  在這篇文件中主要提供了OffsetFetchOffsetCommit兩個API,其中
  

1、OffsetFetch API

  這個API可以獲取一個Consumer讀取messageoffset資訊。傳送的請求是OffsetFetchRequest型別的物件,接收到的是OffsetFetchResponse型別的響應。具體offset資訊可以從OffsetFetchResponse物件中解析。
  傳送的Request請求為,需要指定consumer所屬的group,以及需要獲取offset的所有TopicAndPartitions

val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 0)

  或得到的響應為OffsetFetchResponse型別的物件。
val resp = consumer.fetchOffsets(req)
  其中consumer物件是SimpleConsumer型別的
new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

  具體獲取offset的邏輯如下,
withBrokers(Random.shuffle(config.seedBrokers)) { consumer =>
  // 連線consumer,傳送該OffsetFetchRequest請求
  val resp = consumer.fetchOffsets(req)
  val respMap = resp.requestInfo
  // 從傳入的topicAndPartitions中取出不包含在result中的topicAndPartition
  val needed = topicAndPartitions.diff(result.keySet)
  // 遍歷每一個需要獲取offset的topic-partition
  needed.foreach { tp: TopicAndPartition =>
    respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
      // 如果沒有錯誤
      if (ome.error == ErrorMapping.NoError) {
        result += tp -> ome
      } else {
        errs.append(ErrorMapping.exceptionFor(ome.error))
      }
    }
  }
  if (result.keys.size == topicAndPartitions.size) {
    return Right(result)
  }
}

2、OffsetCommit API

  當最終呼叫commit()方法,或者如果啟用了autocommit引數時,這個API可以使consumer儲存其消費的offset資訊。
  傳送的Request請求為OffsetCommitRequest型別。

  OffsetCommitRequest需要傳入的引數如下,

val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
val resetOffsets = offsetsFetch.right.get.map { offsetInfo =>
val plus10Offset = offsetInfo._2.offset + 10
   offsetInfo._1 -> OffsetAndMetadata(if (offsetEnd(offsetInfo._1).offset >= plus10Offset) plus10Offset else offsetEnd(offsetInfo._1).offset)
    }
// resetOffsets型別為Map[TopicAndPartition, OffsetAndMetadata]
val req = OffsetCommitRequest(groupId, resetOffsets, 0)
// 傳送該請求的方式如下
val resp = consumer.commitOffsets(req)

3、GroupCoordinator API

  需要注意的是這個API在Kafka-0.9以後的版本中才提供。指定Consumer Groupoffsets資料儲存在某個特定的Broker中。
  向Kafka叢集傳送一個GroupCoordinatorRequest型別的請求引數,該request物件中只需要指定一個groupId即可。如下所示,

val req = new GroupCoordinatorRequest(groupId)
val resp = consumer.send(req)

  獲取到的Response物件是GroupCoordinatorResponse型別的,在resp.coordinatorOpt中返回一個BrokerEndpoint物件,可以獲取該Broker對應的Id, Ip, Port等資訊。

4、示例

(1) 執行OffsetFetch API
(a) 獲取kafka_protocol_test的consumer group消費狀態
  啟動一個console-consumerkafka_protocol_test topic消費messages。需要指定一個特定的group.id引數,如下所示,使用預設的consumer.properties配置檔案即可。

bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning --consumer.config ./config/consumer.properties

  執行後,將其停止,檢視當前console-consumer的消費狀態

[[email protected] kafka]$  bin/kafka-consumer-offset-checker.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --group test-consumer-group
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group kafka_protocol_test            0   9000            9000            0               none
test-consumer-group kafka_protocol_test            1   26886           598134          571248          none
test-consumer-group kafka_protocol_test            2   0               0               0               none
test-consumer-group kafka_protocol_test            3   18296           91000           72704           none

(b) 執行OffsetFetch程式碼,檢視執行結果

  執行時仍然傳入test-consumer-group,執行結果如下

Topic: kafka_protocol_test, Partition: 0
    Offset: 9000
Topic: kafka_protocol_test, Partition: 1
    Offset: 26886
Topic: kafka_protocol_test, Partition: 2
    Offset: 0
Topic: kafka_protocol_test, Partition: 3
    Offset: 18296

  對比後發現,兩個offset資訊保持一致。

(2)執行OffsetCommit API
  在這裡,將OffsetFetch獲取到的每個TopicAndPartition對應的Offset10,如果加10後超過其最大Offset,則取最大Offset
  在Commit前後,兩次呼叫OffsetFetch API的程式碼,前後執行結果如下,
更新前的offset

Topic: kafka_protocol_test, Partition: 0
    Offset: 9000
Topic: kafka_protocol_test, Partition: 1
    Offset: 26886
Topic: kafka_protocol_test, Partition: 2
    Offset: 0
Topic: kafka_protocol_test, Partition: 3
    Offset: 18296
更新後的offset:(partition 0和partition 2沒有變化是由於加10後超過了該partition的offset範圍最大值)
Topic: kafka_protocol_test, Partition: 0
    Offset: 9000
Topic: kafka_protocol_test, Partition: 1
    Offset: 26896
Topic: kafka_protocol_test, Partition: 2
    Offset: 0
Topic: kafka_protocol_test, Partition: 3
    Offset: 18306

(3)執行Group Coordinator API
  傳入一個consumer group後,檢視其執行結果

Comsuner Group : test-consumer-group, coordinator broker is:
    id: 1, host: kafka001, port: 9092

六、Group Membership API

  這個API從Kafka-0.9.0.0版本開始出現。
  在0.9以前的client api中,consumer是要依賴Zookeeper的。因為同一個consumer group中的所有consumer需要進行協同,進行下面所講的rebalance。但是因為zookeeper的“herd”與“split brain”,導致一個group裡面,不同的consumer擁有了同一個partition,進而會引起訊息的消費錯亂。為此,在0.9中,不再用zookeeper,而是Kafka叢集本身來進行consumer之間的同步。下面引自kafka設計的原文:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Failuredetectionprotocol

七、Administrative API

  注意,這個API也是從Kafka-0.9之後的client版本中才提供。通過這個API可以對Kafka叢集進行一些管理方面的操作,比如獲取所有的Consumer Groups資訊。想要獲取叢集中所有Consumer Groups資訊,需要傳送一個ListGroupRequest請求到所有的Brokers節點。
  還可以通過傳送一個DescribeGroupsRequest型別的請求物件,獲取對特定Consumer Group的描述。

  在Kafka-0.9之後的client中,提供了一個kafka.admin.AdminClient類,呼叫createSimplePlaintext方法,傳入一個broker list字val client = AdminClient.createSimplePlaintext(“kafka001:9092,kafka002:9092,kafka003:9092,kafka004:9092”)AdminClient`提供了很多方法,比如

def findCoordinator(groupId: String): Node
def findAllBrokers(): List[Node]
def listAllGroups(): Map[Node, List[GroupOverview]]
def listAllConsumerGroups(): Map[Node, List[GroupOverview]]

  等等。

相關推薦

Kafka系列-Kafka Protocol例項分析

  本文基於A Guide To The Kafka Protocol文件,以及Spark Streaming中實現的org.apache.spark.streaming.kafka.KafkaCluster類。整理出Kafka中有關 Metadata AP

Kafka系列-Kafka監控工具KafkaOffsetMonitor配置及使用

  KafkaOffsetMonitor是一個可以用於監控Kafka的Topic及Consumer消費狀況的工具,其配置和使用特別的方便。源專案Github地址為:https://github.com/quantifind/KafkaOffsetMonitor。

apache kafka系列kafka.common.ConsumerRebalanceFailedException異常解決辦法

kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf

apache kafka系列原始碼分析走讀-kafka內部模組分析

apache kafka中國社群QQ群:162272557 kafka整體結構分析: kafka原始碼工程目錄結構如下圖: 下面只對core目錄結構作說明,其他都是測試類或java客戶端程式碼 admin   --管理員模組,操作和管理topic,parit

apache kafka系列效能測試報告(虛擬機器版)

測試方法 在其他虛擬機器上使用 Kafka 自帶 kafka-producer-perf-test.sh 指令碼進行測試 Kafka 寫入效能 嘗試使用 kafka-simple-consumer-p

kafka系列broker重點配置解析(三)

kafka broker 配置 對應kafka安裝目錄config/server.properties檔案的配置 broker.id 每一個Kafka的broker都有一個整數的標識。我們設定broker.id來標識它。預設這個整數是0。這

kafka系列初步認識(零)

Kafka是什麼 釋出/訂閱訊息中介軟體 也被稱為分散式流平臺 Kafka的誕生 最初是為了解決LinkedIn資料通道問題,最後捐獻給了Apache,是Apache的頂級專案。 Kafka適合的場景 使用者行為跟蹤,可

Kafka系列-自定義Producer

  前面已經講到了,在Kafka中,Message是由Producer產生的,Producer產生的Message會發送到Topic的指定Partition中。Producer可以有多種形式,也可以由使用者通過Java,C以及Python語言來自定義。   K

移動APP持續交付系列雲構建價值分析

現狀企業A:小APP,Android、iOS研發各三人;大APP,Android、iOS研發各10人。CICD通過部署一臺jenkins伺服器 + 一臺Android 構建機器來完成Android的APP構建,iOS構建完全依賴開發本地環境。企業B:3個APP5個研發同學,5臺伺服器年成本一萬左右 ,程式碼規

給老闆減刑系列hadoop 安全缺陷分析之一:kerberos 的缺陷

近一年來從事金融資料安全架構方面工作,對大資料平臺安全重要性有了一些新的思考。最近看了Steve Loughran先生寫的本書《Hadoop and Kerberos: The Madness Beyond the Gate》,寫作風格幽默風趣,但是國內對大資料平臺的安全考慮的文

Spring Security系列核心過濾器原始碼分析(四)

文章來源 前面的部分,我們關注了Spring Security是如何完成認證工作的,但是另外一部分核心的內容:過濾器,一直沒有提到,我們已經知道Spring Security使用了springSecurityFillterChian作為了安全過濾的入口,這一節主要分析一下這個過濾器鏈都包含了哪

玩轉大資料系列二:資料分析與處理

經過了資料採集和同步之後,就可以在阿里雲上進行資料分析和處理,來玩轉您的資料了。本文向您介紹在阿里雲大資料各產品中,以及各產品之間怎樣來完成您的資料處理和資料分析。 MaxCompute 基於MaxCompute的大資料計算(MaxCompute + RDS) 使用MaxCompute分析IP

Apache Kafka系列(五) Kafka Connect及FileConnector示例

一. Kafka Connect簡介   Kafka是一個使用越來越廣的訊息系統,尤其是在大資料開發中(實時資料處理和分析)。為何整合其他系統和解耦應用,經常使用Producer來發送訊息到Broker,並使用Consumer來消費Broker中的訊息。Kafka Connect是到0.9版本才提供的並極大

一看就懂系列 Mysql主從延遲分析

前言 在進入正題之前,需要先明白一個東西,DML和DDL是什麼東西? DDL 資料定義語言,create 、alter 、drop、truncate、create、drop 當執行DDL語句時,在每一條語句前後,都將提交當前的事務。如果使用者使用

iOS開發系列iOS SDK例項教程

(原文地址) iOS SDK是開發iPhone和iPad 應用程式過程中必不可少的軟體開發包,提供了從建立程式,到編譯,除錯,執行,測試等一些列開發過程中所需要的工具,也提供了許多豐富的框架和相關API,供開發者在寫程式的時候使用。 iOS SDK內容眾多,一篇

java基礎系列ConcurrentHashMap源碼分析(基於jdk1.8)

threshold 主存 類比 tile num method 過程 參數 nsf 1、前提   在閱讀這篇博客之前,希望你對HashMap已經是有所理解的;另外你對java的cas操作也是有一定了解的,因為在這個類中大量使用到了cas相關的操作來保證線程安全的。   

Apache Kafka學習Kafka基本原理

       Kafka是一個使用Scala編寫的訊息系統,原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營資料處理管道(Pipeline)的基礎。現在它已被多家不同型別的公司作為多種型別的資料管道和訊息系統使用。

asyncio系列抽絲剝繭分析事件排程的核心原理

先來看一下一個簡單的例子 例1: async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): print('ent

Java入門系列集合HashMap原始碼分析(十四)

前言 我們知道在Java 8中對於HashMap引入了紅黑樹從而提高操作效能,由於在上一節我們已經通過圖解方式分析了紅黑樹原理,所以在接下來我們將更多精力投入到解析原理而不是演算法本身,HashMap在Java中是使用比較頻繁的鍵值對資料型別,所以我們非常有必要詳細去分析背後的具體實現原理,無論是C#還是J

JVM系列:從彙編角度分析Volatile

[toc] # 簡介 Volatile關鍵字對熟悉java多執行緒的朋友來說,應該很熟悉了。Volatile是JMM(Java Memory Model)的一個非常重要的關鍵詞。通過是用Volatile可以實現禁止重排序和變數值執行緒之間可見兩個主要特性。 今天我們從彙編的角度來分析一下Volatile