/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.plan.strategy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.util.Lazy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Clustering plan strategy that schedules "stream copy" clustering.
 *
 * <p>File slices are grouped by size, as in {@link SparkSizeBasedClusteringPlanStrategy}. When
 * binary-copy schema evolution is disabled in the write config, a group is additionally closed
 * whenever the next slice's base-file schema hash differs from the hash of the group's first
 * slice. As a result, every emitted group holds slices that share one schema hash.
 *
 * <p>The plan produced by {@link #generateClusteringPlan} always names
 * {@code SparkStreamCopyClusteringExecutionStrategy} as its execution strategy and sets
 * {@code preserveHoodieMetadata} to {@code true}.
 *
 * @param <T> type parameter forwarded unchanged to {@link SparkSizeBasedClusteringPlanStrategy}
 */
public class SparkStreamCopyClusteringPlanStrategy<T>
extends SparkSizeBasedClusteringPlanStrategy<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkStreamCopyClusteringPlanStrategy.class);
    private static final String SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY = "org.apache.hudi.client.clustering.run.strategy.SparkStreamCopyClusteringExecutionStrategy";

    public SparkStreamCopyClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Builds the strategy parameters recorded in the clustering plan.
     *
     * @return a mutable map that contains the configured clustering sort columns under
     *         {@code PLAN_STRATEGY_SORT_COLUMNS} when they are non-empty, and is empty otherwise
     */
    @Override
    protected Map<String, String> getStrategyParams() {
        Map<String, String> params = new HashMap<>();
        String sortColumns = this.getWriteConfig().getClusteringSortColumns();
        if (!StringUtils.isNullOrEmpty(sortColumns)) {
            params.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), sortColumns);
        }
        return params;
    }

    /**
     * Splits the file slices of one partition into clustering groups.
     *
     * <p>When binary-copy schema evolution is disabled, groups are split on schema changes as
     * well as on size. Otherwise the parent's size-only grouping is used.
     *
     * @param partitionPath partition being planned (used for logging here)
     * @param fileSlices    candidate file slices of that partition
     * @return the clustering groups, paired with a flag that is {@code true} when planning stopped
     *         early because the maximum number of groups was reached
     */
    @Override
    protected Pair<Stream<HoodieClusteringGroup>, Boolean> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
        if (!this.getWriteConfig().isBinaryCopySchemaEvolutionEnabled()) {
            LOG.info("Schema evolution disabled, using schema-aware grouping for partition: {}", partitionPath);
            return this.buildClusteringGroupsForPartitionWithSchemaGrouping(partitionPath, fileSlices);
        }
        LOG.info("Schema evolution enabled, using size-only grouping for partition: {}", partitionPath);
        return super.buildClusteringGroupsForPartition(partitionPath, fileSlices);
    }

    /**
     * Size-based grouping that also closes a group whenever the schema hash changes.
     *
     * <p>Slices are visited largest first. A group is closed when any of the following holds:
     * <ul>
     *   <li>adding the next slice would exceed {@code getClusteringMaxBytesInGroup()};</li>
     *   <li>the next slice's schema hash differs from that of the group's first slice.</li>
     * </ul>
     * Planning stops, and the partial flag is set, once {@code getClusteringMaxNumGroups()}
     * groups have been closed. The trailing group is kept only if it has more than one slice or
     * single-slice groups are allowed by {@code shouldClusteringSingleGroup()}.
     *
     * @param partitionPath partition being planned
     * @param fileSlices    candidate file slices of that partition; the list itself is not modified
     * @return the clustering groups and the "partially scheduled" flag
     */
    private Pair<Stream<HoodieClusteringGroup>, Boolean> buildClusteringGroupsForPartitionWithSchemaGrouping(String partitionPath, List<FileSlice> fileSlices) {
        HoodieWriteConfig writeConfig = this.getWriteConfig();
        List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
        List<FileSlice> currentGroup = new ArrayList<>();

        // Largest slices first, so that groups pack more compactly.
        // Long.compare replaces the former (int) cast of a long difference. That cast truncated
        // size gaps larger than Integer.MAX_VALUE bytes, which could misorder slices or break
        // the Comparator contract.
        List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
        sortedFileSlices.sort((o1, o2) -> Long.compare(estimateSliceSize(o2, writeConfig), estimateSliceSize(o1, writeConfig)));

        long totalSizeSoFar = 0L;
        boolean partialScheduled = false;
        // Schema hash of the first slice in currentGroup; null while the group is empty.
        Integer currentGroupSchemaHash = null;
        for (FileSlice currentSlice : sortedFileSlices) {
            long currentSize = estimateSliceSize(currentSlice, writeConfig);
            Integer currentFileSchemaHash = this.getFileSchemaHash(currentSlice);
            boolean schemaMismatch = currentGroupSchemaHash != null && !currentGroupSchemaHash.equals(currentFileSchemaHash);
            // Close the current group if this slice would overflow the byte budget or if it has
            // a different schema from the slices already in the group.
            if ((totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() || schemaMismatch) && !currentGroup.isEmpty()) {
                int numOutputGroups = this.getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
                LOG.info("Adding clustering group - size: {} max bytes: {} num input slices: {} output groups: {}{}",
                        totalSizeSoFar, writeConfig.getClusteringMaxBytesInGroup(), currentGroup.size(), numOutputGroups,
                        schemaMismatch ? " (schema change detected)" : "");
                fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
                currentGroup = new ArrayList<>();
                totalSizeSoFar = 0L;
                currentGroupSchemaHash = null;
                if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups()) {
                    LOG.info("Having generated the maximum number of groups: {}", writeConfig.getClusteringMaxNumGroups());
                    partialScheduled = true;
                    break;
                }
            }
            currentGroup.add(currentSlice);
            if (currentGroupSchemaHash == null) {
                currentGroupSchemaHash = currentFileSchemaHash;
            }
            totalSizeSoFar += currentSize;
        }
        if (!currentGroup.isEmpty() && (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup())) {
            int numOutputGroups = this.getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
            LOG.info("Adding final clustering group - size: {} max bytes: {} num input slices: {} output groups: {}",
                    totalSizeSoFar, writeConfig.getClusteringMaxBytesInGroup(), currentGroup.size(), numOutputGroups);
            fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
        }
        Stream<HoodieClusteringGroup> clusteringGroups = fileSliceGroups.stream().map(group -> HoodieClusteringGroup.newBuilder()
                .setSlices(getFileSliceInfo(group.getLeft()))
                .setNumOutputFileGroups(group.getRight())
                .setMetrics(this.buildMetrics(group.getLeft()))
                .build());
        return Pair.of(clusteringGroups, partialScheduled);
    }

    /**
     * Returns the size used for grouping and ordering a slice.
     *
     * @param slice       file slice to measure
     * @param writeConfig config that supplies the fallback estimate
     * @return the base file size if the slice has a base file, otherwise the configured parquet
     *         max file size
     */
    private static long estimateSliceSize(FileSlice slice, HoodieWriteConfig writeConfig) {
        return slice.getBaseFile().isPresent()
                ? slice.getBaseFile().get().getFileSize()
                : writeConfig.getParquetMaxFileSize();
    }

    /**
     * Reads the schema hash of a slice's base file via {@code ParquetUtils.readSchemaHash}.
     *
     * <p>NOTE(review): {@code 0} is returned both for slices without a base file and when reading
     * the hash fails (the failure is logged). Such slices therefore compare equal to each other
     * during grouping. Confirm that this is the intended fallback.
     *
     * @param fileSlice slice whose base-file schema hash is wanted
     * @return the schema hash, or {@code 0} as described above
     */
    private Integer getFileSchemaHash(FileSlice fileSlice) {
        if (!fileSlice.getBaseFile().isPresent()) {
            return 0;
        }
        String filePath = fileSlice.getBaseFile().get().getPath();
        try {
            return ParquetUtils.readSchemaHash(this.getHoodieTable().getStorage(), new StoragePath(filePath));
        } catch (Exception e) {
            // Best effort: planning continues using the 0 fallback.
            LOG.warn("Failed to read schema hash from file: {}", filePath, e);
            return 0;
        }
    }

    /**
     * Generates the clustering plan and retargets it at the stream-copy execution strategy.
     *
     * <p>Plan generation is delegated to the parent. If the parent produces a plan, it is rebuilt
     * as follows:
     * <ul>
     *   <li>input groups and missing-schedule partitions are copied from the parent's plan;</li>
     *   <li>the execution strategy is {@code SparkStreamCopyClusteringExecutionStrategy}, with the
     *       parameters from {@link #getStrategyParams()};</li>
     *   <li>extra metadata and the plan version come from this strategy;</li>
     *   <li>{@code preserveHoodieMetadata} is set to {@code true}.</li>
     * </ul>
     *
     * @param executor   plan action executor, passed through to the parent
     * @param partitions lazily computed partitions to plan, passed through to the parent
     * @return the rebuilt plan, or the parent's empty result when no plan was produced
     */
    public Option<HoodieClusteringPlan> generateClusteringPlan(ClusteringPlanActionExecutor executor, Lazy<List<String>> partitions) {
        Option<HoodieClusteringPlan> planOption = super.generateClusteringPlan(executor, partitions);
        if (!planOption.isPresent()) {
            return planOption;
        }
        HoodieClusteringPlan plan = planOption.get();
        HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
                .setStrategyClassName(SPARK_STREAM_COPY_CLUSTERING_EXECUTION_STRATEGY)
                .setStrategyParams(this.getStrategyParams())
                .build();
        LOG.info("Setting execution strategy to SparkStreamCopyClusteringExecutionStrategy for stream copy clustering");
        return Option.of(HoodieClusteringPlan.newBuilder()
                .setStrategy(strategy)
                .setInputGroups(plan.getInputGroups())
                .setExtraMetadata(this.getExtraMetadata())
                .setVersion(this.getPlanVersion())
                .setPreserveHoodieMetadata(true)
                .setMissingSchedulePartitions(plan.getMissingSchedulePartitions())
                .build());
    }
}

