1. 程式人生 > >第114課加強版:SparkStreaming+Kafka+createDirectStream+KafkaOffsetMonitor解決內幕

第114課加強版:SparkStreaming+Kafka+createDirectStream+KafkaOffsetMonitor解決內幕

第114課加強版:SparkStreaming+Kafka+createDirectStream 

前傳:

114課程的高階版:

1. spark streaming 使用 direct方式直接讀取 kafka的資料,offset 沒有經過zookeeper。因此在KafkaOffsetMonitor中也監控不到資料 。

2. 我們通過sparkstreaming操作offset,然後kafkacluster將offset更新到zookeeper中。

3.將KafkaOffsetMonitor從master遷移到woker2上執行 

final AtomicReference offsetRanges = new AtomicReference();
		 adClickedStreaming.transformToPair( new Function, JavaPairRDD>() {
 
			private static final long serialVersionUID = 1L;

			@Override
			public JavaPairRDD call(JavaPairRDD rdd) throws Exception {
				   OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
				   offsetRanges.set(offsets);
			       return rdd;
			}
			 
		 }).foreachRDD( new Function, Void>() {
 
			private static final long serialVersionUID = 1L;

			@Override
			public Void call(JavaPairRDD rdd) throws Exception {
				KafkaCluster kafkaCluster= new KafkaCluster((scala.collection.immutable.Map) kafkaParameters);
			 
			 
				
				for (OffsetRange o : offsetRanges.get()) {
			
					TopicAndPartition topicAndPartition=new   TopicAndPartition("AdClicked",o.partition());
				     
				    Map offsetsmap = new HashMap();
				    offsetsmap.put(topicAndPartition, o.untilOffset());
				  			 
					kafkaCluster.setConsumerOffsets("groupId", (scala.collection.immutable.Map) offsetsmap);
				
			         System.out.println(
			      "IMF 114 topic:  " +    o.topic() + " IMF 114  partition : " + o.partition() + " IMF 114  fromOffset : " + o.fromOffset() + " IMF 114  untilOffset  : " + o.untilOffset()
			         );
			       }
				return null;
			}


遇到問題 

16/07/23 19:34:57 INFO scheduler.JobScheduler: Starting job streaming job 1469273510000 ms.1 from job set of time 1469273510000 ms
16/07/23 19:34:57 ERROR scheduler.JobScheduler: Error running job streaming job 1469273510000 ms.0
java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map
        at com.dt.spark.SparkApps.SparkStreaming.AdClickedStreamingStats$2.call(AdClickedStreamingStats.java:156)
        at com.dt.spark.SparkApps.SparkStreaming.AdClickedStreamingStats$2.call(AdClickedStreamingStats.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:316)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:316)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map
        at com.dt.spark.SparkApps.SparkStreaming.AdClickedStreamingStats$2.call(AdClickedStreamingStats.java:156)
        at com.dt.spark.SparkApps.SparkStreaming.AdClickedStreamingStats$2.call(AdClickedStreamingStats.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:316)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:316)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/07/23 19:34:57 INFO spark.SparkContext: Starting job: print at AdClickedStreamingStats.java:320

解決方法

scala.collection.immutable.Map scalaImmutableoffsetsmap= scala.collection.JavaConverters.mapAsScalaMapConverter(offsetsmap).asScala().toMap(scala.Predef.conforms());

 原始碼如下

