这个机器学习PAI需求可以实现吗?[阿里云机器学习PAI]

我现在从ES读取数据初始化了一个MemSourceStreamOp,然后把他转成了StreamOperator,然后我通过滑动窗口获取一些特征,但是我随时需要读取新的ES数据,需要加到StreamOperator中,让这个滑动窗口再加入数据之后继续滑动。但是我没看到这种类似的方式,这个机器学习PAI需求可以实现吗?

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
2 条回复 A 作者 M 管理员
  1. 阿里云机器学习PAI是支持动态添加数据到StreamOperator的。在你的场景中,你需要通过DataStreamSourceFunction来不断地从ES读取新的数据,然后将其加入到已有的DataStream中。

    具体实现方法如下:

    1. 创建一个DataStreamSourceFunction,在其中实现从ES中读取数据的逻辑。
    2. 在初始化MemSourceStreamOp时,使用fromDataStream()方法将其转换为DataStream,并保存其状态。
    3. 在创建DataStreamSourceFunction后,使用addSource()方法将其加入到已有的DataStream中。
    4. 在创建滑动窗口时,设置允许延迟结果输出的时间,即allowedLateness方法,以确保当新的数据加入时仍能够继续滑动窗口。

    具体代码示例如下:

    # 从ES中读取数据的DataStreamSourceFunctionclass ESDataGenerator(SourceFunction):    def run(self, ctx: SourceContext) -> None:        # 从ES中读取数据,并将其转换成DataStreamElement类型加入到DataStream中        while True:            data = read_from_ES()            ctx.collect(DataStreamElement(data, timestamp))    def cancel(self):        pass# 初始化MemSourceStreamOpmem_source = MemSourceStreamOp([("col1", "int"), ("col2", "string")], [[1, "a"], [2, "b"]])# 将MemSourceStreamOp转换成DataStream,并保存其状态data_stream = mem_source.link(StreamOperator())# 加入新数据的DataStreamSourceFunctiones_generator = ESDataGenerator()data_stream.add_source(es_generator)# 创建滑动窗口,设置allowedLateness参数windowed_stream = data_stream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))        .allowedLateness(Time.seconds(5)))# 对滑动窗口进行一些操作,如获取特征等...

    通过上述方法,你可以实现随时向已有的StreamOperator中添加数据,并保证滑动窗口能够随时获取新的数据并进行计算。

  2. 是的,您可以使用DataStreamSourceStreamOp组件从Elasticsearch中读取数据,并将其转换为DataStreamOperator,然后使用DataStreamOperator.addSink()方法将数据发送到您的滑动窗口操作中。

    当新的数据到达时,您可以使用DataStreamOperator.addSink()方法将其发送到StreamOperator中进行处理。这样,当新的数据到达时,它会被添加到StreamOperator中,并随着滑动窗口一起被滑动处理。

  3. 现在还没有es的数据源,简单的,你可以弄一个Kafka,做同样的逻辑。此回答整理自钉群“Alink开源–用户群”