kafka 0.9.0.0 部分配置詳解
4. 使用Kafka的Producer API來完成訊息的推送
1) Kafka 0.9.0.1的java client依賴:
Xml程式碼- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.9.0.1</version>
- </dependency>
2) 寫一個KafkaUtil工具類,用於構造Kafka Client
Java程式碼- publicclass KafkaUtil {
- privatestatic KafkaProducer<String, String> kp;
- publicstatic KafkaProducer<String, String> getProducer() {
- if (kp == null) {
- Properties props = new Properties();
-
props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092"
- props.put("acks", "1");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"
- kp = new KafkaProducer<String, String>(props);
- }
- return kp;
- }
- }
KafkaProducer<K,V>的K代表每條訊息的key型別,V代表訊息型別。訊息的key用於決定此條訊息由哪一個partition接收,所以我們需要保證每條訊息的key是不同的。
Producer端的常用配置
- bootstrap.servers:Kafka叢集連線串,可以由多個host:port組成
- acks:broker訊息確認的模式,有三種:
0:不進行訊息接收確認,即Client端傳送完成後不會等待Broker的確認
1:由Leader確認,Leader接收到訊息後會立即返回確認資訊
all:叢集完整確認,Leader會等待所有in-sync的follower節點都確認收到訊息後,再返回確認資訊
我們可以根據訊息的重要程度,設定不同的確認模式。預設為1 - retries:傳送失敗時Producer端的重試次數,預設為0
- batch.size:當同時有大量訊息要向同一個分割槽傳送時,Producer端會將訊息打包後進行批量傳送。如果設定為0,則每條訊息都獨立傳送。預設為16384位元組
- linger.ms:傳送訊息前等待的毫秒數,與batch.size配合使用。在訊息負載不高的情況下,配置linger.ms能夠讓Producer在傳送訊息前等待一定時間,以積累更多的訊息打包傳送,達到節省網路資源的目的。預設為0
- key.serializer/value.serializer:訊息key/value的序列器Class,根據key和value的型別決定
- buffer.memory:訊息緩衝池大小。尚未被髮送的訊息會儲存在Producer的記憶體中,如果訊息產生的速度大於訊息傳送的速度,那麼緩衝池滿後傳送訊息的請求會被阻塞。預設33554432位元組(32MB)
更多的Producer配置見官網:http://kafka.apache.org/documentation.html#producerconfigs
3) 寫一個簡單的Producer端,每隔1秒向Kafka叢集傳送一條訊息:
Java程式碼- publicclass KafkaTest {
- publicstaticvoid main(String[] args) throws Exception{
- Producer<String, String> producer = KafkaUtil.getProducer();
- int i = 0;
- while(true) {
- ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
- producer.send(record, new Callback() {
- publicvoid onCompletion(RecordMetadata metadata, Exception e) {
- if (e != null)
- e.printStackTrace();
- System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
- }
- });
- i++;
- Thread.sleep(1000);
- }
- }
- }
在呼叫KafkaProducer的send方法時,可以註冊一個回撥方法,在Producer端完成傳送後會觸發回撥邏輯,在回撥方法的 metadata物件中,我們能夠獲取到已傳送訊息的offset和落在的分割槽等資訊。注意,如果acks配置為0,依然會觸發回撥邏輯,只是拿不到 offset和訊息落地的分割槽資訊。
跑一下,輸出是這樣的:
message send to partition 0, offset: 28message send to partition 1, offset: 26
message send to partition 0, offset: 29
message send to partition 1, offset: 27
message send to partition 1, offset: 28
message send to partition 0, offset: 30
message send to partition 0, offset: 31
message send to partition 1, offset: 29
message send to partition 1, offset: 30
message send to partition 1, offset: 31
message send to partition 0, offset: 32
message send to partition 0, offset: 33
message send to partition 0, offset: 34
message send to partition 1, offset: 32
乍一看似乎offset亂掉了,但其實這是因為訊息分佈在了兩個分割槽上,每個分割槽上的offset其實是正確遞增的。
5. 使用Kafka的Consumer API來完成訊息的消費
1) 改造一下KafkaUtil類,加入Consumer client的構造。
Java程式碼- publicclass KafkaUtil {
- privatestatic KafkaProducer<String, String> kp;
- privatestatic KafkaConsumer<String, String> kc;
- publicstatic KafkaProducer<String, String> getProducer() {
- if (kp == null) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
- props.put("acks", "1");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- kp = new KafkaProducer<String, String>(props);
- }
- return kp;
- }
- publicstatic KafkaConsumer<String, String> getConsumer() {
- if(kc == null) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
- props.put("group.id", "1");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- kc = new KafkaConsumer<String, String>(props);
- }
- return kc;
- }
- }
同樣,我們介紹一下Consumer常用配置
- bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含義一樣,不再贅述
- fetch.min.bytes:每次最小拉取的訊息大小(byte)。Consumer會等待訊息積累到一定尺寸後進行批量拉取。預設為1,代表有一條就拉一條
- max.partition.fetch.bytes:每次從單個分割槽中拉取的訊息最大尺寸(byte),預設為1M
- group.id:Consumer的group id,同一個group下的多個Consumer不會拉取到重複的訊息,不同group下的Consumer則會保證拉取到每一條訊息。注意,同一個group下的consumer數量不能超過分割槽數。
- enable.auto.commit:是否自動提交已拉取訊息的offset。提交offset即視為該訊息已經成功被消費,該組下的Consumer無法再拉取到該訊息(除非手動修改offset)。預設為true
- auto.commit.interval.ms:自動提交offset的間隔毫秒數,預設5000。
全部的Consumer配置見官方文件:http://kafka.apache.org/documentation.html#newconsumerconfigs
2) 編寫Consumer端:
Java程式碼- publicclass KafkaTest {
- publicstaticvoid main(String[] args) throws Exception{
- KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
- consumer.subscribe(Arrays.asList("test"));
- while(true) {
- ConsumerRecords<String, String> records = consumer.poll(1000);
- for(ConsumerRecord<String, String> record : records) {
- System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
- }
- }
- }
- }
執行輸出:
fetched from partition 0, offset: 28, message: this is message0fetched from partition 0, offset: 29, message: this is message2
fetched from partition 0, offset: 30, message: this is message5
fetched from partition 0, offset: 31, message: this is message6
fetched from partition 0, offset: 32, message: this is message10
fetched from partition 0, offset: 33, message: this is message11
fetched from partition 0, offset: 34, message: this is message12
fetched from partition 1, offset: 26, message: this is message1
fetched from partition 1, offset: 27, message: this is message3
fetched from partition 1, offset: 28, message: this is message4
fetched from partition 1, offset: 29, message: this is message7
fetched from partition 1, offset: 30, message: this is message8
fetched from partition 1, offset: 31, message: this is message9
fetched from partition 1, offset: 32, message: this is message13
說明:
- KafkaConsumer的poll方法即是從Broker拉取訊息,在poll之前首先要用subscribe方法訂閱一個Topic。
- poll方法的入參是拉取超時毫秒數,如果沒有新的訊息可供拉取,consumer會等待指定的毫秒數,到達超時時間後會直接返回一個空的結果集。
- 如 果Topic有多個partition,KafkaConsumer會在多個partition間以輪詢方式實現負載均衡。如果啟動了多個 Consumer執行緒,Kafka也能夠通過zookeeper實現多個Consumer間的排程,保證同一組下的Consumer不會重複消費訊息。注 意,Consumer數量不能超過partition數,超出部分的Consumer無法拉取到任何資料。
- 可以看出,拉取到的訊息並不是完全順序化的,kafka只能保證一個partition內的訊息先進先出,所以在跨partition的情況下,訊息的順序是沒有保證的。
- 本 例中採用的是自動提交offset,Kafka client會啟動一個執行緒定期將offset提交至broker。假設在自動提交的間隔內發生故障(比如整個JVM程序死掉),那麼有一部分訊息是會被 重複消費的。要避免這一問題,可使用手動提交offset的方式。構造consumer時將enable.auto.commit設為false,並在代 碼中用consumer.commitSync()來手動提交。
如果不想讓kafka控制consumer拉取資料時在partition間的負載均衡,也可以手工控制:
Java程式碼- publicstaticvoid main(String[] args) throws Exception{
- KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
- String topic = "test";
- TopicPartition partition0 = new TopicPartition(topic, 0);
- TopicPartition partition1 = new TopicPartition(topic, 1);
- consumer.assign(Arrays.asList(partition0, partition1));
- while(true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for(ConsumerRecord<String, String> record : records) {
- System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
- }
- consumer.commitSync();
- }
- }
使用consumer.assign()方法為consumer執行緒指定1個或多個partition。
此處的坑:
在測試中我發現,如果用手工指定partition的方法拉取訊息,不知為何kafka的自動提交offset機制會失效,必須使用手動方式才能正確提交已消費的訊息offset。題外話:
在 真正的應用環境中,Consumer端將訊息拉取下來後要做的肯定不止是輸出出來這麼簡單,在消費訊息時很有可能需要花掉更多的時間。1個 Consumer執行緒消費訊息的速度很有可能是趕不上Producer產生訊息的速度,所以我們不得不考慮Consumer端採用多執行緒模型來消費訊息。然而KafkaConsumer並不是執行緒安全的,多個執行緒操作同一個KafkaConsumer例項會出現各種問題,Kafka官方對於Consumer端的多執行緒處理給出的指導建議如下:
1. 每個執行緒都持有一個KafkaConsumer物件
好處:
- 實現簡單
- 不需要執行緒間的協作,效率最高
- 最容易實現每個Partition內訊息的順序處理
弊端:
- 每個KafkaConsumer都要與叢集保持一個TCP連線
- 執行緒數不能超過Partition數
- 每一batch拉取的資料量會變小,對吞吐量有一定影響
2. 解耦,1個Consumer執行緒負責拉取訊息,數個Worker執行緒負責消費訊息
好處:
- 可自由控制Worker執行緒的數量,不受Partition數量限制
弊端:
- 訊息消費的順序無法保證
- 難以控制手動提交offset的時機
個人認為第二種方式更加可取,consumer數不能超過partition數這個限制是很要命的,不可能為了提高Consumer消費訊息的效率而把Topic分成更多的partition,partition越多,叢集的高可用性就越低。
6. Kafka叢集高可用性測試
1) 檢視當前Topic的狀態:
Shell程式碼- /kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test
輸出:
Topic:test PartitionCount:2 ReplicationFactor:2 Configs:Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
可以看到,partition0的leader是broker1,parition1的leader是broker0
2) 啟動Producer向Kafka叢集傳送訊息
輸出:
message send to partition 0, offset: 35message send to partition 1, offset: 33
message send to partition 0, offset: 36
message send to partition 1, offset: 34
message send to partition 1, offset: 35
message send to partition 0, offset: 37
message send to partition 0, offset: 38
message send to partition 1, offset: 36
message send to partition 1, offset: 37
3) 登入SSH將broker0,也就是partition 1的leader kill掉
再次檢視Topic狀態:
Topic:test PartitionCount:2 ReplicationFactor:2 Configs:Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1
可以看到,當前parition0和parition1的leader都是broker1了
此時再去看Producer的輸出:
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with /10.0.0.100 disconnectedjava.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:745)
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 7 to Cluster(nodes = [Node(1, 10.0.0.101, 9092)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test, partition = 0, leader = 1, replicas = [1,], isr = [1,]])
能看到Producer端的DEBUG日誌顯示與broker0的連結斷開了,此時Kafka立刻開始更新叢集metadata,更新後的metadata表示broker1現在是兩個partition的leader,Producer程序很快就恢復繼續執行,沒有漏發任何訊息,能夠看出Kafka叢集的故障切換機制還是很厲害的
4) 我們再把broker0啟動起來
Shell程式碼- bin/kafka-server-start.sh -daemon config/server.properties
然後再次檢查Topic狀態:
Topic:test PartitionCount:2 ReplicationFactor:2 Configs:Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
我們看到,broker0啟動起來了,並且已經是in-sync狀態(注意Isr從1變成了1,0),但此時兩個partition的leader還都是 broker1,也就是說當前broker1會承載所有的傳送和拉取請求。這顯然是不行的,我們要讓叢集恢復到負載均衡的狀態。
這時候,需要使用Kafka的選舉工具觸發一次選舉:
Shell程式碼- bin/kafka-preferred-replica-election.sh --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
選舉完成後,再次檢視Topic狀態:
Topic:test PartitionCount:2 ReplicationFactor:2 Configs:Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0
可以看到,叢集重新回到了broker0掛掉之前的狀態
但此時,Producer端產生了異常:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.原因是Producer端在嘗試向broker1的parition0傳送訊息時,partition0的leader已經切換成了broker0,所以訊息傳送失敗。
此時用Consumer去消費訊息,會發現訊息的編號不連續了,確實漏發了一條訊息。這是因為我們在構造Producer時設定了retries=0,所以在傳送失敗時Producer端不會嘗試重發。
將retries改為3後再次嘗試,會發現leader切換時再次發生了同樣的問題,但Producer的重發機制起了作用,訊息重發成功,啟動Consumer端檢查也證實了所有訊息都發送成功了。
每 次叢集單點發生故障恢復後,都需要進行重新選舉才能徹底恢復叢集的leader分配,如果嫌每次這樣做很麻煩,可以在broker的配置檔案(即 server.properties)中配置auto.leader.rebalance.enable=true,這樣broker在啟動後就會自動進 行重新選舉至此,我們通過測試證實了叢集出現單點故障和恢復的過程中,Producer端能夠保持正確運轉。接下來我們看一下Consumer端的表現:
5) 同時啟動Producer程序和Consumer程序
此時Producer一邊在生產訊息,Consumer一邊在消費訊息
6) 把broker0幹掉,觀察Consumer端的輸出:
能看到,在broker0掛掉後,consumer也端產生了一系列INFO和WARN輸出,但同Producer端一樣,若干秒後自動恢復,訊息仍然是連續的,並未出現斷點。
7) 再次把broker0啟動,並觸發重新選舉,然後觀察輸出:
fetched from partition 0, offset: 418, message: this is message48fetched from partition 0, offset: 419, message: this is message49
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead.
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group.
fetched from partition 1, offset: 392, message: this is message50
fetched from partition 0, offset: 420, message: this is message51
能看到,重選舉後Consumer端也輸出了一些日誌,意思是在提交offset時發現當前的排程器已經失效了,但很快就重新獲取了新的有效排程器,恢復 了offset的自動提交,驗證已提交offset的值也證明了offset提交併未因leader切換而發生錯誤。
如上,我們也通過測試證實了Kafka叢集出現單點故障時,Consumer端的功能正確性。
相關推薦
kafka 0.9.0.0 部分配置詳解
4. 使用Kafka的Producer API來完成訊息的推送 1) Kafka 0.9.0.1的java client依賴: Xml程式碼 <dependency> <groupId>org.apache.kafka</groupId>
mydumper 0.9.5 備份恢復命令詳解
軟體版本:mydumper 0.9.5 Mydumper命令: -B, --database 需要備份的資料庫 -T, --tables-list 備份的表,使用英文逗號分隔 -O, --omit-from-file 包含要
生產環境elasticsearch5.0.1叢集的部署配置詳解
線上環境elasticsearch5.0.1叢集的配置部署 es叢集的規劃: 硬體: 7臺8核、64G記憶體、2T ssd硬碟加1臺8核16G的阿里雲伺服器 其中一臺作為kibana+kafka連線查詢的伺服器 其他6臺都作為node和master兩種角色 作業系統:cen
Spring-Security整合CAS之Spring-Security.xml部分配置詳解
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns="http://www.springframework.org/schema/security" xmlns:beans="http://w
[Kafka] Apache Kafka 簡介、叢集搭建及配置詳解
前言 kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 Kafk
Django 2.0 新款URL配置詳解
Django2.0釋出後,很多人都擁抱變化,加入了2的行列。 但是和1.11相比,2.0在url的使用方面發生了很大的變化,下面介紹一下: 一、例項 先看一個例子: from django.urls import path from . import views urlpattern
0 httpd2.2配置詳解-Apache配置檔案詳解-(二)
httpd-2.2 15 curl命令 curl是基於URL語法在命令列方式下工作的檔案傳輸工具,它支援FTP, FTPS, HTTP, HTTPS, GOPHER, TELNET, DICT, FILE及LDAP等協議。curl支援HTTPS認證,並且支援HTTP的POST、PU
0 httpd2.2配置詳解-Apache配置文件詳解-(二)
切換 more 簡化 css 程序 ip地址 在服務器 filter utf httpd-2.2 15 curl命令 curl是基於URL語法在命令行方式下工作的文件傳輸工具,它支持FTP, FTPS, HTTP, HTTPS, GOPHER,
redis4.0.9 ubuntu 下安裝配置
Redis下載地址:https://redis.io/download選擇穩定版下載:如果沒有安裝gcc,需要先安裝:sudo apt-get install gcc完了之後,解壓redis 壓縮包sudo tar -zxvf redis-4.0.9.tar.gz進入到 re
spring security oauth2.0配置詳解
spring security oauth2.0 實現 目標 現在很多系統都支援第三方賬號密碼等登陸我們自己的系統,例如:我們經常會看到,一些系統使用微信賬號,微博賬號、QQ賬號等登陸自己的系統,我們現在就是要模擬這種登陸的方式,很多大的公司已經實現了這
struts2.0 配置檔案、常量配置詳解
通常struts2載入struts2常量的順序如下: struts-default.xml:該檔案儲存在struts2-core-2.0.6.jar檔案中。 struts-plugin.xml:該檔案儲存在struts2-Xxx-2.0.6.jar等Struts2外掛JAR檔案中。
訊息中介軟體kafka(0.9以及0.10版本)學習及實踐
目錄 一、介紹 二、特點 三、何時需要訊息佇列 四、元件 五、訊息傳送的流程 六、原理 七、實踐 八、版本比較
百度ECharts 3.0 多座標軸統計圖一般配置詳解(例項)
ECharts 是百度出品的jquery圖表外掛。相對於Chartist,擁有更加強大的功能,以及更加詳細的文件(ECharts的文件形式非常優秀,簡明易懂)。ECharts支援的圖表種類非常多,同時相容性也十分優良,故而在網站建設動態統計圖表時,是一個非常優秀
【筆記】webpack4.0教程_配置詳解_圖文_新手入門_快速上手
前言 這篇筆記會從node安裝開始,到搭建基礎的webpack環境為止。中間每一步的配置和遇到的術語、名詞,我都會做簡單的解釋。 同時,這篇博文是一篇記錄我學習webpack的過程筆記,因為我也在學習過程,如果有記述錯誤之處,歡迎指正。 【注:全程系統環境
Webpack 配置詳解(含 4)——從 0 配置一套開發模板
前言原始碼熟悉 webpack 與 webpack4 配置。webpack4 相對於 3 的最主要的區別是所謂的零配置,但是為了滿足我們的專案需求還是要自己進行配置,不過我們可以使用一些 webpack 的預設值。同時 webpack 也拆成了兩部分,webpack 和 we
Elasticsearch 2.2.0 叢集配置詳解
叢集分片配置 Elasticsearch master 節點的主要role就是決定每個shard分配到什麼節點,以及什麼時候在節點間遷移shard以reblance 叢集。有許多引數可以控制shard分配過程 叢集層面的分片分配引數分成3類:包括分片分配設定,分片平衡設
Linux中Nginx安裝與配置詳解(CentOS-6.5:nginx-1.5.0)
1 Nginx簡介Nginx ("engine x") 是一個高效能的 HTTP 和 反向代理 伺服器,也是一個 IMAP/POP3/SMTP 代理伺服器。 Nginx 是由 Igor Sysoev 為俄羅斯訪問量第二的 Rambler.ru 站點開發的,第一個公開版本0
JBOSS5.0 配置詳解(轉)
Jboss事務預設的時間很短,很多時候我們要修改,jboss4的網上也已經很帖子了,這裡不寫了, 下面給出jboss 5, jboss 6的設定事務的方法: 修改X:\jboss-6-CR1\server\default\deploy\mysql-ds.xml或者其它資料來源配置檔案,修改後如下:
win7,64位系統下iis6.0的安裝與配置詳解,解決“未發現數據源名稱並且未指定預設驅動程式”解決辦法
以下文章參考: 一、百度文庫 http://jingyan.baidu.com/article/219f4bf723bcb2de442d38ed.html 二、http://wsj781222.blog.163.com/blog/static/2684261201351
vue2.0 中#$emit,$on的使用詳解
額外 return turn isp div 傳遞 call sele 發的 vue1.0中 vm.$dispatch 和 vm.$broadcast 被棄用,改用$emit,$on 1. vm.$on( event, callback ) 監聽當前實例上的自定義事件。