jdk8开始,java新增了Stream相关的API。配合lambda表达式,java也拥有了部分的函数式编程的能力。
Stream的出现,使得java对集合的操作变得简单且意图明确,同时parallel方法也使得并行计算变得方便。下面仿照jdk的代码自己实现下面的代码。

1
2
3
4
5
Stream.of(3, 1, 2)
.map(x -> String.valueOf(x + 1))
.sorted()
.map(Integer::parseInt)
.forEach(System.out::println);

这一篇只是初步实现stream的简易串行版本,后面会再尝试实现简易的并行API

在上面的代码中,包含四种操作

  • 创建stream操作,不包含其他的操作
  • map这种无状态的操作,也就是不依赖上下文的操作
  • sorted有状态的操作,需要得到全部的信息才能生效
  • foreach终止操作,后面不能再有其他操作。

上面这四种操作,分别在在代码中对应为Head,StatelessOp,StatefulOp以及TerminalOp。这些操作除了终止操作都是referencePipeLine的子类,也就是每一个操作都是流水线的一部分。为什么要设计成流水线的模式?因为在流水线的模式下,可以做到尽量在一次遍历中完成整个操作。下面来简单说一下这个流水线是怎么实现的。

上面的每个操作都是一个op,在stream中,一个完整的操作可以用<数据源,操作,回调函数>简单表示。而这个完整的操作在stream中用stage表示。上面的代码最终行程的流水线类似一个个stage组成的双向链表。至于为什么是双向的链表,是因为前面的stage要调用后面的stage,所以需要后面stage的引用。真正触发整个流水线工作的地方是在最后的终止操作,也就是前面的非终止操作都是惰性执行的。当终止操作开始,需要一层层的包装回去,包装成”一个操作”,执行这个操作就相当于执行整个流水线,所以当前的 stage需要有前一个stage的引用,也就形成了双线链表。同时两个stage之间的调用肯定也是有一个协议,也就是sink接口。
整个代码的设计还是很精巧的。下面的我写的简易的stream
MyStream

参考:
深入理解Java Stream流水线