// Big Data
//
// kafka2doris: 1-to-N table synchronization

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Reads Debezium CDC records for several source tables from a single Kafka topic and
 * fans them out (1-to-N) into one Doris sink per target table via Flink side outputs.
 *
 * <p>Flow: Kafka source → filter/extract Debezium {@code payload} → route by source
 * table name to a per-table side output → one {@link DorisSink} per Doris table.
 */
public class TestSinkToDorisWithString2 {

    /** Naming pattern of the target Doris tables: {@code test_<sourceTable>_full}. */
    private static final String TABLE_PREFIX = "test_";
    private static final String TABLE_SUFFIX = "_full";

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("rest.port", "8088");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);
        // Checkpointing is required: the Doris sink commits its stream-load
        // transactions on checkpoint completion.
        env.enableCheckpointing(3000);

        KafkaSource<Tuple2<String, String>> kafkaSource = KafkaSource.<Tuple2<String, String>>builder()
                .setBootstrapServers("xxx.xxx.xxx.xxx:9092")
                .setTopics("bigdatatest")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("bigdatatest01")
                .setDeserializer(new KafkaRecordDeserializationSchema<Tuple2<String, String>>() {
                    @Override
                    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, String>> collector) throws IOException {
                        // Kafka tombstone records carry a null value; skip them to
                        // avoid an NPE in new String(null, ...).
                        byte[] value = consumerRecord.value();
                        if (value != null) {
                            collector.collect(Tuple2.of(consumerRecord.topic(), new String(value, StandardCharsets.UTF_8)));
                        }
                    }

                    @Override
                    public TypeInformation<Tuple2<String, String>> getProducedType() {
                        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                    }
                }).build();
        DataStreamSource<Tuple2<String, String>> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");

        // Keep only the Debezium payload of the whitelisted source tables.
        SingleOutputStreamOperator<String> flatStream = stream.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
                JSONObject data = JSON.parseObject(value.f1);
                if (data == null) {
                    return;
                }
                // Debezium tombstone/heartbeat messages may have no payload or source
                // block; drop them instead of throwing an NPE and failing the job.
                JSONObject payload = data.getJSONObject("payload");
                if (payload == null) {
                    return;
                }
                JSONObject source = payload.getJSONObject("source");
                if (source == null) {
                    return;
                }
                String tableName = source.getString("table");
                if ("orders".equals(tableName) || "products".equals(tableName)) {
                    out.collect(payload.toJSONString());
                }
            }
        });

        flatStream.print();

        // (database name, Doris table name) pairs — one sink is created per entry.
        List<Tuple2<String, String>> dorisTables = new ArrayList<>();
        dorisTables.add(Tuple2.of("test", "test_orders_full"));
        dorisTables.add(Tuple2.of("test", "test_products_full"));

        // Route each record to the side output named after its target Doris table.
        SingleOutputStreamOperator<Void> parsedStream = flatStream.process(new ProcessFunction<String, Void>() {
            // Cache of table name -> OutputTag, built lazily per record.
            private transient Map<String, OutputTag<String>> recordOutputTags;

            @Override
            public void open(Configuration parameters) throws Exception {
                recordOutputTags = new HashMap<>();
            }

            @Override
            public void processElement(String record, ProcessFunction<String, Void>.Context ctx, Collector<Void> out) throws Exception {
                String tableName = getRecordTableName(record);
                String dorisName = TABLE_PREFIX + tableName + TABLE_SUFFIX;
                ctx.output(getRecordOutputTag(dorisName), record);
            }

            private String getRecordTableName(String record) {
                JSONObject data = JSON.parseObject(record);
                JSONObject source = data.getJSONObject("source");
                return source.getString("table");
            }

            private OutputTag<String> getRecordOutputTag(String tableName) {
                return recordOutputTags.computeIfAbsent(
                        tableName, ParsingProcessFunction::createRecordOutputTag);
            }
        });

        for (Tuple2<String, String> dbTbl : dorisTables) {
            OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
            DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
            String uidName = dbTbl.f1; // Doris table name doubles as operator name/uid.
            sideOutput
                    .sinkTo(buildDorisSink(dbTbl.f0, dbTbl.f1))
                    .setParallelism(1)
                    .name(uidName)
                    .uid(uidName);
        }

        env.execute();
    }

    /**
     * Builds a Doris sink for one target table.
     *
     * @param databaseName target Doris database
     * @param tableName    target Doris table (pattern: {@code test_<sourceTable>_full})
     * @return a sink that stream-loads Debezium payload JSON into
     *         {@code databaseName.tableName}
     */
    public static DorisSink<String> buildDorisSink(String databaseName, String tableName) {
        String tableIdentifier = databaseName + "." + tableName;
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder
                .setJdbcUrl("jdbc:mysql://xxx.xxx.xxx.xxx:9030")
                .setFenodes("xxx.xxx.xxx.xxx:8030")
                .setUsername("root")
                .setPassword("root")
                .setTableIdentifier(tableIdentifier);

        Properties pro = new Properties();
        // Stream-load in JSON format, one JSON document per line.
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");

        // BUGFIX: include the table name in the label prefix. The original used only
        // System.currentTimeMillis(), so two sinks built within the same millisecond
        // shared a label prefix and collided on Doris stream-load labels.
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-doris-" + tableName + "-" + System.currentTimeMillis())
                .setStreamLoadProp(pro)
                .build();

        // Map "<database>.<sourceTable>" to the Doris table. The affixes are stripped
        // only at the ends of the name (the original replace() removed "test_"/"_full"
        // anywhere in the string), and the database comes from the parameter instead
        // of a hard-coded "test".
        String sourceTableName = stripAffixes(tableName);
        Map<String, String> tableMapping = new HashMap<>();
        tableMapping.put(databaseName + "." + sourceTableName, tableIdentifier);

        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("replication_num", "1");
        tableConfig.put("light_schema_change", "true");

        JsonDebeziumSchemaSerializer debeziumSchemaSerializer = JsonDebeziumSchemaSerializer.builder()
                .setDorisOptions(dorisBuilder.build())
                .setNewSchemaChange(true)
                .setExecutionOptions(executionOptions)
                .setTableMapping(tableMapping)
                .setTableProperties(tableConfig)
                .setTargetDatabase(databaseName)
                .setTargetTablePrefix(TABLE_PREFIX)
                .setTargetTableSuffix(TABLE_SUFFIX)
                .build();

        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(debeziumSchemaSerializer)
                .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }

    /**
     * Strips the leading {@code test_} and trailing {@code _full} from a Doris table
     * name, touching only the ends so names merely containing those substrings are
     * left intact.
     */
    private static String stripAffixes(String tableName) {
        String name = tableName;
        if (name.startsWith(TABLE_PREFIX)) {
            name = name.substring(TABLE_PREFIX.length());
        }
        if (name.endsWith(TABLE_SUFFIX)) {
            name = name.substring(0, name.length() - TABLE_SUFFIX.length());
        }
        return name;
    }
}