DataSet Development -- Getting Started
阿新 • Published: 2019-03-17
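Development workflow
1. Obtain an execution environment.
2. Load or create the initial data.
3. Specify the transformations on that data.
4. Specify where to put the results of the computation.
5. Trigger program execution.
Example: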
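package com.nt.DEMO

// Scala DataSet API; the wildcard import also brings the implicit TypeInformation into scope
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Load/create the initial data
    val text = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?")
    // 3. Specify the transformations on the data
    val split_words = text.flatMap(line => line.toLowerCase().split("\\W+")) // lowercase, split on non-word characters
    val filter_words = split_words.filter(x => x.nonEmpty)                   // drop empty tokens
    val map_words = filter_words.map(x => (x, 1))                            // (word, 1)
    val groupBy_words = map_words.groupBy(0)                                 // group on tuple field 0 (the word)
    val sum_words = groupBy_words.sum(1)                                     // add up tuple field 1 (the count)
    // 4. Specify where to put the results; args(0) is the output path, e.g. "/Users/niutao/Desktop/flink.txt"
    // To write one summary file instead of a directory of per-subtask files, give the sink parallelism 1:
    // sum_words.writeAsText(args(0)).setParallelism(1)
    sum_words.writeAsText(args(0))
    // 5. Trigger program execution
    env.execute("DataSet wordCount")
  }
}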
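To see what each transformation in the job above produces, the same pipeline can be traced with plain Scala collections. This is only an illustration that runs locally without Flink: the collection `groupBy(_._1)` followed by a sum stands in for Flink's `groupBy(0).sum(1)` on tuple fields, and the object name `LocalWordCountTrace` is hypothetical.

```scala
// Hypothetical local trace of the DataSet word-count pipeline using plain
// Scala collections (no Flink): each val mirrors one step of the Flink job.
object LocalWordCountTrace {
  val lines: Seq[String] = Seq(
    "Who's there?",
    "I think I hear them. Stand, ho! Who's there?"
  )

  // flatMap: lowercase each line and split on runs of non-word characters
  val splitWords: Seq[String] = lines.flatMap(line => line.toLowerCase.split("\\W+"))

  // filter: drop empty tokens (split yields "" when a line starts with a non-word character)
  val filterWords: Seq[String] = splitWords.filter(_.nonEmpty)

  // map: pair every word with an initial count of 1
  val mapWords: Seq[(String, Int)] = filterWords.map(w => (w, 1))

  // stand-in for Flink's groupBy(0).sum(1): group on the word, add up the counts
  val sumWords: Map[String, Int] =
    mapWords.groupBy(_._1).map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  // Print the counts, most frequent first, as (word,count) tuples
  def main(args: Array[String]): Unit =
    sumWords.toSeq.sortBy { case (w, c) => (-c, w) }.foreach(println)
}
```

Because `\\W+` treats the apostrophe as a separator, "Who's" becomes the two tokens `who` and `s`; the Flink job splits it the same way, so both lines together yield 13 tokens and 9 distinct words.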
Add the Maven plugins used to compile the Scala code and package the jar:
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>
        <plugin>
            <!-- compiles the Scala sources -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <!-- builds a single jar with all dependencies during the package phase -->
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <!-- writes Main-Class into the jar's manifest -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.nt.DEMO.WordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Upload the jar to the server with the rz command, then run the program:
bin/flink run -m yarn-cluster -yn 2 /home/elasticsearch/flinkjar/itcast_learn_flink-1.0-SNAPSHOT.jar com.nt.DEMO.WordCount
The submitted job can be observed in the YARN web UI on port 8088.
The output of the run can be found in the /opt/cdh/flink-1.3.2/flinkJAR folder.
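`writeAsText` writes each record's `toString`, so with the Scala tuples produced by the example every output line looks like `(who,2)`. When the sink runs with parallelism greater than 1, the output path becomes a directory holding one file per parallel subtask; with parallelism 1 it is a single file. The sketch below, with the hypothetical names `ReadWordCounts`, `parseLine` and `readCounts`, reads either layout back into a map. It assumes words contain no commas or parentheses, which holds for the `\\W+` tokenizer.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.io.Source

// Hypothetical helper for inspecting the word-count output locally.
object ReadWordCounts {
  // Parse one line such as "(who,2)" into ("who", 2).
  def parseLine(line: String): (String, Int) = {
    val inner = line.trim.stripPrefix("(").stripSuffix(")")
    val comma = inner.lastIndexOf(',')
    (inner.substring(0, comma), inner.substring(comma + 1).toInt)
  }

  // Read a single output file, or every regular file inside an output directory.
  def readCounts(path: Path): Map[String, Int] = {
    val files: Seq[Path] =
      if (Files.isDirectory(path)) {
        val stream = Files.list(path)
        try stream.toArray().toSeq.map(_.asInstanceOf[Path]).filter(p => Files.isRegularFile(p))
        finally stream.close()
      } else Seq(path)

    files.flatMap { file =>
      val src = Source.fromFile(file.toFile, "UTF-8")
      try src.getLines().filter(_.trim.nonEmpty).map(parseLine).toList
      finally src.close()
    }.toMap
  }

  // Usage: pass the same output path that the Flink job received as args(0).
  def main(args: Array[String]): Unit =
    readCounts(Paths.get(args(0))).foreach(println)
}
```

Since `groupBy(0).sum(1)` emits each word exactly once across all subtasks, merging the per-file results with `toMap` loses nothing.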