/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.FileRewriter;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.actions.BaseSnapshotUpdateSparkAction;
import org.apache.iceberg.spark.actions.SparkBinPackDataRewriter;
import org.apache.iceberg.spark.actions.SparkSortDataRewriter;
import org.apache.iceberg.spark.actions.SparkZOrderDataRewriter;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RewriteDataFilesSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteDataFilesSparkAction>
implements RewriteDataFiles {
    private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesSparkAction.class);
    // Option keys accepted by the action itself. validateAndInitOptions() unions these with the
    // keys reported by the selected rewriter before rejecting unknown options.
    private static final Set<String> VALID_OPTIONS = ImmutableSet.of("max-concurrent-file-group-rewrites", "max-file-group-size-bytes", "partial-progress.enabled", "partial-progress.max-commits", "target-file-size-bytes", "use-starting-sequence-number", "rewrite-job-order");
    // Shared result carrying an empty list of rewrite results.
    private static final RewriteDataFiles.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
    // Table whose data files are rewritten.
    private final Table table;
    // Accumulated scan filter; filter(Expression) ANDs further predicates onto it.
    private Expression filter = Expressions.alwaysTrue();
    // The five settings below are populated from options() by validateAndInitOptions().
    private int maxConcurrentFileGroupRewrites;
    private int maxCommits;
    private boolean partialProgressEnabled;
    private boolean useStartingSequenceNumber;
    private RewriteJobOrder rewriteJobOrder;
    // Selected rewrite strategy; null until binPack()/sort()/zOrder() is called or execute() picks bin-pack.
    private FileRewriter<FileScanTask, DataFile> rewriter = null;

    /**
     * Creates the action for {@code table}, handing a clone of {@code spark} to the base action.
     *
     * @param spark session that is cloned for this action
     * @param table table whose data files will be rewritten
     */
    RewriteDataFilesSparkAction(SparkSession spark, Table table) {
        super(spark.cloneSession());
        this.table = table;
        // Turn adaptive query execution off on the session returned by spark()
        // (presumably the clone passed to super - confirm against the base class).
        this.spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
    }

    /**
     * Returns this action so that fluent calls defined on the base class keep the concrete type.
     */
    @Override
    protected RewriteDataFilesSparkAction self() {
        return this;
    }

    /**
     * Selects the bin-pack rewriter ({@link SparkBinPackDataRewriter}).
     *
     * @return this action
     * @throws IllegalArgumentException if a rewriter has already been selected
     */
    public RewriteDataFilesSparkAction binPack() {
        boolean noRewriterYet = this.rewriter == null;
        Preconditions.checkArgument(noRewriterYet, (Object) "Must use only one rewriter type (bin-pack, sort, zorder)");
        this.rewriter = new SparkBinPackDataRewriter(this.spark(), this.table);
        return this;
    }

    /**
     * Selects the sort rewriter ({@link SparkSortDataRewriter}) with an explicit sort order.
     *
     * @param sortOrder sort order handed to the rewriter
     * @return this action
     * @throws IllegalArgumentException if a rewriter has already been selected
     */
    public RewriteDataFilesSparkAction sort(SortOrder sortOrder) {
        boolean noRewriterYet = this.rewriter == null;
        Preconditions.checkArgument(noRewriterYet, (Object) "Must use only one rewriter type (bin-pack, sort, zorder)");
        this.rewriter = new SparkSortDataRewriter(this.spark(), this.table, sortOrder);
        return this;
    }

    /**
     * Selects the sort rewriter ({@link SparkSortDataRewriter}) without passing a sort order;
     * which order is then used is decided by the rewriter itself.
     *
     * @return this action
     * @throws IllegalArgumentException if a rewriter has already been selected
     */
    public RewriteDataFilesSparkAction sort() {
        boolean noRewriterYet = this.rewriter == null;
        Preconditions.checkArgument(noRewriterYet, (Object) "Must use only one rewriter type (bin-pack, sort, zorder)");
        this.rewriter = new SparkSortDataRewriter(this.spark(), this.table);
        return this;
    }

    /**
     * Selects the z-order rewriter ({@link SparkZOrderDataRewriter}) over the given columns.
     *
     * @param columnNames column names passed to the rewriter as a list
     * @return this action
     * @throws IllegalArgumentException if a rewriter has already been selected
     */
    public RewriteDataFilesSparkAction zOrder(String ... columnNames) {
        boolean noRewriterYet = this.rewriter == null;
        Preconditions.checkArgument(noRewriterYet, (Object) "Must use only one rewriter type (bin-pack, sort, zorder)");
        List<String> zOrderColumns = Arrays.asList(columnNames);
        this.rewriter = new SparkZOrderDataRewriter(this.spark(), this.table, zOrderColumns);
        return this;
    }

    /**
     * Narrows the set of files considered for rewriting. The expression is AND-ed with whatever
     * filter has been accumulated so far (initially always-true).
     *
     * @param expression additional predicate
     * @return this action
     */
    public RewriteDataFilesSparkAction filter(Expression expression) {
        Expression combined = Expressions.and(this.filter, expression);
        this.filter = combined;
        return this;
    }

    /**
     * Plans the file groups for the table's current snapshot and rewrites them.
     *
     * <p>Returns the shared empty result when the table has no current snapshot or when planning
     * produces no file groups. If no rewriter was selected, the bin-pack rewriter is used.
     * Depending on {@code partial-progress.enabled}, the work is delegated to
     * {@code doExecuteWithPartialProgress} or {@code doExecute}.
     *
     * @return the result of the rewrite
     */
    public RewriteDataFiles.Result execute() {
        if (this.table.currentSnapshot() == null) {
            return EMPTY_RESULT;
        }

        long startingSnapshotId = this.table.currentSnapshot().snapshotId();

        // Default to bin-packing when the caller did not pick a strategy.
        if (this.rewriter == null) {
            this.rewriter = new SparkBinPackDataRewriter(this.spark(), this.table);
        }

        this.validateAndInitOptions();

        StructLikeMap<List<List<FileScanTask>>> plannedGroups = this.planFileGroups(startingSnapshotId);
        RewriteExecutionContext ctx = new RewriteExecutionContext(plannedGroups);
        if (ctx.totalGroupCount() == 0) {
            LOG.info("Nothing found to rewrite in {}", (Object) this.table.name());
            return EMPTY_RESULT;
        }

        Stream<RewriteFileGroup> groupStream = this.toGroupStream(ctx, plannedGroups);
        RewriteDataFilesCommitManager manager = this.commitManager(startingSnapshotId);
        return this.partialProgressEnabled
            ? this.doExecuteWithPartialProgress(ctx, groupStream, manager)
            : this.doExecute(ctx, groupStream, manager);
    }

    /**
     * Scans the given snapshot with the accumulated filter (residuals ignored), buckets the
     * resulting file scan tasks by partition and lets the rewriter split every bucket into
     * file groups.
     *
     * <p>The scan iterable is always closed; an {@link IOException} raised while closing it is
     * logged and not propagated.
     *
     * @param startingSnapshotId snapshot to plan against
     * @return planned file groups keyed by partition
     */
    StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
        CloseableIterable<FileScanTask> scanTasks = this.table.newScan()
            .useSnapshot(startingSnapshotId)
            .filter(this.filter)
            .ignoreResiduals()
            .planFiles();
        try {
            Types.StructType partitionType = this.table.spec().partitionType();
            return this.fileGroupsByPartition(this.groupByPartition(partitionType, scanTasks));
        } finally {
            try {
                scanTasks.close();
            } catch (IOException closeFailure) {
                LOG.error("Cannot properly close file iterable while planning for rewrite", (Throwable) closeFailure);
            }
        }
    }

    /**
     * Buckets scan tasks by the partition of their data file.
     *
     * <p>Files whose spec id differs from the table's current spec id are all placed under one
     * empty record of the current partition type.
     *
     * @param partitionType partition struct type used for the map keys
     * @param tasks scan tasks to bucket
     * @return the tasks grouped per partition
     */
    private StructLikeMap<List<FileScanTask>> groupByPartition(Types.StructType partitionType, Iterable<FileScanTask> tasks) {
        StructLikeMap<List<FileScanTask>> buckets = StructLikeMap.create(partitionType);
        StructLike emptyStruct = GenericRecord.create(partitionType);

        for (FileScanTask task : tasks) {
            StructLike key;
            if (task.file().specId() == this.table.spec().specId()) {
                key = task.file().partition();
            } else {
                key = emptyStruct;
            }

            List<FileScanTask> bucket = buckets.get(key);
            if (bucket == null) {
                bucket = Lists.newArrayList();
            }
            bucket.add(task);
            buckets.put(key, bucket);
        }

        return buckets;
    }

    /**
     * Replaces each partition's task list with the file groups planned for it by
     * {@link #planFileGroups(List)}.
     */
    private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(StructLikeMap<List<FileScanTask>> filesByPartition) {
        return filesByPartition.transformValues(partitionTasks -> this.planFileGroups(partitionTasks));
    }

    /**
     * Asks the rewriter to split the tasks of one partition into file groups and returns an
     * immutable copy of what it produced.
     */
    private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
        Iterable<List<FileScanTask>> planned = this.rewriter.planFileGroups(tasks);
        return ImmutableList.copyOf(planned);
    }

    /**
     * Rewrites one file group with the selected rewriter, running it under a job group named
     * {@code REWRITE-DATA-FILES} with a description built by {@code jobDesc}, and records the
     * produced files on the group.
     *
     * @param ctx execution context used to build the job description
     * @param fileGroup group to rewrite
     * @return the same {@code fileGroup}, with its output files set
     */
    @VisibleForTesting
    RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
        String description = this.jobDesc(fileGroup, ctx);
        Set<DataFile> outputFiles = this.withJobGroupInfo(
            this.newJobGroupInfo("REWRITE-DATA-FILES", description),
            () -> this.rewriter.rewrite(fileGroup.fileScans()));
        fileGroup.setOutputFiles(outputFiles);
        LOG.info("Rewrite Files Ready to be Committed - {}", (Object) description);
        return fileGroup;
    }

    /**
     * Builds the executor used to rewrite file groups: a fixed pool of
     * {@code maxConcurrentFileGroupRewrites} threads named {@code Rewrite-Service-%d}, wrapped
     * by {@code MoreExecutors.getExitingExecutorService}.
     */
    private ExecutorService rewriteService() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
            this.maxConcurrentFileGroupRewrites,
            new ThreadFactoryBuilder().setNameFormat("Rewrite-Service-%d").build());
        return MoreExecutors.getExitingExecutorService(pool);
    }

    /**
     * Creates the commit manager for this run from the table, the starting snapshot id, the
     * {@code use-starting-sequence-number} setting and the action's commit summary.
     *
     * @param startingSnapshotId id of the snapshot the rewrite was planned against
     */
    @VisibleForTesting
    RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
        return new RewriteDataFilesCommitManager(
            this.table,
            startingSnapshotId,
            this.useStartingSequenceNumber,
            this.commitSummary());
    }

    /**
     * Rewrites every file group and then commits all of them in a single commit.
     *
     * <p>Groups are rewritten on the executor from {@code rewriteService()} with no retries and
     * with stop-on-failure enabled. If the rewrite phase throws, every group that had already been
     * rewritten is aborted through the commit manager (failures while aborting are suppressed) and
     * the original exception is rethrown. The executor is shut down in all cases.
     *
     * @param ctx execution context shared by all groups
     * @param groupStream file groups to rewrite
     * @param commitManager manager used to commit or abort the rewritten groups
     * @return one result entry per rewritten group
     * @throws RuntimeException wrapping a {@link CommitFailedException} or
     *     {@link ValidationException} raised by the commit
     */
    private RewriteDataFiles.Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream, RewriteDataFilesCommitManager commitManager) {
        ExecutorService rewriteService = this.rewriteService();
        // Typed queue: the lambdas and method references below need RewriteFileGroup elements.
        ConcurrentLinkedQueue<RewriteFileGroup> rewrittenGroups = Queues.newConcurrentLinkedQueue();
        Tasks.Builder<RewriteFileGroup> rewriteTaskBuilder = Tasks.foreach(groupStream)
            .executeWith(rewriteService)
            .stopOnFailure()
            .noRetry()
            .onFailure((fileGroup, exception) -> LOG.warn("Failure during rewrite process for group {}", fileGroup.info(), exception));
        try {
            rewriteTaskBuilder.run(fileGroup -> rewrittenGroups.add(this.rewriteFiles(ctx, fileGroup)));
        } catch (Exception e) {
            LOG.error("Cannot complete rewrite, {} is not enabled and one of the file set groups failed to be rewritten. This error occurred during the writing of new files, not during the commit process. This indicates something is wrong that doesn't involve conflicts with other Iceberg operations. Enabling {} may help in this case but the root cause should be investigated. Cleaning up {} groups which finished being written.", "partial-progress.enabled", "partial-progress.enabled", rewrittenGroups.size(), e);
            // Best-effort cleanup of the groups that did finish before the failure.
            Tasks.foreach(rewrittenGroups).suppressFailureWhenFinished().run(commitManager::abortFileGroup);
            throw e;
        } finally {
            rewriteService.shutdown();
        }

        try {
            commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups));
        } catch (CommitFailedException | ValidationException e) {
            String errorMessage = String.format("Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of conflicts, set %s which will break up the rewrite into multiple smaller commits controlled by %s. Separate smaller rewrite commits can succeed independently while any commits that conflict with another Iceberg operation will be ignored. This mode will create additional snapshots in the table history, one for each commit.", "partial-progress.enabled", "partial-progress.max-commits");
            throw new RuntimeException(errorMessage, e);
        }

        List<RewriteDataFiles.FileGroupRewriteResult> rewriteResults = rewrittenGroups.stream()
            .map(RewriteFileGroup::asResult)
            .collect(Collectors.toList());
        return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
    }

    /**
     * Rewrites file groups and hands each finished group to a commit service that commits them
     * in batches, so that some groups may be committed even when others fail.
     *
     * <p>The batch size is {@code ceil(totalGroupCount / maxCommits)}. A group whose rewrite
     * fails is logged and recorded as a failure result; it does not abort the remaining groups.
     * After all rewrite tasks have run, the executor is shut down and the commit service closed.
     *
     * @param ctx execution context shared by all groups
     * @param groupStream file groups to rewrite
     * @param commitManager manager that provides the commit service
     * @return results for the groups reported by the commit service plus the recorded failures
     */
    private RewriteDataFiles.Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream, RewriteDataFilesCommitManager commitManager) {
        ExecutorService rewriteService = this.rewriteService();

        int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), this.maxCommits, RoundingMode.CEILING);
        RewriteDataFilesCommitManager.CommitService commitService = commitManager.service(groupsPerCommit);
        commitService.start();

        // Written from the rewrite threads, hence a concurrent queue.
        ConcurrentLinkedQueue<RewriteDataFiles.FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();

        Tasks.foreach(groupStream)
            .suppressFailureWhenFinished()
            .executeWith(rewriteService)
            .noRetry()
            .onFailure((fileGroup, exception) -> {
                LOG.error("Failure during rewrite group {}", fileGroup.info(), exception);
                rewriteFailures.add(ImmutableRewriteDataFiles.FileGroupFailureResult.builder()
                    .info(fileGroup.info())
                    .dataFilesCount(fileGroup.numFiles())
                    .build());
            })
            .run(fileGroup -> commitService.offer(this.rewriteFiles(ctx, fileGroup)));
        rewriteService.shutdown();

        commitService.close();
        List<RewriteFileGroup> commitResults = commitService.results();
        if (commitResults.isEmpty()) {
            LOG.error("{} is true but no rewrite commits succeeded. Check the logs to determine why the individual commits failed. If this is persistent it may help to increase {} which will break the rewrite operation into smaller commits.", "partial-progress.enabled", "partial-progress.max-commits");
        }

        List<RewriteDataFiles.FileGroupRewriteResult> rewriteResults = commitResults.stream()
            .map(RewriteFileGroup::asResult)
            .collect(Collectors.toList());
        return ImmutableRewriteDataFiles.Result.builder()
            .rewriteResults(rewriteResults)
            .rewriteFailures(rewriteFailures)
            .build();
    }

    Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
        return groupsByPartition.entrySet().stream().filter((? super T e) -> ((List)e.getValue()).size() != 0).flatMap(e -> {
            StructLike partition = (StructLike)e.getKey();
            List scanGroups = (List)e.getValue();
            return scanGroups.stream().map(tasks -> this.newRewriteGroup(ctx, partition, (List<FileScanTask>)tasks));
        }).sorted(RewriteFileGroup.comparator((RewriteJobOrder)this.rewriteJobOrder));
    }

    /**
     * Creates a file group for {@code tasks}, stamping it with the next global index and the
     * next index within {@code partition}, both taken from {@code ctx}.
     */
    private RewriteFileGroup newRewriteGroup(RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
        RewriteDataFiles.FileGroupInfo groupInfo = ImmutableRewriteDataFiles.FileGroupInfo.builder()
            .globalIndex(ctx.currentGlobalIndex())
            .partitionIndex(ctx.currentPartitionIndex(partition))
            .partition(partition)
            .build();
        return new RewriteFileGroup(groupInfo, tasks);
    }

    /**
     * Validates the configured options and loads them into this action.
     *
     * <p>Any key that is neither one of the action's own options nor one of the rewriter's valid
     * options is rejected. The rewriter is then initialised with the options and the action-level
     * settings are read, using 5 concurrent rewrites, 10 commits, partial progress disabled and
     * use-starting-sequence-number enabled as fallbacks.
     *
     * @throws IllegalArgumentException for unsupported keys, a concurrency below 1, or a
     *     non-positive max-commits while partial progress is enabled
     */
    void validateAndInitOptions() {
        Set<String> supported = Sets.newHashSet(this.rewriter.validOptions());
        supported.addAll(VALID_OPTIONS);

        Set<String> unsupported = Sets.newHashSet(this.options().keySet());
        unsupported.removeAll(supported);
        Preconditions.checkArgument(
            unsupported.isEmpty(),
            "Cannot use options %s, they are not supported by the action or the rewriter %s",
            (Object) unsupported,
            (Object) this.rewriter.description());

        this.rewriter.init(this.options());

        this.maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(this.options(), "max-concurrent-file-group-rewrites", 5);
        this.maxCommits = PropertyUtil.propertyAsInt(this.options(), "partial-progress.max-commits", 10);
        this.partialProgressEnabled = PropertyUtil.propertyAsBoolean(this.options(), "partial-progress.enabled", false);
        this.useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(this.options(), "use-starting-sequence-number", true);

        String jobOrderName = PropertyUtil.propertyAsString(this.options(), "rewrite-job-order", REWRITE_JOB_ORDER_DEFAULT);
        this.rewriteJobOrder = RewriteJobOrder.fromName(jobOrderName);

        Preconditions.checkArgument(
            this.maxConcurrentFileGroupRewrites >= 1,
            "Cannot set %s to %s, the value must be positive.",
            (Object) "max-concurrent-file-group-rewrites",
            this.maxConcurrentFileGroupRewrites);

        // max-commits only matters (and is only checked) when partial progress is on.
        boolean maxCommitsAcceptable = !this.partialProgressEnabled || this.maxCommits > 0;
        Preconditions.checkArgument(
            maxCommitsAcceptable,
            "Cannot set %s to %s, the value must be positive when %s is true",
            (Object) "partial-progress.max-commits",
            (Object) this.maxCommits,
            (Object) "partial-progress.enabled");
    }

    /**
     * Builds the human-readable description of the rewrite job for one file group. The partition
     * and the group's position within it are included only when the partition struct has fields.
     */
    private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
        StructLike partition = group.info().partition();
        if (partition.size() <= 0) {
            return String.format(
                "Rewriting %d files (%s, file group %d/%d) in %s",
                group.rewrittenFiles().size(),
                this.rewriter.description(),
                group.info().globalIndex(),
                ctx.totalGroupCount(),
                this.table.name());
        }
        return String.format(
            "Rewriting %d files (%s, file group %d/%d, %s (%d/%d)) in %s",
            group.rewrittenFiles().size(),
            this.rewriter.description(),
            group.info().globalIndex(),
            ctx.totalGroupCount(),
            partition,
            group.info().partitionIndex(),
            ctx.groupsInPartition(partition),
            this.table.name());
    }

    /**
     * Bookkeeping for one execution: how many file groups were planned (in total and per
     * partition) and the counters that hand out 1-based global and per-partition group indices.
     */
    @VisibleForTesting
    static class RewriteExecutionContext {
        private final StructLikeMap<Integer> numGroupsByPartition;
        private final int totalGroupCount;
        private final Map<StructLike, Integer> partitionIndexMap;
        private final AtomicInteger groupIndex;

        /**
         * @param fileGroupsByPartition planned file groups keyed by partition; only the group
         *     counts are retained
         */
        RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
            StructLikeMap<Integer> groupCounts = fileGroupsByPartition.transformValues(groups -> groups.size());
            int total = 0;
            for (Integer count : groupCounts.values()) {
                total += count;
            }
            this.numGroupsByPartition = groupCounts;
            this.totalGroupCount = total;
            this.partitionIndexMap = Maps.newConcurrentMap();
            this.groupIndex = new AtomicInteger(1);
        }

        /** Returns the next global group index; the first call returns 1. */
        public int currentGlobalIndex() {
            return groupIndex.getAndIncrement();
        }

        /** Returns the next group index within {@code partition}; the first call per partition returns 1. */
        public int currentPartitionIndex(StructLike partition) {
            return partitionIndexMap.merge(partition, 1, Integer::sum);
        }

        /** Returns the number of groups planned for {@code partition}. */
        public int groupsInPartition(StructLike partition) {
            return numGroupsByPartition.get(partition);
        }

        /** Returns the number of groups planned across all partitions. */
        public int totalGroupCount() {
            return totalGroupCount;
        }
    }
}

