版本:python3.8,pyflink1.16.1 集群基于standalone搭建,在上边跑批任务跑了几天,频率比较高,发现taskmanager挂了,之后查询原因定位到是使用了udf的情况就会导致Metaspace内存增加。 然后写了个简单的测试代码印证了下,
from pyflink.table.udf import udf
@udf(result_type=DataTypes.BIGINT())
def convert_to_timestamp(cur_date: datetime):
return cur_date.timestamp() * 1000
s_env = self.getStreamEnv()
s_env.set_parallelism(1)
st_env = self.getTableEnv()
mysql_source = """
CREATE TABLE if not exists marketplace_rank_sale_daily_mysql_src (
`id` INT,
`category_id` BIGINT comment '分类ID',
`begin_rank_scale` INT comment '排名起始区间',
`rank` FLOAT comment '排名',
`quantity` FLOAT comment '销量',
`date` DATE comment '日期',
`created_time` TIMESTAMP(0) comment '创建时间',
PRIMARY KEY (`category_id`,`begin_rank_scale`,`date`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx',
'driver' = 'com.mysql.cj.jdbc.Driver'
)
"""
st_env.execute_sql(mysql_source)
st_env.create_temporary_function("convert_to_timestamp", convert_to_timestamp)
st_env.execute_sql("SELECT category_id, `rank`, quantity,`date`, `created_time`,convert_to_timestamp(created_time) as time_number FROM marketplace_rank_sale_daily_mysql_src").print()
没引入其他jar包,都放在flink_home/lib下,类加载器用的ParentFirstClassLoader,不知道该怎么解决了?
该版本session集群存在此问题,只能更换成yarn application模式,具体修复版本未知。
猜测可能存在版本不兼容的问题。建议升级或降级相应的依赖库版本再试试。