我现在从ES读取数据初始化了一个MemSourceStreamOp,然后把他转成了StreamOperator,然后我通过滑动窗口获取一些特征,但是我随时需要读取新的ES数据,需要加到StreamOperator中,让这个滑动窗口再加入数据之后继续滑动。但是我没看到这种类似的方式,这个机器学习PAI需求可以实现吗?
这个机器学习PAI需求可以实现吗?[阿里云机器学习PAI]
「点点赞赏,手留余香」
还没有人赞赏,快来当第一个赞赏的人吧!
我现在从ES读取数据初始化了一个MemSourceStreamOp,然后把他转成了StreamOperator,然后我通过滑动窗口获取一些特征,但是我随时需要读取新的ES数据,需要加到StreamOperator中,让这个滑动窗口再加入数据之后继续滑动。但是我没看到这种类似的方式,这个机器学习PAI需求可以实现吗?
阿里云机器学习PAI是支持动态添加数据到StreamOperator的。在你的场景中,你需要通过DataStreamSourceFunction来不断地从ES读取新的数据,然后将其加入到已有的DataStream中。
具体实现方法如下:
具体代码示例如下:
通过上述方法,你可以实现随时向已有的StreamOperator中添加数据,并保证滑动窗口能够随时获取新的数据并进行计算。
是的,您可以使用DataStreamSourceStreamOp组件从Elasticsearch中读取数据,并将其转换为DataStreamOperator,然后使用DataStreamOperator.addSink()方法将数据发送到您的滑动窗口操作中。
当新的数据到达时,您可以使用DataStreamOperator.addSink()方法将其发送到StreamOperator中进行处理。这样,当新的数据到达时,它会被添加到StreamOperator中,并随着滑动窗口一起被滑动处理。
现在还没有es的数据源,简单的,你可以弄一个Kafka,做同样的逻辑。此回答整理自钉群“Alink开源–用户群”