DataSet Development: A Simple Introduction


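Development workflow

1.    Obtain an execution environment,
2.    Load or create the initial data,
3.    Specify the transformations to apply to this data,
4.    Specify where to put the results of the computation,
5.    Trigger program execution.

Example: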

import org.apache.flink.api.scala._

object DataSet_WordCount {
  def main(args: Array[String]): Unit = {
    // Initialize the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Load/create the initial data
    val text = env.fromElements(
      "Who's there?", "I think I hear them. Stand, ho! Who's there?")
    // Specify the transformations on this data
    val split_words = text.flatMap(line => line.toLowerCase().split("\\W+"))
    val filter_words = split_words.filter(x => x.nonEmpty)
    val map_words = filter_words.map(x => (x, 1))
    val groupBy_words = map_words.groupBy(0)
    val sum_words = groupBy_words.sum(1)
    // Specify where to put the results
    // sum_words.setParallelism(1) // gather the results into a single output
    sum_words.writeAsText(args(0)) // e.g. "/Users/niutao/Desktop/flink.txt"
    // Trigger program execution
    env.execute("DataSet wordCount")
  }
}
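To see what the transformation chain computes without starting Flink, the same steps can be sketched on an ordinary Scala collection. This is only an illustration under that assumption: `WordCountSketch` and `wordCount` are hypothetical names that do not appear in the original program, and the code uses the Scala standard library rather than the DataSet API.

```scala
object WordCountSketch {
  // Mirrors the DataSet pipeline on an in-memory Seq:
  // split, filter, pair with 1, group by word, sum the counts.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.toLowerCase.split("\\W+"))           // split on non-word characters
      .filter(_.nonEmpty)                                        // drop empty tokens
      .map(word => (word, 1))                                    // pair each word with a count of 1
      .groupBy(_._1)                                             // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts per word

  def main(args: Array[String]): Unit = {
    val counts = wordCount(
      Seq("Who's there?", "I think I hear them. Stand, ho! Who's there?"))
    // Print the (word, count) pairs in alphabetical order
    counts.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Because the apostrophe is a non-word character, "Who's" is split into the two tokens `who` and `s`; the Flink job tokenizes its input the same way.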

Package the program and submit it to YARN

Add the Maven packaging plugins:

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>

        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>

                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.nt.DEMO.WordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Upload the jar with the rz command, then run the program:

bin/flink run -m yarn-cluster -yn 2 /home/elasticsearch/flinkjar/itcast_learn_flink-1.0-SNAPSHOT.jar com.nt.DEMO.WordCount

The submitted program can be seen on the YARN web UI at port 8088.

The output of the run can be found in the /opt/cdh/flink-1.3.2/flinkJAR folder.