/**
		 * 建立Kafka元資料,來讓Spark Streaming這個Kafka Consumer利用
		 */
		Map kafkaParameters = new HashMap();
		kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");

		Set topics = new HashSet();
		topics.add("AdClicked");

		JavaPairInputDStream adClickedStreaming = KafkaUtils.createDirectStream(jsc, String.class,
				String.class, StringDecoder.class, StringDecoder.class, kafkaParameters, topics);
		
		
		//=====================================================//
		//=====================================================//
	
  scala.collection.immutable.Map scalaImmutablekafkaParameters= scala.collection.JavaConverters.mapAsScalaMapConverter(kafkaParameters).asScala().toMap(scala.Predef.conforms());
	
		
		final AtomicReference offsetRanges = new AtomicReference();
		 adClickedStreaming.transformToPair( new Function, JavaPairRDD>() {
 
			private static final long serialVersionUID = 1L;

			@Override
			public JavaPairRDD call(JavaPairRDD rdd) throws Exception {
				   OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
				   offsetRanges.set(offsets);
			       return rdd;
			}
			 
		 }).foreachRDD( new Function, Void>() {
 
			private static final long serialVersionUID = 1L;

			@Override
			public Void call(JavaPairRDD rdd) throws Exception {
				KafkaCluster kafkaCluster= new KafkaCluster( scalaImmutablekafkaParameters);
			 
			 
				
				for (OffsetRange o : offsetRanges.get()) {
		
			
					TopicAndPartition topicAndPartition=new   TopicAndPartition("AdClicked",o.partition());
			
				    
				    Map offsetsmap = new HashMap();
				    offsetsmap.put(topicAndPartition, o.untilOffset());
				    
				    scala.collection.immutable.Map scalaImmutableoffsetsmap= scala.collection.JavaConverters.mapAsScalaMapConverter(offsetsmap).asScala().toMap(scala.Predef.conforms());
				  			    
							 
					kafkaCluster.setConsumerOffsets("IMF-20160723-2-114-Kafka-groupId",  scalaImmutableoffsetsmap);
				
			         System.out.println(
			      "IMF 114 topic:  " +    o.topic() + " IMF 114  partition : " + o.partition() + " IMF 114  fromOffset : " + o.fromOffset() + " IMF 114  untilOffset  : " + o.untilOffset()
			         );
			       }
				return null;
			}

		 
			 
			 
			 
		 });
		 
	 
		
		
		

將KafkaOffsetMonitor從master遷移到woker2上執行 

[email protected]:/usr/local# scp   -rq /usr/local/kafka_monitor  [email protected]:/usr/local/kafka_monitor


[email protected]:/usr/local# 

[email protected]:/usr/local/kafka_monitor# IMFKafkaOffsetMon.sh
serving resources from: jar:file:/usr/local/kafka_monitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2016-07-23 21:33:28.204:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2016-07-23 21:33:28.355:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/usr/local/kafka_monitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2016-07-23 21:33:28.395:INFO:oejs.AbstractConnector:Started [email protected]:8089

執行結果  KafkaOffsetMonitor  登入 http://192.168.189.3:8089/#/






一:kafka啟動調整,將
nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-stop.sh    /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
 
更改為: 
nohup /usr/local/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh  /usr/local/kafka_2.10-0.8.2.1/config/server.properties &


二:114的程式碼更改,補充offset的內容

如首次讀取,則IMFfromOffsets.put(topicAndPartition, 0L);  偏移量置0;
如已經有資料,則讀取已經消費的offset的位置 IMFfromOffsets.put(topicAndPartition, offset); 

使用.createDirectStream方式,從kafka的IMFfromOffsets位置開始取資料

