/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.streamer;

import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

/**
 * Command-line entry point that reads JSON records from a Kafka topic and feeds them
 * into a Hudi write pipeline running on a Flink {@link StreamExecutionEnvironment}.
 *
 * <p>All job settings come from a {@link FlinkStreamerConfig} populated from the program
 * arguments.
 */
public class HoodieFlinkStreamer {
    /**
     * Parses the arguments, assembles the streaming topology and submits the job.
     *
     * <p>When help is requested or no arguments are supplied, the usage text is printed
     * and {@code System.exit(1)} is called.
     *
     * @param args2 command-line arguments bound onto a {@link FlinkStreamerConfig}
     * @throws Exception propagated from any setup step or from the job execution
     */
    public static void main(String[] args2) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
        final JCommander commander = new JCommander(cfg, null, args2);
        final boolean usageRequested = cfg.help || args2.length == 0;
        if (usageRequested) {
            commander.usage();
            System.exit(1);
        }

        configureEnvironment(env, cfg);

        // Start from the global properties and layer the streamer's Kafka settings on top.
        final TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
        kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));

        final Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
        final RowType rowType = (RowType) AvroSchemaConverter
                .convertToDataType(StreamerUtil.getSourceSchema(conf))
                .getLogicalType();

        // The commit-ack timeout is set to the environment's checkpoint timeout.
        final long checkpointTimeout = env.getCheckpointConfig().getCheckpointTimeout();
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, checkpointTimeout);

        DataStream<RowData> records = readFromKafka(env, cfg, rowType, kafkaProps);
        records = applyTransformer(cfg, records);

        OptionsInference.setupSinkTasks(conf, env.getParallelism());
        OptionsInference.setupClientId(conf);

        attachWritePipeline(conf, rowType, records);

        env.execute(cfg.targetTableName);
    }

    /**
     * Applies the checkpointing, global-parameter and state-backend settings from
     * {@code cfg} to the execution environment.
     */
    private static void configureEnvironment(StreamExecutionEnvironment env, FlinkStreamerConfig cfg)
            throws Exception {
        env.enableCheckpointing(cfg.checkpointInterval);
        env.getConfig().setGlobalJobParameters(cfg);
        // At most one checkpoint may be in flight at any time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(cfg.stateBackend);
        if (cfg.flinkCheckPointPath != null) {
            env.getCheckpointConfig().setCheckpointStorage(cfg.flinkCheckPointPath);
        }
    }

    /**
     * Registers the Kafka consumer for {@code cfg.kafkaTopic} as a source that decodes
     * JSON messages into {@link RowData} shaped like {@code rowType}.
     *
     * @return the named source stream
     */
    private static DataStream<RowData> readFromKafka(
            StreamExecutionEnvironment env,
            FlinkStreamerConfig cfg,
            RowType rowType,
            Properties kafkaProps) throws Exception {
        final TypeInformation<RowData> producedType = InternalTypeInfo.of(rowType);
        // Per the Flink constructor signature the two flags are
        // failOnMissingField = false and ignoreParseErrors = true.
        final DeserializationSchema<RowData> jsonDeserializer = new JsonRowDataDeserializationSchema(
                rowType, producedType, false, true, TimestampFormat.ISO_8601);
        final SourceFunction<RowData> kafkaConsumer =
                new FlinkKafkaConsumer<>(cfg.kafkaTopic, jsonDeserializer, kafkaProps);
        return env.addSource(kafkaConsumer)
                .name("kafka_source")
                .uid("uid_kafka_source");
    }

    /**
     * Runs the configured transformer over {@code input}, if one is configured and could
     * be created; otherwise returns {@code input} unchanged.
     */
    private static DataStream<RowData> applyTransformer(FlinkStreamerConfig cfg, DataStream<RowData> input)
            throws Exception {
        if (cfg.transformerClassNames == null || cfg.transformerClassNames.isEmpty()) {
            return input;
        }
        final Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
        if (!transformer.isPresent()) {
            return input;
        }
        return transformer.get().apply(input);
    }

    /**
     * Attaches the Hudi write operators, plus the follow-up operator chosen by
     * {@link OptionsResolver}, to {@code records}.
     */
    private static void attachWritePipeline(Configuration conf, RowType rowType, DataStream<RowData> records)
            throws Exception {
        if (!OptionsResolver.isAppendMode(conf)) {
            // Non-append path: bootstrap, stream write, then compaction or cleaning.
            final DataStream<HoodieRecord> hoodieRecords = Pipelines.bootstrap(conf, rowType, records);
            final DataStream<Object> written = Pipelines.hoodieStreamWrite(conf, hoodieRecords);
            if (OptionsResolver.needsAsyncCompaction(conf)) {
                Pipelines.compact(conf, written);
            } else {
                Pipelines.clean(conf, written);
            }
            return;
        }

        // Append path: write, then clustering, cleaning or a dummy sink.
        final DataStream<Object> appended = Pipelines.append(conf, rowType, records);
        if (OptionsResolver.needsAsyncClustering(conf)) {
            Pipelines.cluster(conf, rowType, appended);
        } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
            Pipelines.clean(conf, appended);
        } else {
            Pipelines.dummySink(appended);
        }
    }
}

