/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;

public class Pipelines {
    /**
     * Per-operator-name call counter. {@link #opUID(String, Configuration)} increments the entry for its
     * operator name on every call and uses the resulting count to suffix repeated UIDs.
     */
    private static final ConcurrentHashMap<String, Integer> OPERATOR_COUNTERS = new ConcurrentHashMap<>();

    public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        int PARALLELISM_VALUE = conf.getInteger(FlinkOptions.WRITE_TASKS);
        boolean isBucketIndexType = OptionsResolver.isBucketIndexType(conf);
        if (isBucketIndexType) {
            if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
                throw new HoodieException("Consistent hashing bucket index does not work with bulk insert using FLINK engine. Use simple bucket index or Spark engine.");
            }
            String indexKeys = OptionsResolver.getIndexKeyField(conf);
            int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
            BucketIndexPartitioner partitioner = new BucketIndexPartitioner(numBuckets, indexKeys);
            RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
            RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
            InternalTypeInfo typeInfo = InternalTypeInfo.of((RowType)rowTypeWithFileId);
            boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
            HashMap bucketIdToFileId = new HashMap();
            dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey).map((MapFunction & Serializable)record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets, needFixedFileIdSuffix), (TypeInformation)typeInfo).setParallelism(PARALLELISM_VALUE);
            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
                SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
                dataStream = dataStream.transform("file_sorter", (TypeInformation)typeInfo, sortOperatorGen.createSortOperator(conf)).setParallelism(PARALLELISM_VALUE);
                ExecNodeUtil.setManagedMemoryWeight((Transformation)dataStream.getTransformation(), (long)((long)conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L));
            }
        } else if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD)) {
            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {
                Partitioner & Serializable partitioner = (Partitioner & Serializable)(key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)key, (int)KeyGroupRangeAssignment.computeDefaultMaxParallelism((int)PARALLELISM_VALUE), (int)channels);
                RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
                dataStream = dataStream.partitionCustom((Partitioner)partitioner, rowDataKeyGen::getPartitionPath);
            }
            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
                boolean isNeededSortInput = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY);
                String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
                String[] recordKeyFields = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
                String[] sortFields = isNeededSortInput ? (String[])Stream.concat(Arrays.stream(partitionFields), Arrays.stream(recordKeyFields)).toArray(String[]::new) : partitionFields;
                SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortFields);
                dataStream = dataStream.transform(isNeededSortInput ? "sorter:(partition_key, record_key)" : "sorter:(partition_key)", (TypeInformation)InternalTypeInfo.of((RowType)rowType), sortOperatorGen.createSortOperator(conf)).setParallelism(PARALLELISM_VALUE);
                ExecNodeUtil.setManagedMemoryWeight((Transformation)dataStream.getTransformation(), (long)((long)conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L));
            }
        }
        return dataStream.transform(Pipelines.opName(isBucketIndexType ? "bucket_bulk_insert" : "hoodie_bulk_insert_write", conf), TypeInformation.of(Object.class), BulkInsertWriteOperator.getFactory(conf, rowType)).uid(Pipelines.opUID("bucket_bulk_insert", conf)).setParallelism(PARALLELISM_VALUE).addSink((SinkFunction)DummySink.INSTANCE).name("dummy");
    }

    /**
     * Attaches the append write operator to {@code dataStream}.
     *
     * <p>The operator is named from "hoodie_append_write", gets a UID derived from "hoodie_stream_write"
     * and runs with the {@code WRITE_TASKS} parallelism.
     *
     * @param conf       the configuration holding the write options
     * @param rowType    the row type of the incoming rows
     * @param dataStream the input rows
     * @return the output stream of the append write operator
     */
    public static DataStream<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        final WriteOperatorFactory appendWriteFactory = AppendWriteOperator.getFactory(conf, rowType);
        return dataStream
                .transform(Pipelines.opName("hoodie_append_write", conf), TypeInformation.of(Object.class), appendWriteFactory)
                .uid(Pipelines.opUID("hoodie_stream_write", conf))
                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    }

    /**
     * Convenience overload of {@link #bootstrap(Configuration, RowType, DataStream, boolean, boolean)}
     * that passes {@code false} for both {@code bounded} and {@code overwrite}.
     *
     * @param conf       the configuration holding the write options
     * @param rowType    the row type of the incoming rows
     * @param dataStream the input rows
     * @return the stream of hoodie records
     */
    public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        final boolean bounded = false;
        final boolean overwrite = false;
        return Pipelines.bootstrap(conf, rowType, dataStream, bounded, overwrite);
    }

    /**
     * Converts the rows to hoodie records and picks how (or whether) the index bootstrap step is added.
     *
     * <ul>
     *   <li>{@code overwrite} is set or the table uses a bucket index: plain row-to-record conversion;</li>
     *   <li>{@code bounded} input, {@code INDEX_GLOBAL_ENABLED} off and a partitioned table:
     *       {@code boundedBootstrap};</li>
     *   <li>anything else: {@code streamBootstrap}.</li>
     * </ul>
     *
     * @param conf       the configuration holding the write options
     * @param rowType    the row type of the incoming rows
     * @param dataStream the input rows
     * @param bounded    whether the input is bounded
     * @param overwrite  whether the write overwrites existing data
     * @return the stream of hoodie records
     */
    public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, DataStream<RowData> dataStream, boolean bounded, boolean overwrite) {
        final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);

        final boolean conversionOnly = overwrite || OptionsResolver.isBucketIndexType(conf);
        if (conversionOnly) {
            return Pipelines.rowDataToHoodieRecord(conf, rowType, dataStream);
        }

        final boolean batchBootstrap = bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf);
        return batchBootstrap
                ? Pipelines.boundedBootstrap(conf, rowType, dataStream)
                : Pipelines.streamBootstrap(conf, rowType, dataStream, bounded);
    }

    /**
     * Converts the rows to hoodie records and, when {@code INDEX_BOOTSTRAP_ENABLED} is set or the input is
     * {@code bounded}, appends the "index_bootstrap" operator.
     *
     * <p>The bootstrap operator runs with {@code INDEX_BOOTSTRAP_TASKS} when that option is present and
     * otherwise with the parallelism of the converted record stream.
     *
     * @param conf       the configuration holding the write options
     * @param rowType    the row type of the incoming rows
     * @param dataStream the input rows
     * @param bounded    whether the input is bounded
     * @return the stream of hoodie records, with or without the bootstrap operator
     */
    private static DataStream<HoodieRecord> streamBootstrap(Configuration conf, RowType rowType, DataStream<RowData> dataStream, boolean bounded) {
        // rowDataToHoodieRecord is declared to return DataStream<HoodieRecord>, so the local uses that type.
        final DataStream<HoodieRecord> records = Pipelines.rowDataToHoodieRecord(conf, rowType, dataStream);

        if (!conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) && !bounded) {
            return records;
        }
        return records
                .transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), new BootstrapOperator(conf))
                .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(records.getParallelism()).intValue())
                .uid(Pipelines.opUID("index_bootstrap", conf));
    }

    /**
     * Keys the rows by partition path, converts them to hoodie records and appends the
     * "batch_index_bootstrap" operator.
     *
     * <p>The bootstrap operator runs with {@code INDEX_BOOTSTRAP_TASKS} when that option is present and
     * otherwise with the parallelism of the keyed row stream.
     *
     * @param conf       the configuration holding the write options
     * @param rowType    the row type of the incoming rows
     * @param dataStream the input rows
     * @return the output stream of the batch bootstrap operator
     */
    private static DataStream<HoodieRecord> boundedBootstrap(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        final RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
        final DataStream<RowData> keyedByPartitionPath = dataStream.keyBy(keyGen::getPartitionPath);

        return Pipelines.rowDataToHoodieRecord(conf, rowType, keyedByPartitionPath)
                .transform("batch_index_bootstrap", TypeInformation.of(HoodieRecord.class), new BatchBootstrapOperator(conf))
                .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(keyedByPartitionPath.getParallelism()).intValue())
                .uid(Pipelines.opUID("batch_index_bootstrap", conf));
    }

    /**
     * Maps every row to a {@code HoodieRecord} with the function built by {@code RowDataToHoodieFunctions.create}.
     *
     * <p>The map operator is named "row_data_to_hoodie_record" and is given the parallelism of the input stream.
     *
     * @param conf       the configuration holding the write options
     * @param rowType    the row type of the incoming rows
     * @param dataStream the input rows
     * @return the stream of hoodie records
     */
    public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
        return dataStream
                .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class))
                .setParallelism(dataStream.getParallelism())
                .name("row_data_to_hoodie_record");
    }

    /**
     * Attaches the streaming write operators to a stream of hoodie records.
     *
     * <ul>
     *   <li>No bucket index: key by record key, run "bucket_assigner", key by the file id of the record's
     *       current location, then run the stream write operator.</li>
     *   <li>{@code SIMPLE} bucket index: custom-partition with a {@code BucketIndexPartitioner} on the record
     *       key object, then run the bucket write operator.</li>
     *   <li>{@code CONSISTENT_HASHING} bucket index: run the consistent bucket assigner, key by the file id
     *       of the record's current location, then run the bucket write operator.</li>
     * </ul>
     *
     * <p>Assigner operators use {@code BUCKET_ASSIGN_TASKS}; write operators use {@code WRITE_TASKS}.
     *
     * @param conf       the configuration holding the write options
     * @param dataStream the hoodie records to write
     * @return the output stream of the write operator
     * @throws HoodieException             if consistent hashing is combined with insert overwrite
     * @throws HoodieNotSupportedException if the bucket index engine type is neither of the handled ones
     */
    public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
        if (!OptionsResolver.isBucketIndexType(conf)) {
            final WriteOperatorFactory streamWriteFactory = StreamWriteOperator.getFactory(conf);
            return dataStream
                    .keyBy(HoodieRecord::getRecordKey)
                    .transform("bucket_assigner", TypeInformation.of(HoodieRecord.class), (OneInputStreamOperator)new KeyedProcessOperator(new BucketAssignFunction(conf)))
                    .uid(Pipelines.opUID("bucket_assigner", conf))
                    .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
                    .keyBy((KeySelector & Serializable)assigned -> assigned.getCurrentLocation().getFileId())
                    .transform(Pipelines.opName("stream_write", conf), TypeInformation.of(Object.class), streamWriteFactory)
                    .uid(Pipelines.opUID("stream_write", conf))
                    .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
        }

        final HoodieIndex.BucketIndexEngineType engineType = OptionsResolver.getBucketEngineType(conf);
        switch (engineType) {
            case SIMPLE: {
                final int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
                final String indexKeys = OptionsResolver.getIndexKeyField(conf);
                final BucketIndexPartitioner bucketPartitioner = new BucketIndexPartitioner(numBuckets, indexKeys);
                return dataStream
                        .partitionCustom(bucketPartitioner, HoodieRecord::getKey)
                        .transform(Pipelines.opName("bucket_write", conf), TypeInformation.of(Object.class), BucketStreamWriteOperator.getFactory(conf))
                        .uid(Pipelines.opUID("bucket_write", conf))
                        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
            }
            case CONSISTENT_HASHING: {
                if (OptionsResolver.isInsertOverwrite(conf)) {
                    throw new HoodieException("Consistent hashing bucket index does not work with insert overwrite using FLINK engine. Use simple bucket index or Spark engine.");
                }
                return dataStream
                        .transform(Pipelines.opName("consistent_bucket_assigner", conf), TypeInformation.of(HoodieRecord.class), (OneInputStreamOperator)new ProcessOperator((ProcessFunction)new ConsistentBucketAssignFunction(conf)))
                        .uid(Pipelines.opUID("consistent_bucket_assigner", conf))
                        .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
                        .keyBy((KeySelector & Serializable)assigned -> assigned.getCurrentLocation().getFileId())
                        .transform(Pipelines.opName("consistent_bucket_write", conf), TypeInformation.of(Object.class), BucketStreamWriteOperator.getFactory(conf))
                        .uid(Pipelines.opUID("consistent_bucket_write", conf))
                        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
            }
            default:
                throw new HoodieNotSupportedException("Unknown bucket index engine type: " + engineType);
        }
    }

    /**
     * Appends the compaction stages to {@code dataStream}: plan generation, the compaction tasks and the
     * commit sink.
     *
     * <p>Plan generation and the commit sink both run with parallelism 1 and max parallelism 1. The
     * compaction tasks run with {@code COMPACTION_TASKS} and receive plan events distributed by
     * {@link IndexPartitioner} on {@code CompactionPlanEvent.getIndex}.
     *
     * @param conf       the configuration holding the compaction options
     * @param dataStream the upstream stream the plan operator is attached to
     * @return the compaction commit sink
     */
    public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
        final SingleOutputStreamOperator planEvents = dataStream
                .transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class), (OneInputStreamOperator)new CompactionPlanOperator(conf))
                .setParallelism(1)
                .setMaxParallelism(1);

        final SingleOutputStreamOperator commitEvents = planEvents
                .partitionCustom((Partitioner)new IndexPartitioner(), CompactionPlanEvent::getIndex)
                .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), (OneInputStreamOperator)new CompactOperator(conf))
                .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS));

        final DataStreamSink commitSink = commitEvents
                .addSink((SinkFunction)new CompactionCommitSink(conf))
                .name("compact_commit")
                .setParallelism(1);
        commitSink.getTransformation().setMaxParallelism(1);
        return commitSink;
    }

    /**
     * Appends the clustering stages to {@code dataStream}: plan generation, the clustering tasks and the
     * commit sink.
     *
     * <p>Plan generation and the commit sink both run with parallelism 1 and max parallelism 1. Plan events
     * are keyed by the concatenation of the file ids of their clustering operations before reaching the
     * clustering tasks, which run with {@code CLUSTERING_TASKS}. When sort clustering is enabled the
     * clustering transformation is given a managed memory weight of {@code WRITE_SORT_MEMORY * 1024 * 1024}.
     *
     * @param conf       the configuration holding the clustering options
     * @param rowType    the row type handed to the clustering operator
     * @param dataStream the upstream stream the plan operator is attached to
     * @return the clustering commit sink
     */
    public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
        final SingleOutputStreamOperator planEvents = dataStream
                .transform("cluster_plan_generate", TypeInformation.of(ClusteringPlanEvent.class), (OneInputStreamOperator)new ClusteringPlanOperator(conf))
                .setParallelism(1)
                .setMaxParallelism(1);

        final SingleOutputStreamOperator commitEvents = planEvents
                .keyBy((KeySelector & Serializable)planEvent -> planEvent.getClusteringGroupInfo().getOperations().stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
                .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), (OneInputStreamOperator)new ClusteringOperator(conf, rowType))
                .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));

        if (OptionsResolver.sortClusteringEnabled(conf)) {
            ExecNodeUtil.setManagedMemoryWeight((Transformation)commitEvents.getTransformation(), conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
        }

        final DataStreamSink commitSink = commitEvents
                .addSink((SinkFunction)new ClusteringCommitSink(conf))
                .name("clustering_commit")
                .setParallelism(1);
        commitSink.getTransformation().setMaxParallelism(1);
        return commitSink;
    }

    /**
     * Terminates {@code dataStream} with a {@code CleanFunction} sink named "clean_commits" that runs with
     * parallelism 1 and max parallelism 1.
     *
     * @param conf       the configuration handed to the clean function
     * @param dataStream the stream to terminate
     * @return the clean sink
     */
    public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
        final DataStreamSink cleanSink = dataStream
                .addSink(new CleanFunction(conf))
                .setParallelism(1)
                .name("clean_commits");
        cleanSink.getTransformation().setMaxParallelism(1);
        return cleanSink;
    }

    /**
     * Terminates {@code dataStream} with the shared {@link DummySink} instance, using parallelism 1 and the
     * name "dummy".
     *
     * @param dataStream the stream to terminate
     * @return the dummy sink
     */
    public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) {
        return dataStream
                .addSink((SinkFunction)DummySink.INSTANCE)
                .setParallelism(1)
                .name("dummy");
    }

    /**
     * Builds an operator display name of the form {@code "<operatorN>: <table path>"}.
     *
     * @param operatorN the base operator name
     * @param conf      the configuration the table path is read from
     * @return the operator name followed by ": " and the result of {@link #getTablePath(Configuration)}
     */
    public static String opName(String operatorN, Configuration conf) {
        final String tablePath = Pipelines.getTablePath(conf);
        return operatorN + ": " + tablePath;
    }

    /**
     * Builds an operator UID of the form {@code "uid_<operatorN>[_<k>]_<table path>"}.
     *
     * <p>Each call increments the counter kept for {@code operatorN}. The first call for a name produces no
     * numeric suffix; the n-th call (n &gt; 1) inserts {@code "_" + (n - 1)} after the operator name.
     *
     * @param operatorN the base operator name
     * @param conf      the configuration the table path is read from
     * @return the generated UID
     */
    public static String opUID(String operatorN, Configuration conf) {
        final int occurrence = OPERATOR_COUNTERS.merge(operatorN, 1, Integer::sum);

        final StringBuilder uid = new StringBuilder("uid_").append(operatorN);
        if (occurrence != 1) {
            uid.append('_').append(occurrence - 1);
        }
        return uid.append('_').append(Pipelines.getTablePath(conf)).toString();
    }

    /**
     * Returns the table identifier used in operator names and UIDs.
     *
     * @param conf the configuration holding {@code DATABASE_NAME} and {@code TABLE_NAME}
     * @return the table name alone when the database name is null or empty, otherwise
     *         {@code "<database>.<table>"}
     */
    public static String getTablePath(Configuration conf) {
        final String databaseName = conf.getString(FlinkOptions.DATABASE_NAME);
        if (StringUtils.isNullOrEmpty(databaseName)) {
            return conf.getString(FlinkOptions.TABLE_NAME);
        }
        return databaseName + "." + conf.getString(FlinkOptions.TABLE_NAME);
    }

    /**
     * Partitioner that maps an integer key onto a partition by taking it modulo the partition count.
     */
    public static class IndexPartitioner
    implements Partitioner<Integer> {
        /**
         * Returns the partition for {@code key}.
         *
         * @param key           the integer key; must not be {@code null}
         * @param numPartitions the number of partitions; must not be zero
         * @return {@code key} modulo {@code numPartitions}, always in {@code [0, numPartitions)} for a
         *         positive {@code numPartitions}
         */
        public int partition(Integer key, int numPartitions) {
            // floorMod keeps the result non-negative for negative keys, where '%' would yield a
            // negative (invalid) partition index. Non-negative keys give the same result as '%'.
            return Math.floorMod(key.intValue(), numPartitions);
        }
    }

    /**
     * Sink function that overrides nothing from {@code SinkFunction}; it is used to terminate a pipeline
     * (see {@code bulkInsert} and {@code dummySink}).
     */
    public static class DummySink
    implements SinkFunction<Object> {
        private static final long serialVersionUID = 1L;
        /** Shared instance; declared final so the singleton cannot be reassigned. */
        public static final DummySink INSTANCE = new DummySink();
    }
}

