tongchenkeji 发表于:2023-11-30 17:22:360次点击 已关注取消关注 关注 私信 用flink doris1.1.1的版本 ,可以对doris维表进行 lookup join么?[阿里云] 暂停朗读为您朗读 用flink doris connector 1.1.1的版本 ,在doris 2.0版本上,可以对doris维表进行 lookup join么? 「点点赞赏,手留余香」 赞赏 还没有人赞赏,快来当第一个赞赏的人吧! 海报 阿里云# 实时计算 Flink版3179# 流计算2236
小周sirAM 2023-12-3 11:47:13 1 根据Flink官方文档和Flink Doris Connector的说明,Flink Doris Connector 1.1.1版本可以与Doris 2.0版本兼容,并支持在Flink中对Doris维表进行lookup join。 通过使用Flink Doris Connector,您可以将Doris作为Flink的外部系统进行连接,并利用Doris表的数据进行lookup join操作。这样可以在Flink作业中使用Doris表的数据来丰富和补充流数据。例如,将实时事件流与Doris维度表进行关联查询,以获取更多有关事件的信息。 注意,为了进行lookup join操作,您需要确保正确配置Flink Doris Connector,并正确设置lookup表和主表之间的关联条件。此外,还应注意Flink任务的并行性和资源管理,以优化join操作的性能和稳定性。
三掌柜666AM 2023-12-3 11:47:13 2 楼主你好,看了你的问题,根据阿里云 Flink Doris Connector 1.1.1 的文档,支持对 Doris 维表进行 Lookup Join。但是Doris 2.0 版本中的一些新特性可能会影响到该功能的使用,所以需要在使用时需要进行测试和验证。 在使用 Flink Doris Connector 进行 Lookup Join 时,需要创建一个维表 TableSource,并在 StreamTableEnvironment 中注册。示例代码如下: import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.types.logical.VarCharType;import org.apache.flink.types.Row;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;import org.apache.flink.streaming.api.functions.source.SourceFunction.TimestampedSourceContext;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.java.BatchTableEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.api.java.TableEnvironment;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.StreamTableDescriptor;import org.apache.flink.table.descriptors.StreamTableEnvironmentDescriptor;import org.apache.flink.table.descriptors.StreamTableSourceDescriptor;import org.apache.flink.table.descriptors.TimedRowtime;import org.apache.flink.table.descriptors.TimedRowtime;import org.apache.flink.table.sources.LookupableTableSource;import org.apache.flink.table.sources.RowtimeAttributeDescriptor;import org.apache.flink.table.sources.TableSource;import org.apache.flink.table.types.DataType;import org.apache.flink.util.CloseableIterator;import com.alibaba.fastjson.JSONObject;import com.google.common.collect.ImmutableList;import com.google.common.collect.Lists;import java.util.HashMap;import java.util.List;import java.util.Map;public class DorisLookupJoinExample { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 定义流表的 schema TableSchema schema = new TableSchema(new String[]{"id", "name", "age"}, new DataType[]{ DataTypes.INT(), DataTypes.STRING(), DataTypes.INT() }); // 定义一个维表的 schema TableSchema lookupSchema = new TableSchema(new String[]{"id", "address"}, new DataType[]{ DataTypes.INT(), DataTypes.STRING() }); // 创建一个数据流作为流表 DataStream<Row> stream = env.addSource(new SourceFunction<Row>() { private boolean isRunning = true; private int count = 0; @Override public void run(SourceContext<Row> ctx) throws Exception { while (isRunning) { count++; Row row = Row.of(count, "abc", count % 30); ctx.collect(row); Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } }).returns(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames())); // 创建一个维表 TableSource DorisTableSource lookupTable = DorisTableSource.builder() .withSchema(lookupSchema) .fromProperties(props) .build(); // 在 TableEnvironment 中注册流表和维表 tableEnv.registerDataStream("stream_table", stream, "id, name, age"); tableEnv.registerTableSource("lookup_table", lookupTable); // 执行 Lookup Join 操作 Table result = tableEnv.sqlQuery( "SELECT id, name, age, address FROM stream_table " + "LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF PROCTIME() " + "ON stream_table.id = lookup_table.id"); // 输出结果 result.printSchema(); result.toAppendStream(Row.class).print(); // 执行 Flink 任务 env.execute("Doris Lookup Join Example"); } public static class DorisTableSource implements LookupableTableSource<Row> { private final TableSchema schema; private final Map<String, String> properties; public DorisTableSource(TableSchema schema, Map<String, String> properties) { this.schema = schema; this.properties = properties; } @Override public TableSchema getTableSchema() { return schema; } @Override public boolean isAsyncEnabled() { return false; } @Override public DataType getProducedDataType() { return DataTypes.createRowType(schema.getFieldDataTypes(), schema.getFieldNames()); } @Override public void applyLookup(List<Row> rows, LookupContext context, ResultFuture<Row> resultFuture) throws Exception { for (Row row : rows) { // 从 Doris 维表中查询数据,并将查询结果添加到输出结果中 // TODO: 实现从 Doris 维表中查询数据的逻辑 resultFuture.complete(Lists.newArrayList(row)); } } @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { return LookupRuntimeProvider.of(new DorisLookupFunction(this)); } public static DorisTableSourceBuilder builder() { return new DorisTableSourceBuilder(); } private static class DorisTableSourceBuilder { private TableSchema schema; private Map<String, String> properties; public DorisTableSourceBuilder withSchema(TableSchema schema) { this.schema = schema; return this; } public DorisTableSourceBuilder fromProperties(Map<String, String> properties) { this.properties = properties; return this; } public DorisTableSource build() { return new DorisTableSource(schema, properties); } } } public static class DorisLookupFunction extends AsyncTableFunction<Row> { private final DorisTableSource tableSource; public DorisLookupFunction(DorisTableSource tableSource) { this.tableSource = tableSource; } public void eval(Integer id, CloseableIterator<Row> resultFuture) { // 从 Doris 维表中查询数据,并将查询结果添加到输出结果中 // TODO: 实现从 Doris 维表中查询数据的逻辑 resultFuture.close(); } }} 所以还是需要你根据实际情况实现 DorisLookupFunction 中的逻辑,从 Doris 维表中查询数据。
根据Flink官方文档和Flink Doris Connector的说明,Flink Doris Connector 1.1.1版本可以与Doris 2.0版本兼容,并支持在Flink中对Doris维表进行lookup join。
通过使用Flink Doris Connector,您可以将Doris作为Flink的外部系统进行连接,并利用Doris表的数据进行lookup join操作。这样可以在Flink作业中使用Doris表的数据来丰富和补充流数据。例如,将实时事件流与Doris维度表进行关联查询,以获取更多有关事件的信息。
注意,为了进行lookup join操作,您需要确保正确配置Flink Doris Connector,并正确设置lookup表和主表之间的关联条件。此外,还应注意Flink任务的并行性和资源管理,以优化join操作的性能和稳定性。
楼主你好,看了你的问题,根据阿里云 Flink Doris Connector 1.1.1 的文档,支持对 Doris 维表进行 Lookup Join。但是Doris 2.0 版本中的一些新特性可能会影响到该功能的使用,所以需要在使用时需要进行测试和验证。
在使用 Flink Doris Connector 进行 Lookup Join 时,需要创建一个维表
TableSource
,并在StreamTableEnvironment
中注册。示例代码如下:所以还是需要你根据实际情况实现
DorisLookupFunction
中的逻辑,从 Doris 维表中查询数据。