跳到主要内容

Stream

  • Stream API 是一种流式的数据处理风格,将要处理的数据当作流,在管道内进行传输,并在管道中的每个节点对数据进行处理,比如过滤、排序、映射、聚合等
  • Stream 的操作
    Stream 的操作
    1. 中间操作:中间操作只是一种标记,结束操作时才会触发实际计算
      • 无状态:元素的处理不受前面元素的影响
      • 有状态:有状态的中间操作必须等到所有元素处理之后才能得到最终结果,比如排序
    2. 终止操作:得出最后计算结果的操作
      • 短路操作:不用处理全部元素就可以返回结果
      • 非短路操作:必须处理完所有元素才能得到结果

执行过程

  • 执行过程概述
    1. 将多个操作构建成作用链,该作用链是一个双向链表
      Stream 执行过程
    2. 将每个元素依次让作用链进行处理
      Stream 执行过程
    3. 终止操作收集作用链处理的结果
  • 例子
    List<String> list = Lists.newArrayList("Chris Kate", "Bruce", "Tina");
    list.stream().filter(s -> s.length() > 5).sorted((s1, s2) -> s1.length() - s2.length()).collect(Collectors.toList());
    filter() 为无状态中间操作,sorted() 为有状态中间操作。collect() 为终止操作
    • 构建作用链
      一般为一个源节点,多个中间操作,一个种植操作,构建的双向链表如下
      Stream 执行过程
      • 调用 list.stream() 时,会创建一个源节点
        Stream 执行过程
      • 调用 filter()(无状态中间操作) 时,将操作添加到链表末尾,将 Head 和 FilterOp 连接起来
        Stream 执行过程
      • 调用 sorted()(有状态中间操作)时,同样将 SortedOp 添加到链表末尾,此外一下属性会记录排序操作需要的数据,比如比较器 Comparator
        Stream 执行过程
    • 作用链循环处理每个元素
      • 调用 collect() 终止操作
        终止操作是收集处理结果的操作,因此在调用 collect() 终止操作时就会进行以下操作
        • 遍历流元素,以此让流元素经过作用链的处理
        • 由于排序 Op 是有状态的中间操作,因此,排序 Op 最存储前面的操作处理过的流元素,进行一些操作,然后再传递给其下游进行处理
        • 终止操作只用于收集结果,终止操作处理之后,返回的并不是一个流,那么这个流的处理就结束了
    • 调用 collect() 操作之后的事
      • 判断并行流还是串行流,分别调用各自的处理方式
        return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
      • 如果是串行流,则会调用终止操作对应的 evaluate 方法,此处考虑 ReduceOp 的情况
      • 在 evaluate 方法中会创建一个 ReducingSink,用来收集 reduce 的处理结果
      • 接下来就是调用 warpSink() 对 Sink 进行封装,封转过的结果就是形成一个水槽的单向链表
        水槽的单向链表
      • 使用 copyInto() 方法,对元素进行迭代,依次由水槽单向链表处理
        水槽的单向链表

Stream 的并行操作原理

  • 本质上就是在 ForkJoin 上进行了一层封装,将 Stream 不断尝试分解成更小的 split,然后使用 Fork/Join 框架分而治之
    并行操作原理

Stream 并行的线程池

  • 默认使用 fork/join 池来完成,这个线程池是所有并行流共享的,ForkJoinPool.commonPool()
  • 可以创建自己的线程池,避免共享线程池