import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord; // shaded class from the flink-sql-connector-kafka uber jar; with plain flink-connector-kafka, import org.apache.kafka.clients.consumer.ConsumerRecord instead
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
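
/**
 * Reads Debezium-format CDC JSON from Kafka, keeps the change events of
 * selected source tables, routes each event to a per-table side output, and
 * writes every side output to its own Doris table through a DorisSink built
 * with JsonDebeziumSchemaSerializer.
 */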
public class TestSinkToDorisWithString2 {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Expose the local Flink web UI on port 8088 instead of the default 8081.
        configuration.setString("rest.port", "8088");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);
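        // The Doris sink commits its stream loads when a checkpoint completes,
        // so checkpointing must be enabled; 3s is a short interval for demo use.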
        env.enableCheckpointing(3000);

        KafkaSource<Tuple2<String, String>> kafkaSource = KafkaSource.<Tuple2<String, String>>builder()
                .setBootstrapServers("xxx.xxx.xxx.xxx:9092")
                .setTopics("bigdatatest")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("bigdatatest01")
                // Emit (topic, value) tuples; the value is the raw Debezium JSON string.
                .setDeserializer(new KafkaRecordDeserializationSchema<Tuple2<String, String>>() {
                    @Override
                    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, String>> collector) throws IOException {
                        collector.collect(Tuple2.of(consumerRecord.topic(), new String(consumerRecord.value(), StandardCharsets.UTF_8)));
                    }

                    @Override
                    public TypeInformation<Tuple2<String, String>> getProducedType() {
                        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                    }
                }).build();
        DataStreamSource<Tuple2<String, String>> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");

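        // Each Kafka record is expected to carry a full Debezium envelope; an
        // illustrative (not real) insert event for the products table:
        // {
        //   "schema": { ... },
        //   "payload": {
        //     "before": null,
        //     "after": {"id": 1, "name": "ssd"},
        //     "source": {"db": "test", "table": "products", ...},
        //     "op": "c",
        //     "ts_ms": 1700000000000
        //   }
        // }
        // Only the "payload" part is forwarded downstream.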
        SingleOutputStreamOperator<String> flatStream = stream.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
                JSONObject data = JSON.parseObject(value.f1);
                JSONObject payload = data.getJSONObject("payload");
                JSONObject source = payload.getJSONObject("source");
                String tableName = source.getString("table");
                // Keep only the change events of the tables we sink to Doris.
                if ("orders".equals(tableName) || "products".equals(tableName)) {
                    out.collect(payload.toJSONString());
                }
            }
        });
        flatStream.print();

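        // Target Doris tables; each entry gets its own side output and sink below.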
        List<Tuple2<String, String>> dorisTables = new ArrayList<>();
        // (database name, table name)
        dorisTables.add(Tuple2.of("test", "test_orders_full"));
        dorisTables.add(Tuple2.of("test", "test_products_full"));

        SingleOutputStreamOperator<Void> parsedStream = flatStream.process(new ProcessFunction<String, Void>() {
            private transient Map<String, OutputTag<String>> recordOutputTags;

            @Override
            public void open(Configuration parameters) throws Exception {
                recordOutputTags = new HashMap<>();
            }

            @Override
            public void processElement(String record, ProcessFunction<String, Void>.Context ctx, Collector<Void> out) throws Exception {
                String tableName = getRecordTableName(record);
                // Map the source table to its Doris table, e.g. orders -> test_orders_full.
                String dorisName = "test_" + tableName + "_full";
                OutputTag<String> recordOutputTag = getRecordOutputTag(dorisName);
                ctx.output(recordOutputTag, record);
            }

            private String getRecordTableName(String record) {
                JSONObject data = JSON.parseObject(record);
                JSONObject source = data.getJSONObject("source");
                return source.getString("table");
            }

            private OutputTag<String> getRecordOutputTag(String tableName) {
                return recordOutputTags.computeIfAbsent(
                        tableName, ParsingProcessFunction::createRecordOutputTag);
            }
        });
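        // Wire each side output to its own Doris sink. The tag created here must
        // match the one produced in processElement, i.e. dbTbl.f1 must equal
        // "test_" + sourceTable + "_full".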
        for (Tuple2<String, String> dbTbl : dorisTables) {
            OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
            DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
            String uidName = dbTbl.f1; // the Doris table name doubles as operator name/uid
            sideOutput
                    .sinkTo(buildDorisSink(dbTbl.f0, dbTbl.f1))
                    .setParallelism(1)
                    .name(uidName)
                    .uid(uidName);
        }
        env.execute();
    }
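
    /**
     * Builds a DorisSink for one target table. The JsonDebeziumSchemaSerializer
     * consumes the Debezium payload JSON routed to this table and, with
     * setNewSchemaChange(true), can also apply light schema changes (e.g.
     * ADD/DROP COLUMN) to the Doris table.
     */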
    public static DorisSink<String> buildDorisSink(String databaseName, String tableName) {
        String tableIdentifier = databaseName + "." + tableName;
        DorisSink.Builder<String> builder = DorisSink.builder();

        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder
                .setJdbcUrl("jdbc:mysql://xxx.xxx.xxx.xxx:9030")
                .setFenodes("xxx.xxx.xxx.xxx:8030")
                .setUsername("root")
                .setPassword("root")
                .setTableIdentifier(tableIdentifier);

        // Stream load the data as line-delimited JSON.
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        // The timestamped prefix avoids stream load label collisions across runs,
        // since Doris rejects duplicate labels within a database.
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-doris" + System.currentTimeMillis())
                .setStreamLoadProp(pro)
                .build();
        // Map the MySQL source table back to its Doris counterpart so that
        // schema-change events are applied to the right table.
        Map<String, String> tableMapping = new HashMap<>();
        String sourceTableName = tableName.replace("test_", "").replace("_full", "");
        tableMapping.put("test." + sourceTableName, "test.test_" + sourceTableName + "_full");

        // Table properties used when the serializer creates Doris tables.
        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("replication_num", "1");
        tableConfig.put("light_schema_change", "true");
        JsonDebeziumSchemaSerializer debeziumSchemaSerializer = JsonDebeziumSchemaSerializer.builder()
                .setDorisOptions(dorisBuilder.build())
                .setNewSchemaChange(true)
                .setExecutionOptions(executionOptions)
                .setTableMapping(tableMapping)
                .setTableProperties(tableConfig)
                .setTargetDatabase(databaseName)
                .setTargetTablePrefix("test_")
                .setTargetTableSuffix("_full")
                .build();

        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(debeziumSchemaSerializer)
                .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }
}