簡單的 Storm 消費 Kafka 資料並儲存到 Redis 的例項(訂單資訊處理)
阿新 • • 發佈:2019-01-04
maven依賴
<!-- Dependencies for the Kafka -> Storm -> Redis order-processing example. -->
<dependencies>
<!-- Storm core runtime (topology builder, spout/bolt base classes, LocalCluster). -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.5</version>
<!-- NOTE(review): "provided" scope is disabled here; presumably it should be enabled when the jar is submitted to a real cluster. Confirm. -->
<!--<scope>provided</scope>-->
</dependency>
<!-- Storm's Kafka integration (KafkaSpout, SpoutConfig, ZkHosts). -->
<!-- The SLF4J API and its log4j12 binding are excluded from this artifact's transitive dependencies. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId >
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Clojure runtime, pinned explicitly to 1.5.1. -->
<dependency>
<groupId>org.clojure</groupId >
<artifactId>clojure</artifactId>
<version>1.5.1</version>
</dependency>
<!-- Kafka 0.8.1 client built for Scala 2.8.2 (used by the producer class and by storm-kafka). -->
<!-- Excludes the JMX/JMS artifacts, ZooKeeper and the SLF4J artifacts from its transitive dependencies. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Gson: JSON (de)serialization of the order messages. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.4</version>
</dependency>
<!-- Jedis: Redis client used by the bolt to store the aggregated amounts. -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
訂單資訊類:OrderInfo.java
package com.xt.kafka2storm.order;
import com.google.gson.Gson;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
 * Simplified order/payment record used as the message payload of this example.
 *
 * <p>Real business data is more complex (parent/child orders, several products per
 * order, several orders per payment, ...); this class models one flat record only.
 * {@link #random()} fills an instance with sample data and renders it as JSON.
 */
public class OrderInfo implements Serializable {
    /** Gson is thread-safe and stateless, so one shared instance is enough. */
    private static final Gson GSON = new Gson();

    private String orderId;        // order number
    private Date createOrderTime;  // time the order was created
    private String paymentId;      // payment number
    private Date paymentTime;      // time of payment
    private String productId;      // product number
    private String productName;    // product name
    private long productPrice;     // product price
    private long promotionPrice;   // promotional price
    private String shopId;         // shop number
    private String shopName;       // shop name
    private String shopMobile;     // shop phone number
    private long payPrice;         // amount actually paid for the order
    private int num;               // order quantity

    /** Creates an empty record; required for JSON deserialization. */
    public OrderInfo() {
    }

    /** Creates a fully populated record; each argument is stored in the field of the same name. */
    public OrderInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    /** Returns a human-readable dump of every field, labelled with this class's name. */
    @Override
    public String toString() {
        // The label used to read "PaymentInfo{", a copy-paste leftover that mislabelled the object.
        return "OrderInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    /**
     * Overwrites this instance with sample data and returns it as a JSON string.
     *
     * <p>Sets a fixed product id and creation time, random 32-character order and payment
     * ids, and random prices ({@code productPrice} in [0, 1000), {@code promotionPrice}
     * in [0, 500), {@code payPrice} in [0, 480)). All other fields are left untouched.
     *
     * @return the Gson JSON representation of this instance after it was filled
     * @throws IllegalStateException if the built-in sample timestamp cannot be parsed
     */
    public String random() {
        // One generator for all three prices instead of a new Random per field.
        Random rng = new Random();
        this.productId = "12121212";
        this.orderId = UUID.randomUUID().toString().replace("-", "");
        this.paymentId = UUID.randomUUID().toString().replace("-", "");
        this.productPrice = rng.nextInt(1000);
        this.promotionPrice = rng.nextInt(500);
        this.payPrice = rng.nextInt(480);
        // SimpleDateFormat is not thread-safe, so it stays a local rather than a shared constant.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = format.parse("2015-11-11 12:22:12");
        } catch (ParseException e) {
            // The literal above matches the pattern, so this indicates a programming error;
            // fail loudly instead of silently emitting a record without a creation time.
            throw new IllegalStateException("Sample timestamp does not match its pattern", e);
        }
        return GSON.toJson(this);
    }
}
Kafka生產模擬訂單資訊:OrderMqSender.java
package com.xt.kafka2storm.order;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/**
 * Command-line tool that publishes simulated order messages to Kafka.
 *
 * <p>Each message is the JSON produced by {@code new OrderInfo().random()}, keyed by its
 * sequence number and sent to the {@code orderMq} topic.
 */
public class OrderMqSender {
    /**
     * Sends messages numbered 1 through 99999 and then releases the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topic = "orderMq";

        Properties props = new Properties();
        // Keys and values are plain strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("request.required.acks", "1");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        try {
            for (int messageNo = 1; messageNo < 100000; messageNo++) {
                String key = String.valueOf(messageNo);
                String payload = new OrderInfo().random();
                producer.send(new KeyedMessage<String, String>(topic, key, payload));
            }
        } finally {
            // Always release the producer's connections, even if a send fails.
            producer.close();
        }
    }
}
Storm 處理從 Kafka 獲取資料的 Bolt 程式碼:ParserOrderMqBolt.java
package com.xt.kafka2storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.google.gson.Gson;
import com.xt.kafka2storm.order.OrderInfo;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashMap;
import java.util.Map;
public class ParserOrderMqBolt extends BaseRichBolt {
private JedisPool pool;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//change "maxActive" -> "maxTotal" and "maxWait" -> "maxWaitMillis" in all examples
JedisPoolConfig config = new JedisPoolConfig();
//控制一個pool最多有多少個狀態為idle(空閒的)的jedis例項。
config.setMaxIdle(5);
//控制一個pool可分配多少個jedis例項,通過pool.getResource()來獲取;
//如果賦值為-1,則表示不限制;如果pool已經分配了maxActive個jedis例項,則此時pool的狀態為exhausted(耗盡)。
//在borrow一個jedis例項時,是否提前進行validate操作;如果為true,則得到的jedis例項均是可用的;
config.setMaxTotal(1000 * 100);
//表示當borrow(引入)一個jedis例項時,最大的等待時間,如果超過等待時間,則直接丟擲JedisConnectionException;
config.setMaxWaitMillis(30);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
/**
*如果你遇到 java.net.SocketTimeoutException: Read timed out exception的異常資訊
*請嘗試在構造JedisPool的時候設定自己的超時值. JedisPool預設的超時時間是2秒(單位毫秒)
*/
pool = new JedisPool(config, "127.0.0.1", 6379);
}
public void execute(Tuple input) {
Jedis jedis = pool.getResource();
//獲取kafkaSpout傳送過來的資料,是一個json
String string = new String((byte[]) input.getValue(0));
//解析json
OrderInfo orderInfo = (OrderInfo) new Gson().fromJson(string, OrderInfo.class);
//整個網站,各個業務線,各個品類,各個店鋪,各個品牌,每個商品
//獲取整個網站的金額統計指標
// String totalAmount = jedis.get("totalAmount");
jedis.incrBy("totalAmount",orderInfo.getProductPrice());
//獲取商品所屬業務線的指標資訊
String bid = getBubyProductId(orderInfo.getProductId(),"b");
// String bAmout = jedis.get(bid+"Amout");
jedis.incrBy(bid+"Amount",orderInfo.getProductPrice());
jedis.close();
}
private String getBubyProductId(String productId,String type) {
// key:value
//index:productID:info---->Map
// productId-----<各個業務線,各個品類,各個店鋪,各個品牌,每個商品>
Map<String,String> map = new HashMap<String,String>();
map.put("b","3c");
map.put("c","phone");
map.put("s","121");
map.put("p","iphone");
return map.get(type);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
Storm啟動入口,結合KafkaSpout消費資料:KafkaAndStormTopologyMain.java
package com.xt.kafka2storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
/**
 * Entry point that wires a {@link KafkaSpout} reading the {@code orderMq} topic to
 * {@link ParserOrderMqBolt} and submits the topology.
 *
 * <p>With a command-line argument the topology is submitted to the cluster under that
 * name; without arguments it runs in an in-process {@link LocalCluster}.
 */
public class KafkaAndStormTopologyMain {
    /**
     * Builds and submits the topology.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws Exception if submitting the topology fails
     */
    public static void main(String[] args) throws Exception {
        ZkHosts zkHosts = new ZkHosts("hadoop1:2181,hadoop2:2181,hadoop3:2181");
        // Arguments: broker hosts, topic, ZooKeeper root for offsets, consumer id.
        // No scheme is configured here; the bolt reads tuple field 0 as byte[].
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "orderMq", "/myKafka", "kafkaSpout");

        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("kafkaSpout", new KafkaSpout(spoutConfig), 1);
        // The original referenced an undefined MyBolt1; the bolt of this example is ParserOrderMqBolt.
        tb.setBolt("bolt1", new ParserOrderMqBolt(), 1).shuffleGrouping("kafkaSpout");

        Config config = new Config();
        config.setNumWorkers(1);

        if (args.length > 0) {
            // Run on the cluster under the supplied topology name.
            StormSubmitter.submitTopology(args[0], config, tb.createTopology());
        } else {
            // Run locally, in-process.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm2kafka", config, tb.createTopology());
        }
    }
}