/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink operator that executes the clustering work described by incoming {@link ClusteringPlanEvent}s.
 *
 * <p>For each event it reads the records of the clustering group (merging log files when any
 * operation of the group has them), optionally sorts them by
 * {@link FlinkOptions#CLUSTERING_SORT_COLUMNS}, writes them through a
 * {@link BulkInsertWriterHelper} and emits a {@link ClusteringCommitEvent} carrying the
 * resulting write statuses.
 *
 * <p>When async clustering is enabled the work runs on a {@link NonThrownExecutor}. A failure is
 * reported downstream as a {@link ClusteringCommitEvent} built without write statuses.
 */
public class ClusteringOperator
        extends TableStreamOperator<ClusteringCommitEvent>
        implements OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {

    private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);

    private final Configuration conf;
    private final RowType rowType;
    private final boolean asyncClustering;
    private final boolean sortClusteringEnabled;

    private int taskID;
    private transient HoodieWriteConfig writeConfig;
    private transient HoodieFlinkTable<?> table;
    /** Avro schema converted from {@link #rowType}. */
    private transient Schema schema;
    /** {@link #schema} with the Hudi metadata fields added; used when reading the source files. */
    private transient Schema readerSchema;
    /** For each field of {@link #schema}, its index among the fields of {@link #readerSchema}. */
    private transient int[] requiredPos;
    private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
    private transient HoodieFlinkWriteClient writeClient;
    /** Writer for the clustering group currently being processed; reset after every group. */
    private transient BulkInsertWriterHelper writerHelper;
    private transient StreamRecordCollector<ClusteringCommitEvent> collector;
    private transient BinaryRowDataSerializer binarySerializer;
    /** Generated sort-key computer, built once in {@link #open()} and shared by the per-group sorters. */
    private transient NormalizedKeyComputer sortKeyComputer;
    /** Generated record comparator, built once in {@link #open()} and shared by the per-group sorters. */
    private transient RecordComparator sortComparator;
    /**
     * Sorter of the clustering group currently being sorted, or {@code null} between groups.
     * Volatile because the metric gauges may read it from a thread other than the one running the
     * clustering (e.g. the async executor thread).
     */
    private transient volatile BinaryExternalSorter sorter;
    private transient NonThrownExecutor executor;

    public ClusteringOperator(Configuration conf, RowType rowType) {
        this.conf = conf;
        this.rowType = rowType;
        this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
        this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
        this.writeClient = StreamerUtil.createWriteClient(this.conf, this.getRuntimeContext());
        this.table = this.writeClient.getHoodieTable();
        this.schema = AvroSchemaConverter.convertToSchema(this.rowType);
        this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
        this.requiredPos = this.getRequiredPositions();
        this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(this.rowType);
        this.binarySerializer = new BinaryRowDataSerializer(this.rowType.getFieldCount());
        if (this.sortClusteringEnabled) {
            this.initSortSupport();
        }
        if (this.asyncClustering) {
            this.executor = NonThrownExecutor.builder(LOG).build();
        }
        this.collector = new StreamRecordCollector<>(this.output);
    }

    /**
     * Clusters the group carried by {@code element}, either inline or on the async executor.
     * In async mode a failure is reported by emitting a {@link ClusteringCommitEvent} without
     * write statuses instead of failing the task.
     */
    @Override
    public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
        final ClusteringPlanEvent event = element.getValue();
        final String instantTime = event.getClusteringInstantTime();
        if (this.asyncClustering) {
            this.executor.execute(
                    () -> this.doClustering(instantTime, event),
                    (errMsg, t) -> this.collector.collect(new ClusteringCommitEvent(instantTime, this.taskID)),
                    "Execute clustering for instant %s from task %d", instantTime, this.taskID);
        } else {
            LOG.info("Execute clustering for instant {} from task {}", instantTime, this.taskID);
            this.doClustering(instantTime, event);
        }
    }

    /**
     * Shuts down the async executor (if any) and then releases the write client.
     * The write client is released even if shutting down the executor fails. Calling this method
     * more than once is harmless.
     */
    @Override
    public void close() {
        try {
            if (this.executor != null) {
                this.executor.close();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Failed to close the async clustering executor", e);
        } finally {
            if (this.writeClient != null) {
                this.writeClient.cleanHandlesGracefully();
                this.writeClient.close();
                this.writeClient = null;
            }
        }
    }

    @Override
    public void endInput() {
        // Nothing to flush: every clustering group is written and committed in doClustering.
    }

    /**
     * Reads, optionally sorts, and writes all records of one clustering group, then emits a
     * {@link ClusteringCommitEvent} with the resulting write statuses.
     *
     * @param instantTime the clustering instant the written files belong to
     * @param event       the plan event describing the clustering group
     * @throws Exception on read/write/sort failures
     */
    private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception {
        ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();
        this.initWriterHelper(instantTime);
        try {
            List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
            boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
            Iterator<RowData> records = hasLogFiles
                    ? this.readRecordsForGroupWithLogs(clusteringOps, instantTime)
                    : this.readRecordsForGroupBaseFiles(clusteringOps);
            if (this.sortClusteringEnabled) {
                this.writeSorted(records);
            } else {
                while (records.hasNext()) {
                    this.writerHelper.write(records.next());
                }
            }
            List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
            this.collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
        } finally {
            // Always drop the helper: it is bound to this group's instant, and reusing it after a
            // failure would make the next group write under the wrong instant.
            this.writerHelper = null;
        }
    }

    /**
     * Sorts {@code records} with a sorter dedicated to the current clustering group, writes the
     * sorted rows to the writer helper, and closes the sorter. A fresh sorter is needed per group
     * because a sorter's input is finished once its iterator has been requested.
     */
    private void writeSorted(Iterator<RowData> records) throws Exception {
        RowDataSerializer rowDataSerializer = new RowDataSerializer(this.rowType);
        BinaryExternalSorter groupSorter = this.createSorter();
        this.sorter = groupSorter;
        try {
            while (records.hasNext()) {
                // Copy because toBinaryRow presumably reuses an internal buffer between calls.
                BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(records.next()).copy();
                groupSorter.write(binaryRowData);
            }
            BinaryRowData row = this.binarySerializer.createInstance();
            while ((row = groupSorter.getIterator().next(row)) != null) {
                this.writerHelper.write(row);
            }
        } finally {
            groupSorter.close();
            this.sorter = null;
        }
    }

    private void initWriterHelper(String clusteringInstantTime) {
        if (this.writerHelper == null) {
            this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
                    clusteringInstantTime, this.taskID, this.getRuntimeContext().getNumberOfParallelSubtasks(),
                    this.getRuntimeContext().getAttemptNumber(), this.rowType);
        }
    }

    /**
     * Reads the records of a group where at least one operation has log files, merging each
     * base file with its log files.
     * NOTE(review): the base file readers and log scanners opened here are not closed by this
     * method; confirm whether the returned iterators release them.
     */
    private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String instantTime) {
        List<Iterator<RowData>> recordIterators = new ArrayList<>();
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), this.writeConfig);
        LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
        for (ClusteringOperation clusteringOp : clusteringOps) {
            try {
                Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
                        ? Option.empty()
                        : Option.of(HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
                HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                        .withFileSystem(this.table.getMetaClient().getFs())
                        .withBasePath(this.table.getMetaClient().getBasePath())
                        .withLogFilePaths(clusteringOp.getDeltaFilePaths()))
                        .withReaderSchema(this.readerSchema)
                        .withLatestInstantTime(instantTime)
                        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                        .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
                        .withReverseReader(this.writeConfig.getCompactionReverseLogReadEnabled())
                        .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
                        .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
                        .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
                        .withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                        .build();
                HoodieTableConfig tableConfig = this.table.getMetaClient().getTableConfig();
                HoodieFileSliceReader<? extends IndexedRecord> hoodieFileSliceReader = HoodieFileSliceReader.getFileSliceReader(
                        baseFileReader, scanner, this.readerSchema,
                        tableConfig.getPayloadClass(),
                        tableConfig.getPreCombineField(),
                        tableConfig.populateMetaFields()
                                ? Option.empty()
                                : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp())));
                recordIterators.add(StreamSupport.stream(Spliterators.spliteratorUnknownSize(hoodieFileSliceReader, Spliterator.NONNULL), false)
                        .map(hoodieRecord -> {
                            try {
                                return this.transform((IndexedRecord) hoodieRecord.getData().getInsertValue(this.readerSchema).get());
                            } catch (IOException e) {
                                throw new HoodieIOException("Failed to read next record", e);
                            }
                        })
                        .iterator());
            } catch (IOException e) {
                throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
            }
        }
        return new ConcatenatingIterator<>(recordIterators);
    }

    /**
     * Reads the records of a group whose operations have base files only.
     * NOTE(review): the file readers opened here are never closed by this method.
     */
    private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
        List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
            Iterable<IndexedRecord> indexedRecords = () -> {
                try {
                    return HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))
                            .getRecordIterator(this.readerSchema);
                } catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            };
            return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
        }).collect(Collectors.toList());
        return new ConcatenatingIterator<>(iteratorsForPartition);
    }

    /** Projects a record read with {@link #readerSchema} onto {@link #schema} and converts it to {@link RowData}. */
    private RowData transform(IndexedRecord indexedRecord) {
        GenericRecord record = FormatUtils.buildAvroRecordBySchema(indexedRecord, this.schema, this.requiredPos, new GenericRecordBuilder(this.schema));
        return (RowData) this.avroToRowDataConverter.convert(record);
    }

    /** Returns, for each field of {@link #schema}, the index of the same-named field in {@link #readerSchema} (-1 if absent). */
    private int[] getRequiredPositions() {
        List<String> fieldNames = this.readerSchema.getFields().stream()
                .map(Schema.Field::name)
                .collect(Collectors.toList());
        return this.schema.getFields().stream()
                .mapToInt(field -> fieldNames.indexOf(field.name()))
                .toArray();
    }

    /**
     * Generates the sort code once and registers the sorter metrics.
     * The gauges read whichever per-group sorter is active and report 0 between groups.
     */
    private void initSortSupport() {
        ClassLoader cl = this.getContainingTask().getUserCodeClassLoader();
        SortCodeGenerator codeGenerator = this.createSortCodeGenerator();
        this.sortKeyComputer = (NormalizedKeyComputer) codeGenerator.generateNormalizedKeyComputer("SortComputer").newInstance(cl);
        this.sortComparator = (RecordComparator) codeGenerator.generateRecordComparator("SortComparator").newInstance(cl);
        this.getMetricGroup().gauge("memoryUsedSizeInBytes", () -> {
            BinaryExternalSorter current = this.sorter;
            return current == null ? 0L : current.getUsedMemoryInBytes();
        });
        this.getMetricGroup().gauge("numSpillFiles", () -> {
            BinaryExternalSorter current = this.sorter;
            return current == null ? 0L : current.getNumSpillFiles();
        });
        this.getMetricGroup().gauge("spillInBytes", () -> {
            BinaryExternalSorter current = this.sorter;
            return current == null ? 0L : current.getSpillInBytes();
        });
    }

    /** Creates and starts a new external sorter; the caller owns it and must close it. */
    private BinaryExternalSorter createSorter() {
        MemoryManager memManager = this.getContainingTask().getEnvironment().getMemoryManager();
        BinaryExternalSorter newSorter = new BinaryExternalSorter(
                this.getContainingTask(),
                memManager,
                this.computeMemorySize(),
                this.getContainingTask().getEnvironment().getIOManager(),
                (AbstractRowDataSerializer) this.binarySerializer,
                this.binarySerializer,
                this.sortKeyComputer,
                this.sortComparator,
                this.getContainingTask().getJobConfiguration());
        newSorter.startThreads();
        return newSorter;
    }

    private SortCodeGenerator createSortCodeGenerator() {
        SortOperatorGen sortOperatorGen = new SortOperatorGen(this.rowType, this.conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(","));
        return sortOperatorGen.createSortCodeGenerator();
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor executor) {
        this.executor = executor;
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<ClusteringCommitEvent>> output) {
        this.output = output;
    }
}

