/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import java.io.Serializable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;

/**
 * Static helpers that assemble the Flink {@link DataStream} topologies used by the Hudi sink:
 * bulk insert, append, index bootstrap, stream write, compaction and cleaning.
 *
 * <p>Every method only wires operators onto the stream it is given; nothing is executed here.
 *
 * <p>Operator names and uids are part of the job graph and are kept exactly as they were.
 */
public class Pipelines {

    /**
     * Builds the bulk-insert pipeline.
     *
     * <p>When partition keys are configured, the input may first be keyed by partition path
     * ({@code WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION}) and/or sorted by the partition fields
     * ({@code WRITE_BULK_INSERT_SORT_BY_PARTITION}) before it reaches the write operator.
     *
     * @param conf       the sink configuration
     * @param rowType    the row type of the input records
     * @param dataStream the input stream of rows
     * @return the terminal dummy sink attached after the bulk-insert write operator
     */
    public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
        final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
        if (partitionFields.length > 0) {
            RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
                // Shuffle by partition path.
                dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
            }
            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
                // Sort by the partition fields.
                SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
                dataStream = dataStream
                    .transform(
                        "partition_key_sorter",
                        TypeInformation.of(RowData.class),
                        sortOperatorGen.createSortOperator())
                    .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
                // The configured sort memory is scaled by 1024 * 1024; the long literals keep the
                // multiplication in 64-bit arithmetic.
                ExecNodeUtil.setManagedMemoryWeight(
                    dataStream.getTransformation(),
                    conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            }
        }
        return dataStream
            .transform("hoodie_bulk_insert_write", TypeInformation.of(Object.class), operatorFactory)
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
            .addSink(DummySink.INSTANCE)
            .name("dummy");
    }

    /**
     * Builds the append-write pipeline: the append write operator followed by a dummy sink.
     *
     * @param conf       the sink configuration
     * @param rowType    the row type of the input records
     * @param dataStream the input stream of rows
     * @return the terminal dummy sink attached after the append write operator
     */
    public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
        return dataStream
            .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
            // NOTE(review): this reuses the "uid_hoodie_stream_write" prefix and has no '_' before
            // the table name. Left unchanged on purpose: the uid is the operator's identity in
            // the job graph, so renaming it is not a cosmetic change.
            .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
            .addSink(DummySink.INSTANCE)
            .name("dummy");
    }

    /**
     * Builds the bootstrap pipeline for an unbounded, non-overwrite input.
     *
     * <p>Equivalent to calling
     * {@link #bootstrap(Configuration, RowType, int, DataStream, boolean, boolean)} with
     * {@code bounded = false} and {@code overwrite = false}.
     *
     * @param conf               the sink configuration
     * @param rowType            the row type of the input records
     * @param defaultParallelism parallelism used when {@code INDEX_BOOTSTRAP_TASKS} is not set
     * @param dataStream         the input stream of rows
     * @return the stream of Hoodie records
     */
    public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
        return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
    }

    /**
     * Builds the bootstrap pipeline.
     *
     * <ul>
     *   <li>{@code overwrite}: rows are only converted to Hoodie records, with no bootstrap
     *       operator.</li>
     *   <li>{@code bounded}, global index disabled and the table partitioned: the batch bootstrap
     *       operator is used.</li>
     *   <li>otherwise: the streaming bootstrap path is used.</li>
     * </ul>
     *
     * @param conf               the sink configuration
     * @param rowType            the row type of the input records
     * @param defaultParallelism parallelism used when {@code INDEX_BOOTSTRAP_TASKS} is not set
     * @param dataStream         the input stream of rows
     * @param bounded            whether the input is bounded
     * @param overwrite          whether the write overwrites existing data
     * @return the stream of Hoodie records
     */
    public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream, boolean bounded, boolean overwrite) {
        final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
        if (overwrite) {
            return rowDataToHoodieRecord(conf, rowType, dataStream);
        }
        if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
            return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
        }
        return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
    }

    /**
     * Converts rows to Hoodie records and, when {@code INDEX_BOOTSTRAP_ENABLED} is set or the
     * input is bounded, appends the {@code index_bootstrap} operator.
     */
    private static DataStream<HoodieRecord> streamBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream, boolean bounded) {
        // Declared as DataStream because that is what rowDataToHoodieRecord returns; the
        // SingleOutputStreamOperator produced by transform() below is a subtype of it.
        DataStream<HoodieRecord> recordStream = rowDataToHoodieRecord(conf, rowType, dataStream);
        if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
            recordStream = recordStream
                .transform(
                    "index_bootstrap",
                    TypeInformation.of(HoodieRecord.class),
                    new BootstrapOperator<>(conf))
                .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
                .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
        }
        return recordStream;
    }

    /**
     * Keys the rows by partition path, converts them to Hoodie records and appends the
     * {@code batch_index_bootstrap} operator.
     */
    private static DataStream<HoodieRecord> boundedBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
        RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
        // Shuffle by partition path.
        DataStream<RowData> keyedByPartition = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
        return rowDataToHoodieRecord(conf, rowType, keyedByPartition)
            .transform(
                "batch_index_bootstrap",
                TypeInformation.of(HoodieRecord.class),
                new BatchBootstrapOperator<>(conf))
            .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
            .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
    }

    /**
     * Maps each row to a {@link HoodieRecord} using the function created by
     * {@link RowDataToHoodieFunctions#create}.
     *
     * @param conf       the sink configuration
     * @param rowType    the row type of the input records
     * @param dataStream the input stream of rows
     * @return the stream of Hoodie records
     */
    public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
    }

    /**
     * Builds the stream-write pipeline: records are keyed by record key for the
     * {@code bucket_assigner} operator, then re-keyed by the file id of their current location
     * for the {@code hoodie_stream_write} operator.
     *
     * @param conf               the sink configuration
     * @param defaultParallelism parallelism used when {@code BUCKET_ASSIGN_TASKS} is not set
     * @param dataStream         the stream of Hoodie records
     * @return the output stream of the write operator
     */
    public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
        WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
        return dataStream
            // Key by record key for the bucket assigner.
            .keyBy(HoodieRecord::getRecordKey)
            .transform(
                "bucket_assigner",
                TypeInformation.of(HoodieRecord.class),
                new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
            .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
            .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
            // Shuffle by the file id of the assigned location. The stream is typed, so the
            // lambda parameter is a HoodieRecord and no cast is needed.
            .keyBy(record -> record.getCurrentLocation().getFileId())
            .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
            // NOTE(review): no '_' before the table name, unlike the other uids. Left unchanged
            // on purpose: the uid is the operator's identity in the job graph.
            .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    }

    /**
     * Builds the compaction pipeline: a plan-generation operator with parallelism 1, a
     * rebalance, the compaction tasks with {@code COMPACTION_TASKS} parallelism, and a commit
     * sink with parallelism 1.
     *
     * @param conf       the sink configuration
     * @param dataStream the output stream of the write operator
     * @return the compaction commit sink
     */
    public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
        return dataStream
            .transform(
                "compact_plan_generate",
                TypeInformation.of(CompactionPlanEvent.class),
                new CompactionPlanOperator(conf))
            .setParallelism(1)
            .rebalance()
            .transform(
                "compact_task",
                TypeInformation.of(CompactionCommitEvent.class),
                new ProcessOperator<>(new CompactFunction(conf)))
            .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
            .addSink(new CompactionCommitSink(conf))
            .name("compact_commit")
            .setParallelism(1);
    }

    /**
     * Attaches the clean function as a sink with parallelism 1.
     *
     * @param conf       the sink configuration
     * @param dataStream the output stream of the write operator
     * @return the clean sink
     */
    public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
        return dataStream
            .addSink(new CleanFunction<>(conf))
            .setParallelism(1)
            .name("clean_commits");
    }

    /**
     * Sink that overrides nothing from {@link SinkFunction}; it only terminates a pipeline
     * whose work is done by the upstream write operator.
     */
    public static class DummySink
    implements SinkFunction<Object> {
        private static final long serialVersionUID = 1L;

        /** Shared instance; {@code final} so the singleton cannot be reassigned. */
        public static final DummySink INSTANCE = new DummySink();
    }
}