KafkaUtils.createDirectStream(jsc, String.class,
String.class, StringDecoder.class, StringDecoder.class, Tuple2.class,kafkaParameters, IMFfromOffsets,
new Function<MessageAndMetadata<String, String>,Tuple2 >()

然後更新 傳送到kafka,由kafka更新到zookeeper



應讀者wudongdong411的要求,上原始碼吧:


	
		SparkConf conf = new SparkConf().setMaster("spark://192.168.189.1:7077")
				.setAppName("IMF-kafka.offset-20160730-2-114-AdClickedStreamingStats");

	
		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
		jsc.checkpoint("/usr/local/IMF_testdata/IMFcheckpoint114");
	
		/**
		 * 建立Kafka元資料,來讓Spark Streaming這個Kafka Consumer利用
		 */
		Map kafkaParameters = new HashMap();
		kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");

		Set topics = new HashSet();
		topics.add("AdClicked");
	
		
		/* 
		  讀取上次消費的offset的位置
		 http://blog.csdn.net/rongyongfeikai2/article/details/49784785#
			 
		*/
		 scala.collection.immutable.Map scalaImmutablekafkaParameters= scala.collection.JavaConverters.mapAsScalaMapConverter(kafkaParameters).asScala().toMap(scala.Predef.conforms());

		
		final   String IMFgroupId = "IMF-114-groupId-20160730offset" ;
		
		
		 java.util.Map IMFfromOffsets = new java.util.HashMap();  
		
	 	final KafkaCluster kafkaCluster= new KafkaCluster( scalaImmutablekafkaParameters);
	 	
	 	scala.collection.immutable.Set scalaTotopics = scala.collection.JavaConverters.asScalaSetConverter(topics).asScala().toSet( );
	 	
	 	scala.collection.immutable.Set scalaTopicAndPartitionSet = kafkaCluster.getPartitions( scalaTotopics).right().get();
 	
	 	if(kafkaCluster.getConsumerOffsets(IMFgroupId, scalaTopicAndPartitionSet).isLeft()) {
	 		Set javaTopicAndPartitionSet = JavaConversions.setAsJavaSet(scalaTopicAndPartitionSet);  
	 		for (TopicAndPartition topicAndPartition : javaTopicAndPartitionSet) {  
	 			        IMFfromOffsets.put(topicAndPartition, 0L);  
	 			      }  
		} else {
			//讀取已經消費的offset的位置
			
			scala.collection.immutable.Map consumerOffsetsTemp = kafkaCluster.getConsumerOffsets(IMFgroupId,scalaTopicAndPartitionSet).right().get();  
			Map consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);

			Set javaTopicAndPartitionSet = JavaConversions.setAsJavaSet(scalaTopicAndPartitionSet);  

			  for (TopicAndPartition topicAndPartition : javaTopicAndPartitionSet) {  
				  Long offset = (Long) consumerOffsets.get(topicAndPartition);  
				 IMFfromOffsets.put(topicAndPartition, offset); 
				  
				  
				  }  
			
		}
		
		
	
	 	
	 	JavaInputDStream adClickedStreamingTuple2 =	KafkaUtils.createDirectStream(jsc, String.class,
				String.class, StringDecoder.class, StringDecoder.class, Tuple2.class,kafkaParameters, IMFfromOffsets,
				 new Function,Tuple2 >(){
	 				private static final long serialVersionUID = 1L;

					@Override
					public  Tuple2  call(MessageAndMetadata v1) throws Exception {
						 return new Tuple2(v1.topic(), v1.message()) ;
			
					}
	 	}); 
	

		final AtomicReference offsetRanges = new AtomicReference();
	 	
		adClickedStreamingTuple2.foreachRDD(new Function,Void>() {

			@Override
			public Void call(JavaRDD arg0) throws Exception {
				 OffsetRange[] offsets = ((HasOffsetRanges) arg0.rdd()).offsetRanges();  
				 
					for(OffsetRange o: offsets){

	                    TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
	                    Map topicAndPartitionObjectMap = new HashMap();
	                    topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());

	            
	                    scala.collection.mutable.Map map =
	                            JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap);
	                    scala.collection.immutable.Map scalatopicAndPartitionObjectMap =
	                    		map.toMap(new Predef.$less$colon$less, Tuple2>() {
	                        public Tuple2 apply(Tuple2 v1) {
	                            return v1;
	                        }
	                    });

	             
	                    kafkaCluster.setConsumerOffsets(IMFgroupId, scalatopicAndPartitionObjectMap);
					}

				
					return null;
			}
 
		});
	
	
			JavaPairDStream filteredadClickedStreaming =	adClickedStreamingTuple2.transformToPair(new Function,JavaPairRDD>() {
				 
				private static final long serialVersionUID = 1L;

				@Override
				public JavaPairRDD call(JavaRDD rdd) throws Exception {
					final List blackListNames = new ArrayList();
					JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
					jdbcWrapper.doQuery("SELECT * FROM blacklisttable", null, new ExecuteCallBack() {

						@Override
						public void resultCallBack(ResultSet result) throws Exception {

							while (result.next()) {

								blackListNames.add(result.getString(1));
							}
						}

					});

					List> blackListTuple = new ArrayList>();

					for (String name : blackListNames) {
						blackListTuple.add(new Tuple2(name, true));
					}

					List> blackListFromDB = blackListTuple; // 資料來自於查詢的黑名單表並且對映成為

					JavaSparkContext jsc = new JavaSparkContext(rdd.context());

					/**
					 * 黑名單的表中只有userID,但是如果要進行join操作的話,就必須是Key-Value,所以
					 * 在這裡我們需要基於資料表中的資料產生Key-Value型別的資料集合;
					 */
					JavaPairRDD blackListRDD = jsc.parallelizePairs(blackListFromDB);

					/**
					 * 進行操作的時候肯定是基於userID進行join的,
					 * 所以必須把傳入的rdd進行mapToPair操作轉化成為符合 格式的rdd
					 * 
					 * 廣告點選的基本資料格式:timestamp、ip、userID、adID、province、city
					 */

					JavaPairRDD> rdd2Pair = rdd
							.mapToPair(new PairFunction>(){

								@Override
								public Tuple2> call(Tuple2 t) throws Exception {
									String userID = ((String) t._2).split("\t")[2];
									return new Tuple2>(userID, t);
								}
														
							});
						

					JavaPairRDD, Optional>> joined = rdd2Pair
							.leftOuterJoin(blackListRDD);

					JavaPairRDD result = joined.filter(
							new Function, Optional>>, Boolean>() {

						@Override
						public Boolean call(Tuple2, Optional>> v1)
								throws Exception {
							Optional optional = v1._2._2;

							if (optional.isPresent() && optional.get()) {
								return false;
							} else {
								return true;
							}

						}
					}).mapToPair(
							new PairFunction, Optional>>, String, String>() {

						@Override
						public Tuple2 call(
								Tuple2, Optional>> t)
										throws Exception {
							// TODO Auto-generated method stub
							return t._2._1;
						}
					});

					return result;
				}
				
			});
			filteredadClickedStreaming.print();

