tongchenkeji 发表于:2023-3-27 13:46:400次点击 已关注取消关注 关注 私信 请大家帮我看看下列问题是什么情况?[阿里云实时计算 Flink版] 暂停朗读为您朗读 为啥我这个窗口函数里面的 log一直没数据呢。上面那个有输出。 logger.info(“kafkaStream Window…..”); 没有。 看起来是执行了。 「点点赞赏,手留余香」 赞赏 还没有人赞赏,快来当第一个赞赏的人吧! 海报 实时计算Flink版# 云消息队列 Kafka 版375# 实时计算 Flink版3179
爱回答的三好学生AM 2023-11-27 18:48:55 1 /** *todo: 2023/3/20 23:14 九师兄 * * 测试数据 {“id”:”11″,”name”:”akka”,”password”:”123″,”password1″:”123″,”age”:1} * 输出结果:1 分钟内收集到 student 的数据条数是:7 **/ public void timeWindowAll() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //2.定义加载或创建数据源(source),监听9000端口的socket消息 DataStream textStream9000 = env.socketTextStream(“localhost”, 9992, “”); SingleOutputStreamOperator student = textStream9000.map(string -> JSONObject.parseObject(string, Student.class));student.timeWindowAll(Time.seconds(5)).apply(new AllWindowFunction, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable values, Collector> out) throws Exception { ArrayList students = Lists.newArrayList(values); if (students.size() > 0) { System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size()); out.collect(students); } }}).print();env.execute("flink learning connectors kafka"); }我测试是可以的 你对比一下。此回答整理自钉群“【③群】Apache Flink China社区”
/** *todo: 2023/3/20 23:14 九师兄 * * 测试数据 {“id”:”11″,”name”:”akka”,”password”:”123″,”password1″:”123″,”age”:1} * 输出结果:1 分钟内收集到 student 的数据条数是:7 **/ public void timeWindowAll() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //2.定义加载或创建数据源(source),监听9000端口的socket消息 DataStream textStream9000 = env.socketTextStream(“localhost”, 9992, “”);
}我测试是可以的 你对比一下。此回答整理自钉群“【③群】Apache Flink China社区”
Flink中如何使用TimeWindowAll https://www.yisu.com/zixun/525527.html