Stream
- Stream API 是一种流式的数据处理风格,将要处理的数据当作流,在管道内进行传输,并在管道中的每个节点对数据进行处理,比如过滤、排序、映射、聚合等
- Stream 的操作
- 中间操作:中间操作只是一种标记,结束操作时才会触发实际计算
- 无状态:元素的处理不受前面元素的影响
- 有状态:有状态的中间操作必须等到所有元素处理之后才能得到最终结果,比如排序
- 终止操作:得出最后计算结果的操作
- 短路操作:不用处理全部元素就可以返回结果
- 非短路操作:必须处理完所有元素才能得到结果
- 中间操作:中间操作只是一种标记,结束操作时才会触发实际计算
执行过程
- 执行过程概述
- 将多个操作构建成作用链,该作用链是一个双向链表
- 将每个元素依次让作用链进行处理
- 终止操作收集作用链处理的结果
- 将多个操作构建成作用链,该作用链是一个双向链表
- 例子 filter() 为无状态中间操作,sorted() 为有状态中间操作。collect() 为终止操作
List<String> list = Lists.newArrayList("Chris Kate", "Bruce", "Tina");
list.stream().filter(s -> s.length() > 5).sorted((s1, s2) -> s1.length() - s2.length()).collect(Collectors.toList());- 构建作用链
一般为一个源节点,多个中间操作,一个种植操作,构建的双向链表如下- 调用 list.stream() 时,会创建一个源节点
- 调用 filter()(无状态中间操作) 时,将操作添加到链表末尾,将 Head 和 FilterOp 连接起来
- 调用 sorted()(有状态中间操作)时,同样将 SortedOp 添加到链表末尾,此外一下属性会记录排序操作需要的数据,比如比较器 Comparator
- 调用 list.stream() 时,会创建一个源节点
- 作用链循环处理每个元素
- 调用 collect() 终止操作
终止操作是收集处理结果的操作,因此在调用 collect() 终止操作时就会进行以下操作- 遍历流元素,以此让流元素经过作用链的处理
- 由于排序 Op 是有状态的中间操作,因此,排序 Op 最存储前面的操作处理过的流元素,进行一些操作,然后再传递给其下游进行处理
- 终止操作只用于收集结果,终止操作处理之后,返回的并不是一个流,那么这个流的处理就结束了
- 调用 collect() 终止操作
- 调用 collect() 操作之后的事
- 判断并行流还是串行流,分别调用各自的处理方式
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); - 如果是串行流,则会调用终止操作对应的 evaluate 方法,此处考虑 ReduceOp 的情况
- 在 evaluate 方法中会创建一个 ReducingSink,用来收集 reduce 的处理结果
- 接下来就是调用 warpSink() 对 Sink 进行封装,封转过的结果就是形成一个水槽的单向链表
- 使用 copyInto() 方法,对元素进行迭代,依次由水槽单向链表处理
- 判断并行流还是串行流,分别调用各自的处理方式
- 构建作用链
Stream 的并行操作原理
- 本质上就是在 ForkJoin 上进行了一层封装,将 Stream 不断尝试分解成更小的 split,然后使用 Fork/Join 框架分而治之
Stream 并行的线程池
- 默认使用 fork/join 池来完成,这个线程池是所有并行流共享的,ForkJoinPool.commonPool()
- 可以创建自己的线程池,避免共享线程池