/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.ExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleSparkJobConsistentHashingExecutionStrategy<T>
extends SingleSparkJobExecutionStrategy<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SingleSparkJobConsistentHashingExecutionStrategy.class);
    private final String indexKeyFields;
    private final Schema readerSchema;

    public SingleSparkJobConsistentHashingExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
        this.indexKeyFields = table.getConfig().getBucketIndexHashField();
        this.readerSchema = HoodieAvroUtils.addMetadataFields((Schema)new Schema.Parser().parse(writeConfig.getSchema()));
    }

    @Override
    protected List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, ClusteringGroupInfo clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, SerializableSchema schema, TaskContextSupplier taskContextSupplier, String instantTime) {
        ValidationUtils.checkArgument((clusteringGroup.getNumOutputGroups() >= 1 ? 1 : 0) != 0, (String)"Number of output groups should be at least 1");
        if (clusteringGroup.getNumOutputGroups() == 1) {
            return this.performBucketMergeForGroup(readerContextFactory, clusteringGroup, strategyParams, taskContextSupplier, instantTime);
        }
        return this.performBucketSplitForGroup(readerContextFactory, clusteringGroup, strategyParams, taskContextSupplier, instantTime);
    }

    private List<ConsistentHashingNode> decodeConsistentHashingNodes(ClusteringGroupInfo clusteringGroupInfo) {
        Option extraMetadata = clusteringGroupInfo.getExtraMetadata();
        ValidationUtils.checkArgument((boolean)extraMetadata.isPresent(), (String)"Extra metadata should be present for consistent hashing operations");
        String json = (String)((Map)extraMetadata.get()).get("clustering.group.child.node");
        ValidationUtils.checkArgument((!StringUtils.isNullOrEmpty((String)json) ? 1 : 0) != 0, (String)"Child nodes should not be null or empty for consistent hashing operations");
        try {
            return ConsistentHashingNode.fromJsonString((String)json);
        }
        catch (Exception e) {
            throw new HoodieClusteringException("Failed to parse child nodes from metadata", (Throwable)e);
        }
    }

    private List<WriteStatus> performBucketMergeForGroup(ReaderContextFactory<T> readerContextFactory, ClusteringGroupInfo clusteringGroup, Map<String, String> strategyParams, TaskContextSupplier taskContextSupplier, String instantTime) {
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction((TaskContextSupplier)taskContextSupplier, (HoodieConfig)this.writeConfig);
        LOG.info("MaxMemoryPerCompaction run as part of clustering => {}", (Object)maxMemoryPerCompaction);
        Option extraMetadata = clusteringGroup.getExtraMetadata();
        ValidationUtils.checkArgument((boolean)extraMetadata.isPresent(), (String)"Extra metadata should be present for consistent hashing operations");
        String partition = (String)((Map)extraMetadata.get()).get("clustering.group.partition");
        ValidationUtils.checkArgument((!StringUtils.isNullOrEmpty((String)partition) ? 1 : 0) != 0, (String)"Partition should not be null or empty");
        List<ConsistentHashingNode> nodes = this.decodeConsistentHashingNodes(clusteringGroup);
        Option newBucket = Option.fromJavaOptional(nodes.stream().filter(node -> node.getTag() == ConsistentHashingNode.NodeTag.REPLACE).findFirst());
        ValidationUtils.checkArgument((boolean)newBucket.isPresent(), (String)"New bucket should be present for merge operation");
        ConsistentHashingNode newBucketNode = (ConsistentHashingNode)newBucket.get();
        ArrayList readerSuppliers = new ArrayList(clusteringGroup.getOperations().size());
        clusteringGroup.getOperations().stream().forEach(op -> {
            Supplier<ClosableIterator> supplier = () -> this.getRecordIterator((ReaderContextFactory)readerContextFactory, (ClusteringOperation)op, instantTime, maxMemoryPerCompaction);
            readerSuppliers.add(supplier);
        });
        LazyConcatenatingIterator inputRecordsIter = new LazyConcatenatingIterator(readerSuppliers);
        InsertHandler insertHandler = new InsertHandler(this.writeConfig, instantTime, this.getHoodieTable(), taskContextSupplier, (WriteHandleFactory)new FixedIdSuffixCreateHandleFactory(), true, record -> newBucketNode.getFileIdPrefix(), this.readerSchema);
        return (List)ExecutorFactory.create((HoodieWriteConfig)this.writeConfig, (Iterator)inputRecordsIter, insertHandler, op -> op, (Runnable)this.getHoodieTable().getPreExecuteRunnable()).execute();
    }

    private List<WriteStatus> performBucketSplitForGroup(ReaderContextFactory<T> readerContextFactory, ClusteringGroupInfo clusteringGroup, Map<String, String> strategyParams, TaskContextSupplier taskContextSupplier, String instantTime) {
        ValidationUtils.checkArgument((clusteringGroup.getOperations().size() == 1 ? 1 : 0) != 0, (String)"Split operation should have only one operation");
        Option extraMetadata = clusteringGroup.getExtraMetadata();
        ValidationUtils.checkArgument((boolean)extraMetadata.isPresent(), (String)"Extra metadata should be present for consistent hashing operations");
        String partition = (String)((Map)extraMetadata.get()).get("clustering.group.partition");
        ValidationUtils.checkArgument((!StringUtils.isNullOrEmpty((String)partition) ? 1 : 0) != 0, (String)"Partition should not be null or empty");
        List<ConsistentHashingNode> nodes = this.decodeConsistentHashingNodes(clusteringGroup);
        Integer seqNo = Integer.parseInt((String)((Map)extraMetadata.get()).get("clustering.group.sequence.no"));
        HoodieConsistentHashingMetadata metadata = new HoodieConsistentHashingMetadata(0, partition, instantTime, 0, seqNo + 1, Collections.emptyList());
        metadata.setChildrenNodes(nodes);
        ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata);
        ClusteringOperation operation = (ClusteringOperation)clusteringGroup.getOperations().get(0);
        ClosableIterator iterator = this.getRecordIterator(readerContextFactory, operation, instantTime, IOUtils.getMaxMemoryPerCompaction((TaskContextSupplier)new SparkTaskContextSupplier(), (HoodieConfig)this.writeConfig));
        Function<HoodieRecord, String> fileIdPrefixExtractor = record -> identifier.getBucket(record.getRecordKey(), this.indexKeyFields).getFileIdPrefix();
        InsertHandler insertHandler = new InsertHandler(this.writeConfig, instantTime, this.getHoodieTable(), taskContextSupplier, (WriteHandleFactory)new FixedIdSuffixCreateHandleFactory(), false, fileIdPrefixExtractor, this.readerSchema);
        return (List)ExecutorFactory.create((HoodieWriteConfig)this.writeConfig, (Iterator)iterator, insertHandler, op -> op, (Runnable)this.getHoodieTable().getPreExecuteRunnable()).execute();
    }

    static class InsertHandler<T>
    implements HoodieConsumer<HoodieRecord<T>, List<WriteStatus>> {
        private final HoodieWriteConfig config;
        private final String instantTime;
        private final HoodieTable hoodieTable;
        private final TaskContextSupplier taskContextSupplier;
        private final WriteHandleFactory writeHandleFactory;
        private final List<WriteStatus> statuses;
        private final boolean recordsSorted;
        private final Map<String, HoodieWriteHandle> writeHandles;
        private final Function<HoodieRecord, String> fileIdPrefixExtractor;
        private final Schema schema;

        public InsertHandler(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier, WriteHandleFactory writeHandleFactory, boolean recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, Schema schema) {
            this.config = config;
            this.instantTime = instantTime;
            this.hoodieTable = hoodieTable;
            this.taskContextSupplier = taskContextSupplier;
            this.writeHandleFactory = writeHandleFactory;
            this.statuses = new ArrayList<WriteStatus>();
            this.recordsSorted = recordsSorted;
            this.writeHandles = new HashMap<String, HoodieWriteHandle>();
            this.fileIdPrefixExtractor = fileIdPrefixExtractor;
            this.schema = schema;
        }

        public void consume(HoodieRecord record) throws Exception {
            String fileIdPrefix = this.fileIdPrefixExtractor.apply(record);
            HoodieWriteHandle handle = this.writeHandles.get(fileIdPrefix);
            if (handle == null) {
                if (this.recordsSorted) {
                    this.closeOpenHandles();
                }
                handle = this.writeHandleFactory.create(this.config, this.instantTime, this.hoodieTable, record.getPartitionPath(), fileIdPrefix, this.taskContextSupplier);
                this.writeHandles.put(fileIdPrefix, handle);
            }
            handle.write(record, this.schema, this.config.getProps());
        }

        public List<WriteStatus> finish() {
            this.closeOpenHandles();
            return this.statuses;
        }

        private void closeOpenHandles() {
            for (HoodieWriteHandle handle : this.writeHandles.values()) {
                this.statuses.addAll(handle.close());
            }
            this.writeHandles.clear();
        }
    }

    static class FixedIdSuffixCreateHandleFactory
    extends CreateHandleFactory {
        FixedIdSuffixCreateHandleFactory() {
        }

        protected String getNextFileId(String idPfx) {
            return FSUtils.createNewFileId((String)idPfx, (int)0);
        }
    }
}

