简单地说,滑窗算法是一种移动固定大小的窗口(子列表)来遍历数据结构的方法,主要是基于固定步骤的序列流数据。
如果我们想通过使用大小为3的窗口遍历列表[1 2 3 4 5],我们透过窗口只能看到以下数据组:
[1 2 3]
[2 3 4]
[3 4 5]
.如果我们想要使用比集合大小更大的窗口遍历相同的列表,我们甚至不会得到一个元素。
Java 10提供了一种 实现,支持顺序和并行聚合操作的一系列元素:
int sum = widgets.stream() .filter(w -> w.getColor() == RED) .mapToInt(w -> w.getWeight()) .sum();
下面谈的是如何在这个流上使用滑窗算法。
为了能够创建自定义Stream,我们需要实现自定义 。
在我们的例子中,我们需要能够迭代Stream <T>序列数据,因此我们需要实现Spliterator接口并指定泛型类型参数:
public class SlidingWindowSpliteratorimplements Spliterator > {// ...}
有一堆方法需要实现:
public class SlidingWindowSpliteratorimplements Spliterator > {//下面会实现@Overridepublic boolean tryAdvance(Consumer > action) {return false;}//准备下面实现@Overridepublic Spliterator > trySplit() { return null;}@Overridepublic long estimateSize() { return 0;}//下面准备实现@Overridepublic int characteristics() { return 0;}}
我们还需要一些字段来存储缓冲元素、窗口大小参数、源集合的迭代器以及预先计算的大小估计(稍后我们将需要):
private final Queuebuffer;private final Iterator sourceIterator;private final int windowSize;private final int size;
在我们开始实现接口方法之前,我们需要能够实例化我们的工具。
在这种情况下,我们将限制构造函数的可见性,并公开一个公共静态工厂方法:
private SlidingWindowSpliterator(Collectionsource, int windowSize) {this.buffer = new ArrayDeque<>(windowSize);this.sourceIterator = Objects.requireNonNull(source).iterator();this.windowSize = windowSize;this.size = calculateSize(source, windowSize);}
公开的静态方法:
staticStream > windowed(Collection stream, int windowSize) {return StreamSupport.stream(new SlidingWindowSpliterator<>(stream, windowSize), false);}
现在让我们实现Spliterator方法中容易的部分。
实现 trySplit()时,我们默认使用文档中指定的值。幸运的是,计算大小很容易:
private static int calculateSize(Collection source, int windowSize) {return source.size() < windowSize? 0: source.size() - windowSize + 1;}@Override public Spliterator> trySplit() { return null; } @Override public long estimateSize() { return size; }
在characteristics()中,我们指定:
ORDERED - 因为顺序很重要
NONNULL - 因为元素永远不会为null(尽管可以包含空值)
SIZED -因为大小是可以预见的
@Overridepublic int characteristics() {return ORDERED | NONNULL | SIZED;}
现在实现tryAdvance,这里是关键部分 - 负责实际分组和迭代的方法。
首先,如果窗口小于1,则没有任何内容可以迭代,以便我们可以立即返回:
@Overridepublic boolean tryAdvance(Consumer > action) {if (windowSize < 1) {return false;}// ...}
现在,要生成第一个子列表,我们需要开始迭代并填充缓冲区:
while (sourceIterator.hasNext()) {buffer.add(sourceIterator.next());// ...}
填充缓冲区后,我们可以调度整个组,并从缓冲区中丢弃最旧的元素。
这里有一个关键部分,可能会试图将buffer.stream()传递给accept()方法,这是一个巨大的错误 - Streams惰性地绑定到底层集合,这意味着如果源更改,Stream也会更改。
为了避免这个问题并将我们的组与内部缓冲区表示分离,我们需要在创建每个Stream实例之前对缓冲区的当前状态进行快照。我们将使用数组支持Stream实例,以使它们尽可能轻量级。
由于Java不支持通用数组,我们需要做一些丑陋的转换:
if (buffer.size() == windowSize) {action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));buffer.poll();return sourceIterator.hasNext();}
...瞧,我们准备好使用它:
windowed(List.of(1,2,3,4,5), 3).map(group -> group.collect(toList())).forEach(System.out::println);
滑窗代码编制成功,运行结果如下:
// result[1, 2, 3][2, 3, 4][3, 4, 5]