Pyflink1.16.1使用UDF会导致JVM Metaspace内存泄漏[阿里云实时计算 Flink版]

版本:python3.8,pyflink1.16.1 集群基于standalone搭建,在上边跑批任务跑了几天,频率比较高,发现taskmanager挂了,之后查询原因定位到是使用了udf的情况就会导致Metaspace内存增加。 然后写了个简单的测试代码印证了下,

        from pyflink.table.udf import udf

        @udf(result_type=DataTypes.BIGINT())
        def convert_to_timestamp(cur_date: datetime):
              return cur_date.timestamp() * 1000

        s_env = self.getStreamEnv()
        s_env.set_parallelism(1)
        st_env = self.getTableEnv()


        mysql_source = """
        CREATE TABLE if not exists marketplace_rank_sale_daily_mysql_src (
        `id` INT,
        `category_id` BIGINT  comment '分类ID',
        `begin_rank_scale` INT comment '排名起始区间',
        `rank` FLOAT comment '排名',  
        `quantity` FLOAT  comment '销量',
        `date` DATE  comment '日期',
        `created_time` TIMESTAMP(0) comment '创建时间',
        PRIMARY KEY (`category_id`,`begin_rank_scale`,`date`) NOT ENFORCED
        ) WITH (
        'connector' = 'jdbc',
        'url' = 'xxx',
        'table-name' = 'xxx',
        'username' = 'xxx',
        'password' = 'xxx',
        'driver' = 'com.mysql.cj.jdbc.Driver'
        )
        """

        st_env.execute_sql(mysql_source)


        st_env.create_temporary_function("convert_to_timestamp", convert_to_timestamp)
        st_env.execute_sql("SELECT category_id, `rank`, quantity,`date`,         `created_time`,convert_to_timestamp(created_time) as time_number FROM marketplace_rank_sale_daily_mysql_src").print()

没引入其他jar包,都放在flink_home/lib下,类加载器用的ParentFirstClassLoader,不知道该怎么解决了?

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
1 条回复 A 作者 M 管理员
  1. 该版本session集群存在此问题,只能更换成yarn application模式,具体修复版本未知。

  2. 猜测可能存在版本不兼容的问题。建议升级或降级相应的依赖库版本再试试。