1. 程式人生 > Spark and Kafka

Spark and Kafka

修改 rate todo println bject delete trace ech 刪除!

package cn.itcast_02_zkclientapi;

import java.util.List;

import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * JUnit 4 walkthrough of the I0Itec {@code ZkClient} API: creating, deleting,
 * updating and reading znodes, listing children, and subscribing to data and
 * child change notifications.
 *
 * <p>Every test talks to the ZooKeeper servers listed in {@link #ZK_SERVERS};
 * a fresh client is opened before each test and closed afterwards.
 */
public class TestZKclient {

    /** Comma-separated host:port list of the ZooKeeper servers to connect to. */
    private static final String ZK_SERVERS = "slave1:2181,slave2:2181,slave3:2181";

    /** Timeout in milliseconds handed to the {@code ZkClient} constructor. */
    private static final int CONNECTION_TIMEOUT_MS = 2000;

    /** Znode that most of the tests below operate on. */
    private static final String ZNODE_PATH = "/zkClient";

    /** Pause before closing, giving pending listener callbacks a chance to run. */
    private static final long CLOSE_DELAY_MS = 300L;

    private ZkClient zkClient = null;

    /**
     * Opens the ZooKeeper connection used by the test.
     */
    @Before
    public void connection() {
        zkClient = new ZkClient(ZK_SERVERS, CONNECTION_TIMEOUT_MS);
    }

    /**
     * Closes the ZooKeeper connection after a short pause.
     *
     * <p>If the pause is interrupted, the client is still closed and the
     * interrupt flag is restored afterwards instead of being swallowed.
     * If {@link #connection()} failed, there is no client and nothing is closed.
     */
    @After
    public void close() {
        boolean interrupted = false;
        try {
            Thread.sleep(CLOSE_DELAY_MS);
        } catch (InterruptedException e) {
            // Remember the interrupt; it is re-asserted once the client is closed.
            interrupted = true;
        }
        try {
            // Guard against a failed @Before so the original connection error
            // is not accompanied by an unrelated NullPointerException.
            if (zkClient != null) {
                zkClient.close();
            }
        } finally {
            if (interrupted) {
                // Restored only after close() so the pending interrupt does not
                // disturb the close call itself.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates a persistent znode holding the data "abc".
     */
    @Test
    public void testCreateZnode() {
        zkClient.create(ZNODE_PATH, "abc", CreateMode.PERSISTENT);
    }

    /**
     * Deletes a znode.
     */
    @Test
    public void testDeleteZnode() {
        // NOTE(review): hard-coded path; it looks like the name of a
        // sequential node left over from an earlier run - confirm it exists.
        zkClient.delete("/zkClient0000000010");
    }

    /**
     * Updates the data of a znode.
     */
    @Test
    public void testUpdateZnode() {
        zkClient.updateDataSerialized(ZNODE_PATH, new DataUpdater<String>() {
            @Override
            public String update(String currentData) {
                // Print the data the znode held before the update.
                System.out.println(currentData);
                // The returned value becomes the new data.
                return "bbbbbbb";
            }
        });
    }

    /**
     * Prints the creation time of the znode "/test".
     */
    @Test
    public void testCreationTime() {
        long creationTime = zkClient.getCreationTime("/test");
        System.out.println(creationTime);
    }

    /**
     * Reads and prints the data stored in a znode.
     */
    @Test
    public void testGetData() {
        String readData = zkClient.readData(ZNODE_PATH, true);
        System.out.println(readData);
    }

    /**
     * Prints the number of children of a znode, followed by each child name.
     */
    @Test
    public void testChild() {
        int countChildren = zkClient.countChildren(ZNODE_PATH);
        // The message names the node that was actually counted (it used to say "/test").
        System.out.println(ZNODE_PATH + "共有" + countChildren + "個子節點!");
        List<String> children = zkClient.getChildren(ZNODE_PATH);
        for (String child : children) {
            System.out.println(child);
        }
    }

    /**
     * Registers a data-change listener on a znode and then issues five updates.
     */
    @Test
    public void testDataChangesListener() {
        zkClient.subscribeDataChanges(ZNODE_PATH, new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("節點被刪除!");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("節點被更改!");
            }
        });
        for (int i = 0; i < 5; i++) {
            zkClient.updateDataSerialized(ZNODE_PATH, new DataUpdater<String>() {
                @Override
                public String update(String currentData) {
                    // Write the current data back unchanged.
                    return currentData;
                }
            });
        }
    }

    /**
     * Registers a child-change listener on a znode and then creates five
     * ephemeral sequential children beneath it, pausing briefly after each.
     *
     * @throws InterruptedException if the pause between creations is interrupted
     */
    @Test
    public void testChildChangesListener() throws InterruptedException {
        zkClient.subscribeChildChanges(ZNODE_PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                System.out.println(parentPath + "子節點被修改!");
                for (String child : currentChilds) {
                    System.out.println("現在為" + child);
                }
            }
        });
        for (int i = 0; i < 5; i++) {
            zkClient.create(ZNODE_PATH + "/test", "abc", CreateMode.EPHEMERAL_SEQUENTIAL);
            Thread.sleep(100);
        }
    }
}

Spark and Kafka