/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.RewriteManifests;
import org.apache.iceberg.RollingManifestWriter;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkContentFile;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.spark.actions.BaseSnapshotUpdateSparkAction;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction>
implements org.apache.iceberg.actions.RewriteManifests {
    // Name of the action option that decides whether the manifest entry dataset is cached
    // while partitioned manifests are written.
    public static final String USE_CACHING = "use-caching";
    // Default value of the USE_CACHING option (caching disabled).
    public static final boolean USE_CACHING_DEFAULT = false;
    private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
    // Shared result carrying no rewritten and no added manifests.
    private static final RewriteManifests.Result EMPTY_RESULT = ImmutableRewriteManifests.Result.builder().rewrittenManifests((Iterable)ImmutableList.of()).addedManifests((Iterable)ImmutableList.of()).build();
    // Table whose manifests are rewritten.
    private final Table table;
    // Format version read from the table's current metadata in the constructor.
    private final int formatVersion;
    // Value of the "commit.manifest.target-size-bytes" table property (0x800000 bytes when unset).
    private final long targetManifestSizeBytes;
    // True only for format version 1 tables without snapshot id inheritance enabled.
    private final boolean shouldStageManifests;
    // Partition spec whose manifests are rewritten; the constructor sets it to the table's current spec.
    private PartitionSpec spec = null;
    // Manifest filter; accepts every manifest until rewriteIf(...) replaces it.
    private Predicate<ManifestFile> predicate = manifest -> true;
    // Directory new manifests are written to; the constructor derives it from the metadata file location.
    private String outputLocation = null;

    /**
     * Creates a manifest rewrite action for {@code table}.
     *
     * <p>Initial configuration: the table's current partition spec, the target manifest size
     * from the {@code commit.manifest.target-size-bytes} table property (8 MiB when absent),
     * and an output location equal to the parent directory of the location that the table
     * operations resolve for a metadata file named {@code "file"}. New manifests are staged
     * only for format version 1 tables that do not enable snapshot id inheritance.
     *
     * @param spark session handed to the base action
     * @param table table to rewrite; it is cast to {@link HasTableOperations} unconditionally
     */
    RewriteManifestsSparkAction(SparkSession spark, Table table) {
        super(spark);
        this.table = table;
        this.spec = table.spec();
        this.targetManifestSizeBytes =
            PropertyUtil.propertyAsLong(table.properties(), "commit.manifest.target-size-bytes", 8L * 1024L * 1024L);

        TableOperations tableOps = ((HasTableOperations) table).operations();
        // "file" is only a placeholder name; just its parent directory is kept
        this.outputLocation = new Path(tableOps.metadataFileLocation("file")).getParent().toString();
        this.formatVersion = tableOps.current().formatVersion();

        boolean inheritsSnapshotId =
            PropertyUtil.propertyAsBoolean(table.properties(), "compatibility.snapshot-id-inheritance.enabled", false);
        this.shouldStageManifests = !inheritsSnapshotId && this.formatVersion == 1;
    }

    /**
     * Returns this action instance.
     *
     * @return {@code this}
     */
    @Override
    protected RewriteManifestsSparkAction self() {
        return this;
    }

    /**
     * Selects the partition spec whose manifests will be rewritten.
     *
     * @param specId id of a spec known to the table
     * @return this action, for chaining
     * @throws IllegalArgumentException if the table has no spec with the given id
     */
    public RewriteManifestsSparkAction specId(int specId) {
        Map<Integer, PartitionSpec> specsById = table.specs();
        Preconditions.checkArgument(specsById.containsKey(specId), "Invalid spec id %s", specId);
        spec = specsById.get(specId);
        return this;
    }

    /**
     * Replaces the predicate used to pick which manifests get rewritten.
     *
     * <p>The argument is stored as-is; no null check is performed here.
     *
     * @param newPredicate filter applied to each candidate manifest
     * @return this action, for chaining
     */
    public RewriteManifestsSparkAction rewriteIf(Predicate<ManifestFile> newPredicate) {
        this.predicate = newPredicate;
        return this;
    }

    /**
     * Sets the location where rewritten manifests are staged.
     *
     * <p>The value is honoured only when this action stages manifests; otherwise it is ignored
     * and a warning is logged.
     *
     * @param newStagingLocation directory for staged manifests
     * @return this action, for chaining
     */
    public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) {
        if (!shouldStageManifests) {
            LOG.warn("Ignoring provided staging location as new manifests will be committed directly");
            return this;
        }
        outputLocation = newStagingLocation;
        return this;
    }

    /**
     * Runs the rewrite under a "REWRITE-MANIFESTS" job group whose description names the table.
     *
     * @return the manifests that were rewritten and the manifests that replaced them
     */
    public RewriteManifests.Result execute() {
        JobGroupInfo jobInfo = newJobGroupInfo("REWRITE-MANIFESTS", "Rewriting manifests in " + table.name());
        return withJobGroupInfo(jobInfo, this::doExecute);
    }

    /**
     * Rewrites data manifests, then delete manifests, and commits both in one replacement.
     *
     * @return the shared empty result when nothing was rewritten, otherwise the combined
     *     rewritten and added manifests of both content types
     */
    private RewriteManifests.Result doExecute() {
        List<ManifestFile> rewritten = Lists.newArrayList();
        List<ManifestFile> added = Lists.newArrayList();

        // data manifests are processed first, delete manifests second
        ManifestContent[] contents = {ManifestContent.DATA, ManifestContent.DELETES};
        for (ManifestContent content : contents) {
            RewriteManifests.Result partial = rewriteManifests(content);
            Iterables.addAll(rewritten, partial.rewrittenManifests());
            Iterables.addAll(added, partial.addedManifests());
        }

        if (rewritten.isEmpty()) {
            return EMPTY_RESULT;
        }

        replaceManifests(rewritten, added);
        return ImmutableRewriteManifests.Result.builder()
            .rewrittenManifests(rewritten)
            .addedManifests(added)
            .build();
    }

    /**
     * Writes replacement manifests for all matching manifests of one content type.
     *
     * <p>Nothing is written when no manifest matches, or when a single matching manifest
     * would be rewritten into a single manifest. This method does not commit anything.
     *
     * @param content manifest content type to process
     * @return matching manifests paired with their freshly written replacements, or the
     *     shared empty result
     */
    private RewriteManifests.Result rewriteManifests(ManifestContent content) {
        List<ManifestFile> candidates = findMatchingManifests(content);
        if (candidates.isEmpty()) {
            return EMPTY_RESULT;
        }

        int numManifests = targetNumManifests(totalSizeBytes(candidates));
        boolean nothingToCompact = numManifests == 1 && candidates.size() == 1;
        if (nothingToCompact) {
            return EMPTY_RESULT;
        }

        Dataset<Row> entries = buildManifestEntryDF(candidates);
        List<ManifestFile> replacements;
        if (spec.isUnpartitioned()) {
            replacements = writeUnpartitionedManifests(content, entries, numManifests);
        } else {
            replacements = writePartitionedManifests(content, entries, numManifests);
        }

        return ImmutableRewriteManifests.Result.builder()
            .rewrittenManifests(candidates)
            .addedManifests(replacements)
            .build();
    }

    /**
     * Builds a dataset with the entries stored in the given manifests.
     *
     * <p>Rows of the ENTRIES metadata table are tagged with {@code input_file_name()} and
     * left-semi joined against the manifest paths, so only entries read from the given
     * manifests remain.
     *
     * @param manifests manifests whose entries are needed
     * @return rows with columns snapshot_id, sequence_number, file_sequence_number, data_file
     */
    private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
        List<String> manifestPaths = Lists.transform(manifests, ManifestFile::path);
        Dataset<Row> manifestDF = spark().createDataset(manifestPaths, Encoders.STRING()).toDF("manifest");

        // NOTE(review): "status < 2" presumably drops deleted entries — confirm against the entries table schema
        Dataset<Row> manifestEntryDF = loadMetadataTable(table, MetadataTableType.ENTRIES)
            .filter("status < 2")
            .selectExpr(
                "input_file_name() as manifest",
                "snapshot_id",
                "sequence_number",
                "file_sequence_number",
                "data_file");

        Column sameManifest = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
        return manifestEntryDF
            .join(manifestDF, sameManifest, "left_semi")
            .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
    }

    /**
     * Writes manifests for an unpartitioned spec by repartitioning the entries into
     * {@code numManifests} Spark partitions.
     *
     * @param content manifest content type being written
     * @param manifestEntryDF entries to write
     * @param numManifests number of Spark partitions to spread the entries over
     * @return the manifests produced by all partitions
     */
    private List<ManifestFile> writeUnpartitionedManifests(ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
        WriteManifests<?> writer = newWriteManifestsFunc(content, manifestEntryDF.schema());
        Dataset<Row> repartitioned = manifestEntryDF.repartition(numManifests);
        Dataset<ManifestFile> written = writer.apply(repartitioned);
        return written.collectAsList();
    }

    /**
     * Writes manifests for a partitioned spec, range-partitioning and sorting the entries by
     * {@code data_file.partition}. The entry dataset is optionally cached for the duration of
     * the write.
     *
     * @param content manifest content type being written
     * @param manifestEntryDF entries to write
     * @param numManifests number of Spark partitions to spread the entries over
     * @return the manifests produced by all partitions
     */
    private List<ManifestFile> writePartitionedManifests(ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
        return withReusableDS(manifestEntryDF, entries -> {
            WriteManifests<?> writer = newWriteManifestsFunc(content, entries.schema());
            Dataset<Row> clustered = repartitionAndSort(entries, entries.col("data_file.partition"), numManifests);
            return writer.apply(clustered).collectAsList();
        });
    }

    /**
     * Creates the partition-level write function for the given content type.
     *
     * @param content DATA yields a data manifest writer; anything else a delete manifest writer
     * @param sparkType schema of the entry dataset; must contain a {@code data_file} struct
     * @return a function that turns entry rows into manifests
     */
    private WriteManifests<?> newWriteManifestsFunc(ManifestContent content, StructType sparkType) {
        ManifestWriterFactory writerFactory = manifestWriters();
        StructType sparkFileType = (StructType) sparkType.apply("data_file").dataType();
        // file type built from the partition type over the whole table
        Types.StructType combinedFileType = DataFile.getType(Partitioning.partitionType(table));
        // file type built from the partition type of the selected spec only
        Types.StructType fileType = DataFile.getType(spec.partitionType());

        if (content != ManifestContent.DATA) {
            return new WriteDeleteManifests(writerFactory, combinedFileType, fileType, sparkFileType);
        }
        return new WriteDataManifests(writerFactory, combinedFileType, fileType, sparkFileType);
    }

    /**
     * Range-partitions {@code df} by {@code col} and sorts each resulting partition by it.
     *
     * @param df dataset to redistribute
     * @param col column used both for range partitioning and for sorting
     * @param numPartitions number of range partitions
     * @return the redistributed, per-partition sorted dataset
     */
    private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPartitions) {
        Dataset<Row> ranged = df.repartitionByRange(numPartitions, col);
        return ranged.sortWithinPartitions(col);
    }

    /**
     * Applies {@code func} to {@code ds}, caching the dataset first when the
     * {@link #USE_CACHING} option is enabled.
     *
     * <p>A cached dataset is always unpersisted (non-blocking) afterwards, even if
     * {@code func} throws.
     *
     * @param ds dataset that {@code func} may evaluate more than once
     * @param func computation to run against the (possibly cached) dataset
     * @return whatever {@code func} returns
     */
    private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
        // use the declared default rather than a duplicated literal
        boolean useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
        Dataset<T> reusableDS = useCaching ? ds.cache() : ds;
        try {
            return func.apply(reusableDS);
        } finally {
            if (useCaching) {
                reusableDS.unpersist(false);
            }
        }
    }

    /**
     * Finds manifests of the current snapshot that belong to the selected spec and pass the
     * configured predicate.
     *
     * @param content manifest content type to look at
     * @return matching manifests; an empty list when the table has no current snapshot
     */
    private List<ManifestFile> findMatchingManifests(ManifestContent content) {
        Snapshot current = table.currentSnapshot();
        if (current == null) {
            return ImmutableList.of();
        }

        // the spec check runs first, so the user predicate only sees manifests of the selected spec
        return loadManifests(content, current).stream()
            .filter(candidate -> candidate.partitionSpecId() == spec.specId())
            .filter(candidate -> predicate.test(candidate))
            .collect(Collectors.toList());
    }

    /**
     * Loads the manifests of one content type from a snapshot using the table's file IO.
     *
     * @param content DATA or DELETES
     * @param snapshot snapshot to read manifests from
     * @return the snapshot's data or delete manifests
     * @throws IllegalArgumentException for any other content value
     */
    private List<ManifestFile> loadManifests(ManifestContent content, Snapshot snapshot) {
        switch (content) {
            case DATA:
                return snapshot.dataManifests(table.io());
            case DELETES:
                return snapshot.deleteManifests(table.io());
            default:
                throw new IllegalArgumentException("Unknown manifest content: " + content);
        }
    }

    /**
     * Computes how many manifests are needed to hold {@code totalSizeBytes} at the target
     * manifest size, rounding up.
     *
     * @param totalSizeBytes combined length of the manifests being rewritten
     * @return the rounded-up quotient, narrowed to {@code int}
     */
    private int targetNumManifests(long totalSizeBytes) {
        long targetSize = targetManifestSizeBytes;
        // adding (targetSize - 1) before dividing turns truncation into rounding up
        long roundedUp = (totalSizeBytes + targetSize - 1L) / targetSize;
        return (int) roundedUp;
    }

    /**
     * Sums the lengths of the given manifests.
     *
     * @param manifests manifests to measure
     * @return total length in bytes
     * @throws ValidationException if any manifest lacks added, existing or deleted file counts
     */
    private long totalSizeBytes(Iterable<ManifestFile> manifests) {
        long sum = 0L;
        for (ManifestFile current : manifests) {
            boolean countsPresent = hasFileCounts(current);
            ValidationException.check(countsPresent, "No file counts in manifest: %s", current.path());
            sum += current.length();
        }
        return sum;
    }

    /**
     * Tells whether a manifest reports all three file counts.
     *
     * @param manifest manifest to inspect
     * @return {@code true} when the added, existing and deleted file counts are all non-null
     */
    private boolean hasFileCounts(ManifestFile manifest) {
        if (manifest.addedFilesCount() == null) {
            return false;
        }
        if (manifest.existingFilesCount() == null) {
            return false;
        }
        return manifest.deletedFilesCount() != null;
    }

    /**
     * Commits a manifest replacement that drops {@code deletedManifests} and registers
     * {@code addedManifests}.
     *
     * <p>When manifests are staged, the staged files are deleted after a successful commit.
     * If the commit fails with anything other than {@link CommitStateUnknownException}, the
     * newly written manifests are deleted before the failure is rethrown.
     *
     * @param deletedManifests manifests to remove from the table
     * @param addedManifests freshly written manifests to add
     */
    private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) {
        try {
            // fully qualified: the simple name RewriteManifests is also the action interface
            // this class implements, and the table-level update API is what is needed here
            org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests();
            deletedManifests.forEach(rewriteManifests::deleteManifest);
            addedManifests.forEach(rewriteManifests::addManifest);
            commit(rewriteManifests);

            if (shouldStageManifests) {
                // the staged copies are removed once the commit has gone through
                deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
            }
        } catch (CommitStateUnknownException commitStateUnknownException) {
            // no cleanup here: the outcome of the commit is unknown, so the new manifests
            // may already be referenced by the table
            throw commitStateUnknownException;
        } catch (Exception e) {
            // the rewrite failed, so the new manifests are removed
            deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
            throw e;
        }
    }

    /**
     * Deletes the given locations through the table's file IO on the shared worker pool.
     *
     * <p>The task runner is configured with no retries and with failures suppressed; each
     * failed deletion is logged as a warning.
     *
     * @param locations file locations to delete
     */
    private void deleteFiles(Iterable<String> locations) {
        Tasks.foreach(locations)
            .executeWith(ThreadPools.getWorkerPool())
            .noRetry()
            .suppressFailureWhenFinished()
            .onFailure((path, failure) -> LOG.warn("Failed to delete: {}", path, failure))
            .run(path -> table.io().deleteFile(path));
    }

    /**
     * Creates a serializable writer factory bound to a broadcast copy of the table, the
     * selected spec id and the current output location.
     *
     * @return a factory whose maximum manifest size is 1.2 times the target manifest size
     */
    private ManifestWriterFactory manifestWriters() {
        Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
        // manifests may grow to 120% of the target size
        long maxManifestSizeBytes = (long) (1.2 * (double) targetManifestSizeBytes);
        return new ManifestWriterFactory(tableBroadcast, formatVersion, spec.specId(), outputLocation, maxManifestSizeBytes);
    }

    /**
     * Serializable factory for rolling manifest writers.
     *
     * <p>Every underlying manifest goes to a new, randomly named Avro file under the output
     * location. The table is obtained from a broadcast variable.
     */
    private static class ManifestWriterFactory implements Serializable {
        private final Broadcast<Table> tableBroadcast;
        private final int formatVersion;
        private final int specId;
        private final String outputLocation;
        private final long maxManifestSizeBytes;

        /**
         * @param tableBroadcast broadcast table used to resolve the spec and the file IO
         * @param formatVersion table format version passed to the manifest writers
         * @param specId id of the partition spec the manifests are written for
         * @param outputLocation directory that receives the new manifest files
         * @param maxManifestSizeBytes size limit handed to the rolling writers
         */
        ManifestWriterFactory(Broadcast<Table> tableBroadcast, int formatVersion, int specId, String outputLocation, long maxManifestSizeBytes) {
            this.tableBroadcast = tableBroadcast;
            this.formatVersion = formatVersion;
            this.specId = specId;
            this.outputLocation = outputLocation;
            this.maxManifestSizeBytes = maxManifestSizeBytes;
        }

        /** Creates a rolling writer for data manifests. */
        public RollingManifestWriter<DataFile> newRollingManifestWriter() {
            return new RollingManifestWriter<>(this::newManifestWriter, maxManifestSizeBytes);
        }

        /** Creates a rolling writer for delete manifests. */
        public RollingManifestWriter<DeleteFile> newRollingDeleteManifestWriter() {
            return new RollingManifestWriter<>(this::newDeleteManifestWriter, maxManifestSizeBytes);
        }

        /** Opens a data manifest writer on a fresh output file; no snapshot id is supplied. */
        private ManifestWriter<DataFile> newManifestWriter() {
            OutputFile target = newOutputFile();
            return ManifestFiles.write(formatVersion, spec(), target, null);
        }

        /** Opens a delete manifest writer on a fresh output file; no snapshot id is supplied. */
        private ManifestWriter<DeleteFile> newDeleteManifestWriter() {
            OutputFile target = newOutputFile();
            return ManifestFiles.writeDeleteManifest(formatVersion, spec(), target, null);
        }

        /** Looks up the partition spec by id in the broadcast table. */
        private PartitionSpec spec() {
            return table().specs().get(specId);
        }

        /** Creates an output file at a newly generated manifest location. */
        private OutputFile newOutputFile() {
            String location = newManifestLocation();
            return table().io().newOutputFile(location);
        }

        /** Builds "&lt;outputLocation&gt;/optimized-m-&lt;random UUID&gt;" with the Avro file extension. */
        private String newManifestLocation() {
            String manifestName = FileFormat.AVRO.addExtension("optimized-m-" + UUID.randomUUID());
            return new Path(outputLocation, manifestName).toString();
        }

        /** Returns the broadcast table value. */
        private Table table() {
            return tableBroadcast.value();
        }
    }

    /**
     * Spark partition function that writes every incoming manifest entry row into rolling
     * manifests as an existing entry and emits the resulting manifest files.
     *
     * <p>Subclasses supply the content-specific row wrapper and rolling writer.
     *
     * @param <F> content file type handled by the writer
     */
    private static abstract class WriteManifests<F extends ContentFile<F>>
    implements MapPartitionsFunction<Row, ManifestFile> {
        private static final Encoder<ManifestFile> MANIFEST_ENCODER = Encoders.javaSerialization(ManifestFile.class);
        private final ManifestWriterFactory writers;
        private final Types.StructType combinedFileType;
        private final Types.StructType fileType;
        private final StructType sparkFileType;

        /**
         * @param writers factory for the rolling manifest writers
         * @param combinedFileType Iceberg file type passed to the row wrapper as its combined type
         * @param fileType Iceberg file type passed to the row wrapper for the selected spec
         * @param sparkFileType Spark type of the {@code data_file} struct in the input rows
         */
        WriteManifests(ManifestWriterFactory writers, Types.StructType combinedFileType, Types.StructType fileType, StructType sparkFileType) {
            this.writers = writers;
            this.combinedFileType = combinedFileType;
            this.fileType = fileType;
            this.sparkFileType = sparkFileType;
        }

        /** Creates the reusable wrapper that exposes a {@code data_file} row as a content file. */
        protected abstract SparkContentFile<F> newFileWrapper();

        /** Creates the rolling writer the entries of one Spark partition are written to. */
        protected abstract RollingManifestWriter<F> newManifestWriter();

        /**
         * Runs this function over every partition of {@code input}.
         *
         * @param input manifest entry rows
         * @return dataset of the manifests written by all partitions
         */
        public Dataset<ManifestFile> apply(Dataset<Row> input) {
            return input.mapPartitions(this, MANIFEST_ENCODER);
        }

        /**
         * Writes all rows of one Spark partition and returns the manifests that were produced.
         *
         * <p>Expected row layout: snapshot id (0), sequence number (1), nullable file sequence
         * number (2) and the {@code data_file} struct (3).
         *
         * @param rows manifest entry rows of a single partition
         * @return iterator over the manifest files written for this partition
         * @throws Exception if writing or closing the writer fails
         */
        public Iterator<ManifestFile> call(Iterator<Row> rows) throws Exception {
            SparkContentFile<F> fileWrapper = newFileWrapper();
            // declared outside the try block: the manifest list is read from the writer
            // after it has been closed
            RollingManifestWriter<F> writer = newManifestWriter();
            try {
                while (rows.hasNext()) {
                    Row row = rows.next();
                    long snapshotId = row.getLong(0);
                    long sequenceNumber = row.getLong(1);
                    Long fileSequenceNumber = row.isNullAt(2) ? null : Long.valueOf(row.getLong(2));
                    Row file = row.getStruct(3);
                    writer.existing(fileWrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
                }
            } finally {
                writer.close();
            }
            return writer.toManifestFiles().iterator();
        }

        protected ManifestWriterFactory writers() {
            return this.writers;
        }

        protected Types.StructType combinedFileType() {
            return this.combinedFileType;
        }

        protected Types.StructType fileType() {
            return this.fileType;
        }

        protected StructType sparkFileType() {
            return this.sparkFileType;
        }
    }

    /** Write function specialised for delete manifests. */
    private static class WriteDeleteManifests extends WriteManifests<DeleteFile> {
        /**
         * @param writerFactory factory for the rolling delete manifest writer
         * @param combinedType Iceberg file type forwarded as the combined file type
         * @param specFileType Iceberg file type forwarded as the spec-specific file type
         * @param sparkType Spark type of the {@code data_file} struct
         */
        WriteDeleteManifests(ManifestWriterFactory writerFactory, Types.StructType combinedType, Types.StructType specFileType, StructType sparkType) {
            super(writerFactory, combinedType, specFileType, sparkType);
        }

        /** Creates a wrapper exposing rows as delete files. */
        protected SparkDeleteFile newFileWrapper() {
            return new SparkDeleteFile(combinedFileType(), fileType(), sparkFileType());
        }

        /** Creates a rolling writer for delete manifests. */
        @Override
        protected RollingManifestWriter<DeleteFile> newManifestWriter() {
            return writers().newRollingDeleteManifestWriter();
        }
    }

    /** Write function specialised for data manifests. */
    private static class WriteDataManifests extends WriteManifests<DataFile> {
        /**
         * @param writerFactory factory for the rolling data manifest writer
         * @param combinedType Iceberg file type forwarded as the combined file type
         * @param specFileType Iceberg file type forwarded as the spec-specific file type
         * @param sparkType Spark type of the {@code data_file} struct
         */
        WriteDataManifests(ManifestWriterFactory writerFactory, Types.StructType combinedType, Types.StructType specFileType, StructType sparkType) {
            super(writerFactory, combinedType, specFileType, sparkType);
        }

        /** Creates a wrapper exposing rows as data files. */
        protected SparkDataFile newFileWrapper() {
            return new SparkDataFile(combinedFileType(), fileType(), sparkFileType());
        }

        /** Creates a rolling writer for data manifests. */
        @Override
        protected RollingManifestWriter<DataFile> newManifestWriter() {
            return writers().newRollingManifestWriter();
        }
    }
}