三:MockAdClickedStats 生產者生產資料進行效能測試,將執行緒中sleep語句去掉,一直迴圈不斷的生產資料21:37-21:42 約5分鐘生產80萬條記錄,之後停掉了這個快速生產資料的指令碼,重新執行之前老的指令碼,恢復之前每2秒1條的生產資料速度。

四:KafkaOffsetMonitor監控頁面


相關推薦

114加強:SparkStreaming+Kafka+createDirectStream+KafkaOffsetMonitor解決內幕

第114課加強版:SparkStreaming+Kafka+createDirectStream  前傳: 114課程的高階版: 1. spark streaming 使用 direct方式直接讀取 kafka的資料,offset 沒有經過zookeeper。因此在

91SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密 java.lang.ClassNotFoundException 踩坑解決問題詳細內幕版本

第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密    /* * *王家林老師授課http://weibo.com/ilovepains */  每天晚上20:00YY頻道現場授課頻道68917580 1、作業內容:SparkS

韋東山一期加強筆記3

第一節  Linux命令入門演練在學習嵌入式之前,我們有必要學會linux一些常用的命令,在以後的學習中看到這些命令才不會陌生,在入門階段,只要學會一些常見到的命令,遇到不常用的,再去百度。pwd:顯示當前的路徑ls:顯示當前目錄下所有的檔案cd:切換路徑cd..:返回上一級

三次實驗計算分段函數 四次計算分段函數和循環NEW 五次分支+循環加強 實驗報告

scan amp 函數 寬度 中大 解決方法 sca -1 三次 一.實驗題目,設計思路,實現方法 第四次分支+循環 加強版 (2-2計算個人所得稅,2-7 裝睡,2-8計算天數) 設計思路:2-2 用if-else的語句,與計算分段函數的題類似的做法;2-7 運用for語

機器學習升級(VII)——1 機器學習與數學分析

矩陣分解 變化 回歸分析 兩個 例如 處理 fff mage 我們 參考:鄒博 《機器學習升級版》 1. 機器學習概論 1. 什麽是機器學習 定義:對於某給定的任務T,在合理的性能度量方案P的前提下,某計算機程序可以自主學習任務T的經驗E;隨著提供合適、

使用webgl(three.js)搭建一個3D建築,3D消防模擬,web3D,bim管理系統——

