java8中Stream的使用以及分割list案例
一、Steam的優勢
java8中Stream配合Lambda表示式極大提高了程式設計效率,程式碼簡潔易懂(可能剛接觸的人會覺得晦澀難懂),不需要寫傳統的多執行緒程式碼就能寫出高效能的併發程式
二、專案中遇到的問題
由於微信介面限制,每次匯入code只能100個,所以需要分割list。但是由於code數量可能很大,這樣執行效率就會很低。
1.首先想到是用多執行緒寫傳統並行程式,但是博主不是很熟練,寫出程式碼可能會出現不可預料的結果,容易出錯也難以維護。
2.然後就想到Steam中的parallel,能提高效能又能利用java8的特性,何樂而不為。
三、廢話不多說,直接先貼程式碼,然後再解釋(java8分割list程式碼在標題四)。
1.該方法是根據傳入數量生成codes,private String getGeneratorCode(int tenantId)是我根據編碼規則生成唯一code這個不需要管,我們要看的是Stream.iterate
2.iterate()第一個引數為起始值,第二個函式表示式(看自己想要生成什麼樣的流關鍵在這裡),http://write.blog.csdn.net/postedit然後必須要通過limit方法來限制自己生成的Stream大小。parallel()是開啟並行處理。map()就是一對一的把Stream中的元素對映成ouput Steam中的 元素。最後用collect收集,
2.1 構造流的方法還有Stream.of(),結合或者陣列可直接list.stream();
String[] array = new String[]{"1","2","3"} ;
stream = Stream.of(array)或者Arrays.Stream(array);
2.2 數值流IntStream
int[] array = new int[]{1,2,3};
IntStream.of(array)或者IntStream.ranage(0,3)
3.以上構造流的方法都是已經知道大小,對於通過入參確定的應該圖中方法自己生成流。
四、java8分割list,利用StreamApi實現。
沒用java8前程式碼,做個鮮明對比():
1.list是我的編碼集合(codes)。MAX_SEND為100(即每次100的大小去分割list),limit為按編碼集合大小算出的本次需要分割多少次。
2.我們可以看到其實就是多了個skip跟limit方法。skip就是捨棄stream前多少個元素,那麼limit就是返回流前面多少個元素(如果流裡元素少於該值,則返回全部)。然後開啟並行處理。通過迴圈我們的分割list的目標就達到了,每次取到的sendList就是100,100這樣子的。
3.因為我這裡業務就只需要到這裡,如果我們分割之後需要收集之後再做處理,那隻需要改寫一下就ok;如:
List<List<String>> splitList = Stream.iterate(0,n->n+1).limit(limit).parallel().map(a->{ List<String> sendList = list.stream().skip(a*MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList()); }).collect(Collectors.toList());
五、java8流裡好像拿不到下標,所以我才用到構造一個遞增數列當下標用,這就是我用java8分割list的過程,比以前的for迴圈看的爽心悅目,優雅些,效能功也提高了。
如果各位有更好的實現方式,歡迎留言指教。
補充知識:聊聊flink DataStream的split操作
序
本文主要研究一下flink DataStream的split操作
例項
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });
本例項將dataStream split為兩個dataStream,一個outputName為even,另一個outputName為odd
DataStream.split
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public public class DataStream<T> { //...... public SplitStream<T> split(OutputSelector<T> outputSelector) { return new SplitStream<>(this,clean(outputSelector)); } //...... }
DataStream的split操作接收OutputSelector引數,然後建立並返回SplitStream
OutputSelector
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@PublicEvolving public interface OutputSelector<OUT> extends Serializable { Iterable<String> select(OUT value); }
OutputSelector定義了select方法用於給element打上outputNames
SplitStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java
@PublicEvolving public class SplitStream<OUT> extends DataStream<OUT> { protected SplitStream(DataStream<OUT> dataStream,OutputSelector<OUT> outputSelector) { super(dataStream.getExecutionEnvironment(),new SplitTransformation<OUT>(dataStream.getTransformation(),outputSelector)); } public DataStream<OUT> select(String... outputNames) { return selectOutput(outputNames); } private DataStream<OUT> selectOutput(String[] outputNames) { for (String outName : outputNames) { if (outName == null) { throw new RuntimeException("Selected names must not be null"); } } SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(),Lists.newArrayList(outputNames)); return new DataStream<OUT>(this.getExecutionEnvironment(),selectTransform); } }
SplitStream繼承了DataStream,它定義了select方法,可以用來根據outputNames選擇split出來的dataStream;select方法建立了SelectTransformation
StreamGraphGenerator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@Internal public class StreamGraphGenerator { //...... private Collection<Integer> transform(StreamTransformation<?> transform) { if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } LOG.debug("Transforming " + transform); if (transform.getMaxParallelism() <= 0) { // if the max parallelism hasn't been set,then first use the job wide max parallelism // from theExecutionConfig. int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism(); if (globalMaxParallelismFromConfig > 0) { transform.setMaxParallelism(globalMaxParallelismFromConfig); } } // call at least once to trigger exceptions about MissingTypeInfo transform.getOutputType(); Collection<Integer> transformedIds; if (transform instanceof OneInputTransformation<?,?>) { transformedIds = transformOneInputTransform((OneInputTransformation<?,?>) transform); } else if (transform instanceof TwoInputTransformation<?,?,?>) { transformedIds = transformTwoInputTransform((TwoInputTransformation<?,?>) transform); } else if (transform instanceof SourceTransformation<?>) { transformedIds = transformSource((SourceTransformation<?>) transform); } else if (transform instanceof SinkTransformation<?>) { transformedIds = transformSink((SinkTransformation<?>) transform); } else if (transform instanceof UnionTransformation<?>) { transformedIds = transformUnion((UnionTransformation<?>) transform); } else if (transform instanceof SplitTransformation<?>) { transformedIds = transformSplit((SplitTransformation<?>) transform); } else if (transform instanceof SelectTransformation<?>) { transformedIds = transformSelect((SelectTransformation<?>) transform); } else if (transform instanceof FeedbackTransformation<?>) { transformedIds = transformFeedback((FeedbackTransformation<?>) transform); } else if (transform instanceof CoFeedbackTransformation<?>) { transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform); } else if (transform instanceof PartitionTransformation<?>) { transformedIds = transformPartition((PartitionTransformation<?>) transform); } else if (transform instanceof SideOutputTransformation<?>) { transformedIds = transformSideOutput((SideOutputTransformation<?>) transform); } else { throw new IllegalStateException("Unknown transformation: " + transform); } // need this check because the iterate transformation adds itself before // transforming the feedback edges if (!alreadyTransformed.containsKey(transform)) { alreadyTransformed.put(transform,transformedIds); } if (transform.getBufferTimeout() >= 0) { streamGraph.setBufferTimeout(transform.getId(),transform.getBufferTimeout()); } if (transform.getUid() != null) { streamGraph.setTransformationUID(transform.getId(),transform.getUid()); } if (transform.getUserProvidedNodeHash() != null) { streamGraph.setTransformationUserHash(transform.getId(),transform.getUserProvidedNodeHash()); } if (transform.getMinResources() != null && transform.getPreferredResources() != null) { streamGraph.setResources(transform.getId(),transform.getMinResources(),transform.getPreferredResources()); } return transformedIds; } private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) { StreamTransformation<T> input = select.getInput(); Collection<Integer> resultIds = transform(input); // the recursive transform might have already transformed this if (alreadyTransformed.containsKey(select)) { return alreadyTransformed.get(select); } List<Integer> virtualResultIds = new ArrayList<>(); for (int inputId : resultIds) { int virtualId = StreamTransformation.getNewNodeId(); streamGraph.addVirtualSelectNode(inputId,virtualId,select.getSelectedNames()); virtualResultIds.add(virtualId); } return virtualResultIds; } private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) { StreamTransformation<T> input = split.getInput(); Collection<Integer> resultIds = transform(input); // the recursive transform call might have transformed this already if (alreadyTransformed.containsKey(split)) { return alreadyTransformed.get(split); } for (int inputId : resultIds) { streamGraph.addOutputSelector(inputId,split.getOutputSelector()); } return resultIds; } //...... }
StreamGraphGenerator裡頭的transform會對SelectTransformation以及SplitTransformation進行相應的處理
transformSelect方法會根據select.getSelectedNames()來addVirtualSelectNode
transformSplit方法則根據split.getOutputSelector()來addOutputSelector
小結
DataStream的split操作接收OutputSelector引數,然後建立並返回SplitStream
OutputSelector定義了select方法用於給element打上outputNames
SplitStream繼承了DataStream,它定義了select方法,可以用來根據outputNames選擇split出來的dataStream
doc
DataStream Transformations
以上這篇java8中Stream的使用以及分割list案例就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。