用flink doris1.1.1的版本 ,可以对doris维表进行 lookup join么?[阿里云]

用flink doris connector 1.1.1的版本 ,在doris 2.0版本上,可以对doris维表进行 lookup join么?

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
1 条回复 A 作者 M 管理员
  1. 根据Flink官方文档和Flink Doris Connector的说明,Flink Doris Connector 1.1.1版本可以与Doris 2.0版本兼容,并支持在Flink中对Doris维表进行lookup join。

    通过使用Flink Doris Connector,您可以将Doris作为Flink的外部系统进行连接,并利用Doris表的数据进行lookup join操作。这样可以在Flink作业中使用Doris表的数据来丰富和补充流数据。例如,将实时事件流与Doris维度表进行关联查询,以获取更多有关事件的信息。

    注意,为了进行lookup join操作,您需要确保正确配置Flink Doris Connector,并正确设置lookup表和主表之间的关联条件。此外,还应注意Flink任务的并行性和资源管理,以优化join操作的性能和稳定性。

  2. 楼主你好,看了你的问题,根据阿里云 Flink Doris Connector 1.1.1 的文档,支持对 Doris 维表进行 Lookup Join。但是Doris 2.0 版本中的一些新特性可能会影响到该功能的使用,所以需要在使用时需要进行测试和验证。

    在使用 Flink Doris Connector 进行 Lookup Join 时,需要创建一个维表 TableSource,并在 StreamTableEnvironment 中注册。示例代码如下:

    import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.types.logical.VarCharType;import org.apache.flink.types.Row;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;import org.apache.flink.streaming.api.functions.source.SourceFunction.TimestampedSourceContext;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.java.BatchTableEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.api.java.TableEnvironment;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.StreamTableDescriptor;import org.apache.flink.table.descriptors.StreamTableEnvironmentDescriptor;import org.apache.flink.table.descriptors.StreamTableSourceDescriptor;import org.apache.flink.table.descriptors.TimedRowtime;import org.apache.flink.table.descriptors.TimedRowtime;import org.apache.flink.table.sources.LookupableTableSource;import org.apache.flink.table.sources.RowtimeAttributeDescriptor;import org.apache.flink.table.sources.TableSource;import org.apache.flink.table.types.DataType;import org.apache.flink.util.CloseableIterator;import com.alibaba.fastjson.JSONObject;import com.google.common.collect.ImmutableList;import com.google.common.collect.Lists;import java.util.HashMap;import java.util.List;import java.util.Map;public class DorisLookupJoinExample {  public static void main(String[] args) {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);    // 定义流表的 schema    TableSchema schema = new TableSchema(new String[]{"id", "name", "age"}, new DataType[]{        DataTypes.INT(),        DataTypes.STRING(),        DataTypes.INT()    });    // 定义一个维表的 schema    TableSchema lookupSchema = new TableSchema(new String[]{"id", "address"}, new DataType[]{        DataTypes.INT(),        DataTypes.STRING()    });    // 创建一个数据流作为流表    DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {      private boolean isRunning = true;      private int count = 0;      @Override      public void run(SourceContext<Row> ctx) throws Exception {        while (isRunning) {          count++;          Row row = Row.of(count, "abc", count % 30);          ctx.collect(row);          Thread.sleep(1000);        }      }      @Override      public void cancel() {        isRunning = false;      }    }).returns(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));    // 创建一个维表 TableSource    DorisTableSource lookupTable = DorisTableSource.builder()        .withSchema(lookupSchema)        .fromProperties(props)        .build();    // 在 TableEnvironment 中注册流表和维表    tableEnv.registerDataStream("stream_table", stream, "id, name, age");    tableEnv.registerTableSource("lookup_table", lookupTable);    // 执行 Lookup Join 操作    Table result = tableEnv.sqlQuery(        "SELECT id, name, age, address FROM stream_table " +        "LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF PROCTIME() " +        "ON stream_table.id = lookup_table.id");    // 输出结果    result.printSchema();    result.toAppendStream(Row.class).print();    // 执行 Flink 任务    env.execute("Doris Lookup Join Example");  }  public static class DorisTableSource implements LookupableTableSource<Row> {    private final TableSchema schema;    private final Map<String, String> properties;    public DorisTableSource(TableSchema schema, Map<String, String> properties) {      this.schema = schema;      this.properties = properties;    }    @Override    public TableSchema getTableSchema() {      return schema;    }    @Override    public boolean isAsyncEnabled() {      return false;    }    @Override    public DataType getProducedDataType() {      return DataTypes.createRowType(schema.getFieldDataTypes(), schema.getFieldNames());    }    @Override    public void applyLookup(List<Row> rows, LookupContext context, ResultFuture<Row> resultFuture) throws Exception {      for (Row row : rows) {        // 从 Doris 维表中查询数据,并将查询结果添加到输出结果中        // TODO: 实现从 Doris 维表中查询数据的逻辑        resultFuture.complete(Lists.newArrayList(row));      }    }    @Override    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {      return LookupRuntimeProvider.of(new DorisLookupFunction(this));    }    public static DorisTableSourceBuilder builder() {      return new DorisTableSourceBuilder();    }    private static class DorisTableSourceBuilder {      private TableSchema schema;      private Map<String, String> properties;      public DorisTableSourceBuilder withSchema(TableSchema schema) {        this.schema = schema;        return this;      }      public DorisTableSourceBuilder fromProperties(Map<String, String> properties) {        this.properties = properties;        return this;      }      public DorisTableSource build() {        return new DorisTableSource(schema, properties);      }          }  }  public static class DorisLookupFunction extends AsyncTableFunction<Row> {    private final DorisTableSource tableSource;    public DorisLookupFunction(DorisTableSource tableSource) {      this.tableSource = tableSource;    }    public void eval(Integer id, CloseableIterator<Row> resultFuture) {      // 从 Doris 维表中查询数据,并将查询结果添加到输出结果中      // TODO: 实现从 Doris 维表中查询数据的逻辑      resultFuture.close();    }  }}

    所以还是需要你根据实际情况实现 DorisLookupFunction 中的逻辑,从 Doris 维表中查询数据。