博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
用Java实现Stream流处理中的滑窗
阅读量:6331 次
发布时间:2019-06-22

本文共 3192 字,大约阅读时间需要 10 分钟。

hot3.png

简单地说,滑窗算法是一种移动固定大小的窗口(子列表)来遍历数据结构的方法,主要是基于固定步骤的序列流数据。

如果我们想通过使用大小为3的窗口遍历列表[1 2 3 4 5],我们透过窗口只能看到以下数据组:

[1 2 3]

[2 3 4]

[3 4 5]

.如果我们想要使用比集合大小更大的窗口遍历相同的列表,我们甚至不会得到一个元素。

Java 10提供了一种  实现,支持顺序和并行聚合操作的一系列元素:

int sum = widgets.stream()                      .filter(w -> w.getColor() == RED)                      .mapToInt(w -> w.getWeight())                      .sum();

下面谈的是如何在这个流上使用滑窗算法。

为了能够创建自定义Stream,我们需要实现自定义  。

在我们的例子中,我们需要能够迭代Stream <T>序列数据,因此我们需要实现Spliterator接口并指定泛型类型参数:

public class SlidingWindowSpliterator
implements Spliterator
> {// ...}

有一堆方法需要实现:

public class SlidingWindowSpliterator
implements Spliterator
> {//下面会实现@Overridepublic boolean tryAdvance(Consumer
> action) {return false;}//准备下面实现@Overridepublic Spliterator
> trySplit() {
return null;}@Overridepublic long estimateSize() {
return 0;}//下面准备实现@Overridepublic int characteristics() {
return 0;}}

我们还需要一些字段来存储缓冲元素、窗口大小参数、源集合的迭代器以及预先计算的大小估计(稍后我们将需要):

private final Queue
buffer;private final Iterator
sourceIterator;private final int windowSize;private final int size;

在我们开始实现接口方法之前,我们需要能够实例化我们的工具。

在这种情况下,我们将限制构造函数的可见性,并公开一个公共静态工厂方法:

private SlidingWindowSpliterator(Collection
source, int windowSize) {this.buffer = new ArrayDeque<>(windowSize);this.sourceIterator = Objects.requireNonNull(source).iterator();this.windowSize = windowSize;this.size = calculateSize(source, windowSize);}

公开的静态方法:

static 
Stream
> windowed(Collection
stream, int windowSize) {return StreamSupport.stream(new SlidingWindowSpliterator<>(stream, windowSize), false);}

现在让我们实现Spliterator方法中容易的部分。

实现 trySplit()时,我们默认使用文档中指定的值。幸运的是,计算大小很容易:

private static int calculateSize(Collection
source, int windowSize) {return source.size() < windowSize? 0: source.size() - windowSize + 1;}@Override public Spliterator
> trySplit() { return null; } @Override public long estimateSize() { return size; }

在characteristics()中,我们指定:

ORDERED - 因为顺序很重要

NONNULL - 因为元素永远不会为null(尽管可以包含空值)

SIZED -因为大小是可以预见的

@Overridepublic int characteristics() {return ORDERED | NONNULL | SIZED;}

现在实现tryAdvance,这里是关键部分 - 负责实际分组和迭代的方法。

首先,如果窗口小于1,则没有任何内容可以迭代,以便我们可以立即返回:

@Overridepublic boolean tryAdvance(Consumer
> action) {if (windowSize < 1) {return false;}// ...}

现在,要生成第一个子列表,我们需要开始迭代并填充缓冲区:

while (sourceIterator.hasNext()) {buffer.add(sourceIterator.next());// ...}

填充缓冲区后,我们可以调度整个组,并从缓冲区中丢弃最旧的元素。

这里有一个关键部分,可能会试图将buffer.stream()传递给accept()方法,这是一个巨大的错误 - Streams惰性地绑定到底层集合,这意味着如果源更改,Stream也会更改。

为了避免这个问题并将我们的组与内部缓冲区表示分离,我们需要在创建每个Stream实例之前对缓冲区的当前状态进行快照。我们将使用数组支持Stream实例,以使它们尽可能轻量级。

由于Java不支持通用数组,我们需要做一些丑陋的转换:

if (buffer.size() == windowSize) {action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));buffer.poll();return sourceIterator.hasNext();}

...瞧,我们准备好使用它:

windowed(List.of(1,2,3,4,5), 3).map(group -> group.collect(toList())).forEach(System.out::println);

滑窗代码编制成功,运行结果如下:

// result[1, 2, 3][2, 3, 4][3, 4, 5]

转载于:https://my.oschina.net/u/3906190/blog/1942497

你可能感兴趣的文章
字符串与整数之间的转换
查看>>
断点传输HTTP和URL协议
查看>>
redis 数据类型详解 以及 redis适用场景场合
查看>>
mysql服务器的主从配置
查看>>
巧用AJAX技术,通过updatePanel控件实现局部刷新
查看>>
20140420技术交流活动总结
查看>>
SaltStack配置salt-api
查看>>
各种情况下block的类型
查看>>
ThinkPHP 3.2.x 集成极光推送指北
查看>>
MYSQL 表情评论存储(emoji)
查看>>
js作用域链
查看>>
java中如何选择Collection Class--java线程(第3版)
查看>>
为运维人员插上腾飞更远的翅膀!
查看>>
Word 2003中编辑标记与格式标记大讨论
查看>>
从国内向海外转移域名经验谈
查看>>
浅谈apache与tomact的整合
查看>>
SQL Server vNext CTP1 on Linux
查看>>
1-为 Lync Server 2010 准备 Active Directory 域服务
查看>>
SELinux安全
查看>>
NetBackup下ORACLE恢复测试方案实例解析
查看>>