function getBuildFloorData() { var models = [{ "show": true, "uuid": "", "name": "m4_dtWall_1", "objType": "cube2", "length": 1000, "width": 200, "hei

機器學習升級七期——logistic迴歸

首先看幾個分類圖,可以明顯的看出哪個分類更好 下面logistic/sigmoid函式比較熟悉不做過多介紹。 logistic迴歸引數估計: 上面這種假定為發生與不發生,其對應的概率為h(x)

laravel教程4: 玩轉資料遷移laravel migration(超詳細)

補充上節課:配置虛擬主機 不用修改nginx的配置檔案,減少學習成本,只要在homestead.yaml 和host檔案兩個檔案中做很小的改動就行 D:\03www2018\homestead\Homestead.yaml map: myblog.app

通過自動回覆機器人學Mybatis——加強——慕

**2-1. 介面式程式設計** 介面可以規範型別 呼叫:getMapper public class MessageDao { public List<Message> queryMessageList(String command,String des

87:Flume推送資料到SparkStreaming案例實戰和內幕原始碼解密--flume安裝篇

1、  下載flume 老師提供的包 2、  安裝 vi/etc/profile exportFLUME_HOME=/usr/local/apache-flume-1.6.0-bin exportPATH=.:$PATH:$JAVA_HOME/bin:$HADOOP

sparkStreaming+kafka pythonwordcount申請資源不成功

ERROR:py4j.java_gateway:Error while sending or receiving. Traceback (most recent call last): File "/root/hadoop/tmp/nm-local-dir/userca

008_1個ARM落程序及引申

ash 學習 啟動 nor 引腳 技術分享 因此 ask 知識 form:第008課_第1個ARM裸板程序及引申 第001節_輔線1_硬件知識_LED原理圖 當我們學習C語言的時候,我們會寫個Hello程序。那當我們下ARM程序,也該有一個簡單的程序引領我們入門,這個程

使用webgl(three.js)搭建一個3D智慧園區、3D建築,3D消防模擬,web3D,bim管理系統——(炫酷一)

這節課我們主要講解園區三維視覺化炫酷感官技術方案 前言:   當基礎技術達到普及狀態,應用就趨向於極致,在三維視覺化領域也是這個道理。各大視覺化公司都追求美觀最大化,這時候美工的作用就不容忽視了。   背景說明:     A、經濟背景:經濟下行的大環境下,各大有社會責任的企業與部門開始拉動內需,擴大預

、算法效率的度量

分享 turn 結構 sin 效率 mage alt exit 額外 一、常見的時間復雜度 常見時間復雜度的比較 二、算法分析 定義一個數組 此算法最好的情況時執行一次 而最壞的情況卻要執行n次 註意:數據結構課程中,在沒有特殊說明時,所分析算法的時間復雜度都是

【Linux探索之旅】第一部分:測試並安裝Ubuntu

u盤 nco 過程 sans ubunt windows u盤啟動盤 系統 .com 內容簡單介紹 1、第一部分第三課:測試並安裝Ubuntu 2、第一部分第四課預告:磁盤分區 測試並安裝Ubuntu 大家好,經過前兩個比較偏理論(是否

9 - 函數定義及調用

函數的調用 align 應用 abs end 語言 命令集 func home 第9課 - 函數定義及調用 1. makefile中的函數   (1)make 解釋器提供了一系列的函數供 makefile 調用   (2)在 makefile 中支持自定義函數實現,並調用執

unit8 mariadb

password 配置文件 current 數據庫 enter 1.yum intall mariadb-server -y ##安裝mariadb服務 systemctl start mariadb ##開啟服務 vim /etc/my.cnf #

可持久化並查集加強 BZOJ 3674

log 歷史 clear 必須 new 路徑壓縮 都是 return 父節點 http://www.lydsy.com/JudgeOnline/problem.php?id=3674 3674: 可持久化並查集加強版 Time Limit: 15 Sec Memory

unit9 Apache

配置文件 start enable 信息 1.安裝 yum install httpd -y systemctl start httpd systemctl enable httpd systemctl stop firewalld systemctl disable

java-web—— Servlet 控制器的引入

需要 分享 http javabean 之前 實現類 控制 servle 我會 前言:   之前我們寫的代碼,可以以這樣的流程圖來解釋   今天我們需要講的就是不讓 jsp 直接訪問 javabean   而是加入一個新概念, setvlet 控制器,   也是對應了我