/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestEntry;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestFilterManager;
import org.apache.iceberg.ManifestGroup;
import org.apache.iceberg.ManifestMergeManager;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotProducer;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class MergingSnapshotProducer<ThisT>
extends SnapshotProducer<ThisT> {
    private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);

    // Snapshot operation names that each validation inspects when walking the history
    // between the starting snapshot and the parent (see validationHistory).
    // Used when looking for concurrently added data files.
    private static final Set<String> VALIDATE_ADDED_FILES_OPERATIONS = ImmutableSet.of("append", "overwrite");
    // Used when looking for concurrently removed data files.
    private static final Set<String> VALIDATE_DATA_FILES_EXIST_OPERATIONS = ImmutableSet.of("overwrite", "replace", "delete");
    // Same as above, but selected by validateDataFilesExist when skipDeletes is true.
    private static final Set<String> VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS = ImmutableSet.of("overwrite", "replace");
    // Used when looking for concurrently added delete files.
    private static final Set<String> VALIDATE_ADDED_DELETE_FILES_OPERATIONS = ImmutableSet.of("overwrite", "delete");
    // Used when looking for concurrently added DVs.
    private static final Set<String> VALIDATE_ADDED_DVS_OPERATIONS = ImmutableSet.of("overwrite", "delete", "replace");

    // Table name reported in the CreateSnapshotEvent built by updateEvent().
    private final String tableName;
    // Summary returned by summary(); cleared and rebuilt on every apply().
    private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
    // Merge/filter managers for data manifests and for delete manifests.
    private final ManifestMergeManager<DataFile> mergeManager;
    private final ManifestFilterManager<DataFile> filterManager;
    private final ManifestMergeManager<DeleteFile> deleteMergeManager;
    private final ManifestFilterManager<DeleteFile> deleteFilterManager;

    // Data files added to this update, keyed by partition spec id.
    private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
    // Optional data sequence number passed to writeDataManifests for new data files; null until set.
    private Long newDataFilesDataSequenceNumber;
    // Delete files added to this update, keyed by partition spec id.
    private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
    // Referenced data file of every DV added to this update; checked by validateAddedDVs.
    private final Set<String> newDVRefs = Sets.newHashSet();
    // Appended manifests used as-is (snapshot id is inherited).
    private final List<ManifestFile> appendManifests = Lists.newArrayList();
    // Appended manifests that were copied by copyManifest.
    private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
    // Partial summaries merged into summaryBuilder by apply().
    private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
    private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
    // Last expression passed to deleteByRowFilter; returned by rowFilter().
    private Expression deleteExpression = Expressions.alwaysFalse();
    // Manifests written for newDataFilesBySpec, kept so repeated apply() calls can reuse them.
    private final List<ManifestFile> cachedNewDataManifests = Lists.newLinkedList();
    // True when a data file was added since the cached data manifests were last written.
    private boolean hasNewDataFiles = false;
    // Manifests written for newDeleteFilesBySpec, kept for reuse across apply() calls.
    private final List<ManifestFile> cachedNewDeleteManifests = Lists.newLinkedList();
    // True when a delete file was added to this update; set in add(PendingDeleteFile).
    private boolean hasNewDeleteFiles = false;
    // Case sensitivity forwarded to filter managers, manifest groups and delete indexes.
    private boolean caseSensitive = true;

    /**
     * Creates a producer for the named table and configures the manifest merge managers
     * from the current table properties.
     *
     * @param tableName name reported in commit notifications
     * @param ops table operations used to read metadata and perform the commit
     */
    MergingSnapshotProducer(String tableName, TableOperations ops) {
        super(ops);
        this.tableName = tableName;

        // 8388608 bytes (8 MiB) is the fallback manifest target size.
        long manifestTargetSize = ops.current().propertyAsLong("commit.manifest.target-size-bytes", 8388608L);
        int mergeThreshold = ops.current().propertyAsInt("commit.manifest.min-count-to-merge", 100);
        boolean mergingOn = ops.current().propertyAsBoolean("commit.manifest-merge.enabled", true);

        this.mergeManager = new DataFileMergeManager(manifestTargetSize, mergeThreshold, mergingOn);
        this.filterManager = new DataFileFilterManager();
        this.deleteMergeManager = new DeleteFileMergeManager(manifestTargetSize, mergeThreshold, mergingOn);
        this.deleteFilterManager = new DeleteFileFilterManager();
    }

    /**
     * Sets a property on the snapshot summary builder.
     *
     * @param property summary property name
     * @param value summary property value
     * @return this producer, for chaining
     */
    @Override
    public ThisT set(String property, String value) {
        SnapshotSummary.Builder builder = summaryBuilder;
        builder.set(property, value);
        return self();
    }

    /**
     * Records the case-sensitivity setting and forwards it to both filter managers.
     *
     * @param isCaseSensitive whether column name matching is case sensitive
     * @return this producer, for chaining
     */
    public ThisT caseSensitive(boolean isCaseSensitive) {
        caseSensitive = isCaseSensitive;
        filterManager.caseSensitive(isCaseSensitive);
        deleteFilterManager.caseSensitive(isCaseSensitive);
        return self();
    }

    /** Returns the case-sensitivity setting currently stored on this producer. */
    protected boolean isCaseSensitive() {
        return caseSensitive;
    }

    /**
     * Returns the single partition spec shared by all added data files.
     *
     * @throws IllegalStateException if no data files were added, or if they use more than one spec
     */
    protected PartitionSpec dataSpec() {
        Set<Integer> addedSpecIds = newDataFilesBySpec.keySet();
        int specCount = addedSpecIds.size();
        Preconditions.checkState(specCount != 0, "Cannot determine partition specs: no data files have been added");
        Preconditions.checkState(specCount == 1, "Cannot return a single partition spec: data files with different partition specs have been added");
        return spec(Iterables.getOnlyElement(addedSpecIds));
    }

    /** Returns the delete expression currently stored on this producer. */
    protected Expression rowFilter() {
        return deleteExpression;
    }

    /** Returns an immutable list of every data file added to this update, across all specs. */
    protected List<DataFile> addedDataFiles() {
        ImmutableList.Builder<DataFile> allFiles = ImmutableList.builder();
        for (DataFileSet filesForSpec : newDataFilesBySpec.values()) {
            allFiles.addAll(filesForSpec);
        }
        return allFiles.build();
    }

    /** Forwards {@code failAnyDelete()} to the data and delete filter managers. */
    protected void failAnyDelete() {
        filterManager.failAnyDelete();
        deleteFilterManager.failAnyDelete();
    }

    /** Forwards {@code failMissingDeletePaths()} to the data and delete filter managers. */
    protected void failMissingDeletePaths() {
        filterManager.failMissingDeletePaths();
        deleteFilterManager.failMissingDeletePaths();
    }

    /**
     * Remembers the row filter and registers it with both filter managers.
     *
     * @param expr expression selecting the rows to delete
     */
    protected void deleteByRowFilter(Expression expr) {
        deleteExpression = expr;
        filterManager.deleteByRowFilter(expr);
        deleteFilterManager.deleteByRowFilter(expr);
    }

    /**
     * Registers a partition to drop with both the data and the delete filter manager.
     *
     * @param specId id of the partition spec the partition tuple belongs to
     * @param partition partition tuple to drop
     */
    protected void dropPartition(int specId, StructLike partition) {
        filterManager.dropPartition(specId, partition);
        deleteFilterManager.dropPartition(specId, partition);
    }

    /** Registers a data file for removal with the data filter manager. */
    protected void delete(DataFile file) {
        filterManager.delete(file);
    }

    /** Registers a delete file for removal with the delete filter manager. */
    protected void delete(DeleteFile file) {
        deleteFilterManager.delete(file);
    }

    /** Registers a path for removal with the data filter manager. */
    protected void delete(CharSequence path) {
        filterManager.delete(path);
    }

    /** Returns whether the data filter manager reports that it contains deletes. */
    protected boolean deletesDataFiles() {
        return filterManager.containsDeletes();
    }

    /** Returns whether the delete filter manager reports that it contains deletes. */
    protected boolean deletesDeleteFiles() {
        return deleteFilterManager.containsDeletes();
    }

    /** Returns true when at least one spec has data files registered in this update. */
    protected boolean addsDataFiles() {
        boolean nothingAdded = newDataFilesBySpec.isEmpty();
        return !nothingAdded;
    }

    /** Returns true when at least one spec has delete files registered in this update. */
    protected boolean addsDeleteFiles() {
        boolean nothingAdded = newDeleteFilesBySpec.isEmpty();
        return !nothingAdded;
    }

    /**
     * Adds a data file to this update, grouped under its partition spec.
     * A file the per-spec set already contains is ignored.
     *
     * @param file data file to add; must not be null
     * @throws IllegalArgumentException if the file's partition spec is unknown to the table
     */
    protected void add(DataFile file) {
        Preconditions.checkNotNull(file, "Invalid data file: null");
        PartitionSpec fileSpec = spec(file.specId());
        Preconditions.checkArgument(fileSpec != null, "Cannot find partition spec %s for data file: %s", file.specId(), (Object)file.location());

        DataFileSet filesForSpec = newDataFilesBySpec.computeIfAbsent(fileSpec.specId(), ignored -> DataFileSet.create());
        if (!filesForSpec.add(file)) {
            // already tracked: leave the summary and the dirty flag untouched
            return;
        }
        addedFilesSummary.addedFile(fileSpec, file);
        hasNewDataFiles = true;
    }

    /** Looks up a partition spec by id in the current table metadata. */
    private PartitionSpec spec(int specId) {
        TableMetadata currentMetadata = ops().current();
        return currentMetadata.spec(specId);
    }

    /**
     * Validates a delete file against the table's format version and adds it to this update.
     *
     * @param file delete file to add
     */
    protected void add(DeleteFile file) {
        validateNewDeleteFile(file);
        SnapshotProducer.PendingDeleteFile pending = new SnapshotProducer.PendingDeleteFile(file);
        add(pending);
    }

    /**
     * Validates a delete file against the table's format version and adds it to this update
     * together with an explicit data sequence number.
     *
     * @param file delete file to add
     * @param dataSequenceNumber data sequence number handed to the pending delete file
     */
    protected void add(DeleteFile file, long dataSequenceNumber) {
        validateNewDeleteFile(file);
        SnapshotProducer.PendingDeleteFile pending = new SnapshotProducer.PendingDeleteFile(file, dataSequenceNumber);
        add(pending);
    }

    /**
     * Registers a pending delete file under its partition spec. When the file is newly added,
     * the summary is updated and, for DVs, the referenced data file is remembered.
     *
     * @throws IllegalArgumentException if the file's partition spec is unknown to the table
     */
    private void add(SnapshotProducer.PendingDeleteFile file) {
        PartitionSpec fileSpec = spec(file.specId());
        Preconditions.checkArgument(fileSpec != null, "Cannot find partition spec %s for delete file: %s", file.specId(), (Object)file.location());

        DeleteFileSet filesForSpec = newDeleteFilesBySpec.computeIfAbsent(fileSpec.specId(), ignored -> DeleteFileSet.create());
        if (!filesForSpec.add(file)) {
            // already tracked: nothing else to record
            return;
        }
        addedFilesSummary.addedFile(fileSpec, file);
        hasNewDeleteFiles = true;
        if (ContentFileUtil.isDV(file)) {
            newDVRefs.add(file.referencedDataFile());
        }
    }

    /**
     * Checks that a delete file is allowed by the table's format version: V1 rejects all
     * deletes, V2 rejects DVs for position deletes, V3 requires DVs for position deletes.
     *
     * @param file delete file to validate; must not be null
     * @throws IllegalArgumentException if the file is not allowed or the version is unsupported
     */
    protected void validateNewDeleteFile(DeleteFile file) {
        Preconditions.checkNotNull(file, "Invalid delete file: null");
        int version = formatVersion();
        if (version == 1) {
            throw new IllegalArgumentException("Deletes are supported in V2 and above");
        } else if (version == 2) {
            boolean permitted = file.content() == FileContent.EQUALITY_DELETES || !ContentFileUtil.isDV(file);
            Preconditions.checkArgument(permitted, "Must not use DVs for position deletes in V2: %s", (Object)ContentFileUtil.dvDesc(file));
        } else if (version == 3) {
            boolean permitted = file.content() == FileContent.EQUALITY_DELETES || ContentFileUtil.isDV(file);
            Preconditions.checkArgument(permitted, "Must use DVs for position deletes in V%s: %s", formatVersion(), (Object)file.location());
        } else {
            throw new IllegalArgumentException("Unsupported format version: " + formatVersion());
        }
    }

    /** Returns the format version of the current table metadata. */
    private int formatVersion() {
        TableMetadata currentMetadata = ops().current();
        return currentMetadata.formatVersion();
    }

    /**
     * Appends a data manifest. A manifest without a snapshot id is used as-is when snapshot id
     * inheritance is available; otherwise it is copied first.
     *
     * @param manifest data manifest to append
     * @throws IllegalArgumentException if the manifest is not a data manifest
     */
    protected void add(ManifestFile manifest) {
        Preconditions.checkArgument(manifest.content() == ManifestContent.DATA, "Cannot append delete manifest: %s", (Object)manifest);
        boolean usableAsIs = canInheritSnapshotId() && manifest.snapshotId() == null;
        if (!usableAsIs) {
            rewrittenAppendManifests.add(copyManifest(manifest));
            return;
        }
        appendedManifestsSummary.addedManifest(manifest);
        appendManifests.add(manifest);
    }

    /**
     * Copies an appended manifest into a new manifest file written for this update's snapshot id,
     * accumulating its contents into the appended-manifests summary.
     */
    private ManifestFile copyManifest(ManifestFile manifest) {
        TableMetadata metadata = ops().current();
        InputFile source = ops().io().newInputFile(manifest);
        EncryptedOutputFile target = newManifestOutputFile();
        return ManifestFiles.copyAppendManifest(
            metadata.formatVersion(),
            manifest.partitionSpecId(),
            source,
            metadata.specsById(),
            target,
            snapshotId(),
            appendedManifestsSummary);
    }

    /**
     * Fails if any data file in one of the given partitions was added between the starting
     * snapshot and {@code parent}.
     *
     * @param base table metadata to validate against
     * @param startingSnapshotId id of the snapshot the operation started from
     * @param partitionSet partitions in which new data files count as conflicts
     * @param parent ending snapshot of the history to check; no check is made when null
     * @throws ValidationException if a conflicting data file is found
     * @throws UncheckedIOException if closing the entry iterator fails
     */
    protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) {
        CloseableIterable<ManifestEntry<DataFile>> conflictEntries = this.addedDataFiles(base, startingSnapshotId, null, partitionSet, parent);
        // The resource must be declared with its closeable, typed form: a raw Iterator is not
        // AutoCloseable and would leave the lambda parameter untyped.
        try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
            if (conflicts.hasNext()) {
                throw new ValidationException("Found conflicting files that can contain records matching partitions %s: %s", partitionSet, Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().location().toString())));
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(String.format("Failed to validate no appends matching %s", partitionSet), e);
        }
    }

    /**
     * Fails if any data file that may contain rows matching the filter was added between the
     * starting snapshot and {@code parent}.
     *
     * @param base table metadata to validate against
     * @param startingSnapshotId id of the snapshot the operation started from
     * @param conflictDetectionFilter expression selecting rows that count as conflicts; may be null
     * @param parent ending snapshot of the history to check; no check is made when null
     * @throws ValidationException if a conflicting data file is found
     * @throws UncheckedIOException if closing the entry iterator fails
     */
    protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId, Expression conflictDetectionFilter, Snapshot parent) {
        CloseableIterable<ManifestEntry<DataFile>> conflictEntries = this.addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, null, parent);
        // The resource must be declared with its closeable, typed form: a raw Iterator is not
        // AutoCloseable and would leave the lambda parameter untyped.
        try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
            if (conflicts.hasNext()) {
                throw new ValidationException("Found conflicting files that can contain records matching %s: %s", conflictDetectionFilter, Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().location().toString())));
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(String.format("Failed to validate no appends matching %s", conflictDetectionFilter), e);
        }
    }

    /**
     * Returns manifest entries for data files added by matching snapshots between the starting
     * snapshot and {@code parent}, optionally narrowed by a data filter and/or a partition set.
     * Returns an empty iterable when {@code parent} is null.
     */
    private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter, PartitionSet partitionSet, Snapshot parent) {
        if (parent == null) {
            return CloseableIterable.empty();
        }

        Pair<List<ManifestFile>, Set<Long>> newHistory = validationHistory(base, startingSnapshotId, VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA, parent);
        List<ManifestFile> newManifests = newHistory.first();
        Set<Long> newSnapshotIds = newHistory.second();

        ManifestGroup group = new ManifestGroup(ops().io(), newManifests, ImmutableList.of())
            .caseSensitive(caseSensitive)
            .filterManifestEntries(entry -> newSnapshotIds.contains(entry.snapshotId()))
            .specsById(base.specsById())
            .ignoreDeleted()
            .ignoreExisting();

        if (dataFilter != null) {
            group = group.filterData(dataFilter);
        }
        if (partitionSet != null) {
            group = group.filterManifestEntries(entry -> partitionSet.contains(((DataFile)entry.file()).specId(), ((DataFile)entry.file()).partition()));
        }
        return group.entries();
    }

    /**
     * Validates that no new deletes apply to the given data files, without a data filter.
     * Equality deletes are ignored when a data sequence number was set for new data files.
     */
    protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, Iterable<DataFile> dataFiles, Snapshot parent) {
        boolean ignoreEqualityDeletes = newDataFilesDataSequenceNumber != null;
        validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, ignoreEqualityDeletes, parent);
    }

    /**
     * Validates that no new deletes of any kind apply to the given data files, using the data
     * filter to narrow the delete files considered.
     */
    protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter, Iterable<DataFile> dataFiles, Snapshot parent) {
        final boolean ignoreEqualityDeletes = false;
        validateNoNewDeletesForDataFiles(base, startingSnapshotId, dataFilter, dataFiles, ignoreEqualityDeletes, parent);
    }

    /**
     * Checks each data file against the delete files added since the starting snapshot.
     * Does nothing when {@code parent} is null or the format version is below 2.
     *
     * @param ignoreEqualityDeletes when true, only position deletes count as conflicts
     * @throws ValidationException if a conflicting delete file applies to any data file
     */
    private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter, Iterable<DataFile> dataFiles, boolean ignoreEqualityDeletes, Snapshot parent) {
        if (parent == null || base.formatVersion() < 2) {
            return;
        }

        DeleteFileIndex newDeletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null, parent);
        long startingSeq = startingSequenceNumber(base, startingSnapshotId);

        for (DataFile dataFile : dataFiles) {
            DeleteFile[] applicable = newDeletes.forDataFile(startingSeq, dataFile);
            if (!ignoreEqualityDeletes) {
                ValidationException.check(applicable.length == 0, "Cannot commit, found new delete for replaced data file: %s", dataFile);
            } else {
                boolean noPositionDeletes = Arrays.stream(applicable).noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES);
                ValidationException.check(noPositionDeletes, "Cannot commit, found new position delete for replaced data file: %s", dataFile);
            }
        }
    }

    /**
     * Fails if the index of delete files added since the starting snapshot, narrowed by the
     * data filter, is not empty.
     *
     * @throws ValidationException if any such delete file exists
     */
    protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter, Snapshot parent) {
        DeleteFileIndex newDeletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null, parent);
        boolean noConflicts = newDeletes.isEmpty();
        ValidationException.check(noConflicts, "Found new conflicting delete files that can apply to records matching %s: %s", dataFilter, Iterables.transform(newDeletes.referencedDeleteFiles(), ContentFile::location));
    }

    /**
     * Fails if the index of delete files added since the starting snapshot, narrowed by the
     * partition set, is not empty.
     *
     * @throws ValidationException if any such delete file exists
     */
    protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) {
        DeleteFileIndex newDeletes = addedDeleteFiles(base, startingSnapshotId, null, partitionSet, parent);
        boolean noConflicts = newDeletes.isEmpty();
        ValidationException.check(noConflicts, "Found new conflicting delete files that can apply to records matching %s: %s", partitionSet, Iterables.transform(newDeletes.referencedDeleteFiles(), ContentFile::location));
    }

    /**
     * Builds an index of delete files from the delete manifests of matching snapshots between
     * the starting snapshot and {@code parent}. The index is built over no manifests when
     * {@code parent} is null or the format version is below 2.
     */
    protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter, PartitionSet partitionSet, Snapshot parent) {
        boolean nothingToScan = parent == null || base.formatVersion() < 2;
        if (nothingToScan) {
            return DeleteFileIndex.builderFor(ops().io(), ImmutableList.of())
                .specsById(base.specsById())
                .build();
        }

        Pair<List<ManifestFile>, Set<Long>> newHistory = validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES, parent);
        List<ManifestFile> newDeleteManifests = newHistory.first();
        long startingSeq = startingSequenceNumber(base, startingSnapshotId);
        return buildDeleteFileIndex(newDeleteManifests, startingSeq, dataFilter, partitionSet);
    }

    /**
     * Fails if any data file that may contain rows matching the filter was deleted between the
     * starting snapshot and {@code parent}.
     *
     * @param base table metadata to validate against
     * @param startingSnapshotId id of the snapshot the operation started from
     * @param dataFilter expression selecting rows that count as conflicts; may be null
     * @param parent ending snapshot of the history to check; no check is made when null
     * @throws ValidationException if a conflicting deleted data file is found
     * @throws UncheckedIOException if closing the entry iterator fails
     */
    protected void validateDeletedDataFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter, Snapshot parent) {
        CloseableIterable<ManifestEntry<DataFile>> conflictEntries = this.deletedDataFiles(base, startingSnapshotId, dataFilter, null, parent);
        // The resource must be declared with its closeable, typed form: a raw Iterator is not
        // AutoCloseable and would leave the lambda parameter untyped.
        try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
            if (conflicts.hasNext()) {
                throw new ValidationException("Found conflicting deleted files that can contain records matching %s: %s", dataFilter, Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().location().toString())));
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(String.format("Failed to validate no deleted data files matching %s", dataFilter), e);
        }
    }

    /**
     * Fails if any data file in one of the given partitions was deleted between the starting
     * snapshot and {@code parent}.
     *
     * @param base table metadata to validate against
     * @param startingSnapshotId id of the snapshot the operation started from
     * @param partitionSet partitions in which deleted data files count as conflicts
     * @param parent ending snapshot of the history to check; no check is made when null
     * @throws ValidationException if a conflicting deleted data file is found
     * @throws UncheckedIOException if closing the entry iterator fails
     */
    protected void validateDeletedDataFiles(TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet, Snapshot parent) {
        CloseableIterable<ManifestEntry<DataFile>> conflictEntries = this.deletedDataFiles(base, startingSnapshotId, null, partitionSet, parent);
        // The resource must be declared with its closeable, typed form: a raw Iterator is not
        // AutoCloseable and would leave the lambda parameter untyped.
        try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
            if (conflicts.hasNext()) {
                throw new ValidationException("Found conflicting deleted files that can apply to records matching %s: %s", partitionSet, Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().location().toString())));
            }
        }
        catch (IOException e) {
            // This validation concerns deleted data files, not appends; the message says so.
            throw new UncheckedIOException(String.format("Failed to validate no deleted data files matching %s", partitionSet), e);
        }
    }

    /**
     * Returns manifest entries with DELETED status written by matching snapshots between the
     * starting snapshot and {@code parent}, optionally narrowed by a data filter and/or a
     * partition set. Returns an empty iterable when {@code parent} is null.
     */
    private CloseableIterable<ManifestEntry<DataFile>> deletedDataFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter, PartitionSet partitionSet, Snapshot parent) {
        if (parent == null) {
            return CloseableIterable.empty();
        }

        Pair<List<ManifestFile>, Set<Long>> newHistory = validationHistory(base, startingSnapshotId, VALIDATE_DATA_FILES_EXIST_OPERATIONS, ManifestContent.DATA, parent);
        List<ManifestFile> newManifests = newHistory.first();
        Set<Long> newSnapshotIds = newHistory.second();

        ManifestGroup group = new ManifestGroup(ops().io(), newManifests, ImmutableList.of())
            .caseSensitive(caseSensitive)
            .filterManifestEntries(entry -> newSnapshotIds.contains(entry.snapshotId()))
            .filterManifestEntries(entry -> entry.status().equals((Object)ManifestEntry.Status.DELETED))
            .specsById(base.specsById())
            .ignoreExisting();

        if (dataFilter != null) {
            group = group.filterData(dataFilter);
        }
        if (partitionSet != null) {
            group = group.filterManifestEntries(entry -> partitionSet.contains(((DataFile)entry.file()).specId(), ((DataFile)entry.file()).partition()));
        }
        return group.entries();
    }

    /**
     * Sets the data sequence number that is passed along when manifests for newly added data
     * files are written.
     *
     * @param sequenceNumber data sequence number to use
     */
    protected void setNewDataFilesDataSequenceNumber(long sequenceNumber) {
        newDataFilesDataSequenceNumber = Long.valueOf(sequenceNumber);
    }

    /**
     * Returns the sequence number of the starting snapshot, or 0 when the id is null or the
     * snapshot is not present in the metadata.
     */
    private long startingSequenceNumber(TableMetadata metadata, Long startingSnapshotId) {
        if (startingSnapshotId == null) {
            return 0L;
        }
        Snapshot startingSnapshot = metadata.snapshot(startingSnapshotId);
        return startingSnapshot != null ? startingSnapshot.sequenceNumber() : 0L;
    }

    /**
     * Builds a delete file index over the given manifests, limited to delete files after the
     * starting sequence number and optionally narrowed by a data filter and/or partition set.
     */
    private DeleteFileIndex buildDeleteFileIndex(List<ManifestFile> deleteManifests, long startingSequenceNumber, Expression dataFilter, PartitionSet partitionSet) {
        DeleteFileIndex.Builder indexBuilder = DeleteFileIndex.builderFor(ops().io(), deleteManifests)
            .afterSequenceNumber(startingSequenceNumber)
            .caseSensitive(caseSensitive)
            .specsById(ops().current().specsById());

        if (dataFilter != null) {
            indexBuilder.filterData(dataFilter);
        }
        if (partitionSet != null) {
            indexBuilder.filterPartitions(partitionSet);
        }

        return indexBuilder.build();
    }

    /**
     * Fails if any of the required data files has a non-ADDED manifest entry written by a
     * matching snapshot between the starting snapshot and {@code parent}.
     *
     * @param base table metadata to validate against
     * @param startingSnapshotId id of the snapshot the operation started from
     * @param requiredDataFiles locations of the data files that must still exist
     * @param skipDeletes when true, snapshots with the "delete" operation are not inspected
     * @param conflictDetectionFilter optional expression narrowing the entries considered
     * @param parent ending snapshot of the history to check; no check is made when null
     * @throws ValidationException if a required data file was removed
     * @throws UncheckedIOException if closing the entry iterator fails
     */
    protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, CharSequenceSet requiredDataFiles, boolean skipDeletes, Expression conflictDetectionFilter, Snapshot parent) {
        if (parent == null) {
            return;
        }
        Set<String> matchingOperations = skipDeletes ? VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS : VALIDATE_DATA_FILES_EXIST_OPERATIONS;
        Pair<List<ManifestFile>, Set<Long>> history = this.validationHistory(base, startingSnapshotId, matchingOperations, ManifestContent.DATA, parent);
        List<ManifestFile> manifests = history.first();
        Set<Long> newSnapshots = history.second();
        // Pass on the configured case sensitivity, as the other manifest-group validations do,
        // so the conflict detection filter is bound the same way everywhere.
        ManifestGroup matchingDeletesGroup = new ManifestGroup(this.ops().io(), manifests, ImmutableList.of())
            .caseSensitive(this.caseSensitive)
            .filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED && newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(((DataFile)entry.file()).location()))
            .specsById(base.specsById())
            .ignoreExisting();
        if (conflictDetectionFilter != null) {
            // Keep the returned group rather than relying on in-place mutation.
            matchingDeletesGroup = matchingDeletesGroup.filterData(conflictDetectionFilter);
        }
        // The resource must be declared with its closeable, typed form: a raw Iterator is not
        // AutoCloseable and would leave the lambda parameter untyped.
        try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
            if (deletes.hasNext()) {
                throw new ValidationException("Cannot commit, missing data files: %s", Iterators.toString(Iterators.transform(deletes, entry -> entry.file().location().toString())));
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to validate required files exist", e);
        }
    }

    /**
     * Checks the delete manifests of matching snapshots between the starting snapshot and
     * {@code parent} for DVs that reference a data file this update also adds a DV for.
     * Does nothing when {@code parent} is null or this update adds no DVs.
     */
    protected void validateAddedDVs(TableMetadata base, Long startingSnapshotId, Expression conflictDetectionFilter, Snapshot parent) {
        boolean nothingToCheck = parent == null || newDVRefs.isEmpty();
        if (nothingToCheck) {
            return;
        }

        Pair<List<ManifestFile>, Set<Long>> newHistory = validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DVS_OPERATIONS, ManifestContent.DELETES, parent);
        List<ManifestFile> manifestsToScan = newHistory.first();
        Set<Long> snapshotIdsToScan = newHistory.second();

        Tasks.foreach(manifestsToScan)
            .stopOnFailure()
            .throwFailureWhenFinished()
            .executeWith(workerPool())
            .run(manifest -> validateAddedDVs((ManifestFile)manifest, conflictDetectionFilter, snapshotIdsToScan));
    }

    /**
     * Scans the live entries of one delete manifest and fails if a DV written by one of the
     * given snapshots references a data file in {@code newDVRefs}.
     *
     * @throws ValidationException if a concurrently added DV is found
     * @throws UncheckedIOException if closing the manifest entries fails
     */
    private void validateAddedDVs(ManifestFile manifest, Expression conflictDetectionFilter, Set<Long> newSnapshotIds) {
        try (CloseableIterable<ManifestEntry<DeleteFile>> liveEntries = ManifestFiles.readDeleteManifest(manifest, ops().io(), ops().current().specsById())
                .filterRows(conflictDetectionFilter)
                .caseSensitive(caseSensitive)
                .liveEntries()) {
            for (ManifestEntry<DeleteFile> entry : liveEntries) {
                DeleteFile deleteFile = entry.file();
                if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(deleteFile)) {
                    ValidationException.check(!newDVRefs.contains(deleteFile.referencedDataFile()), "Found concurrently added DV for %s: %s", deleteFile.referencedDataFile(), ContentFileUtil.dvDesc(deleteFile));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Walks the ancestors between {@code parent} and the starting snapshot and collects, for
     * snapshots whose operation is in {@code matchingOperations}, the snapshot ids and the
     * manifests of the requested content type that were written by that same snapshot.
     *
     * @return pair of (collected manifests, ids of the matching snapshots)
     * @throws ValidationException if the last ancestor visited does not have the starting
     *     snapshot as its parent
     */
    private Pair<List<ManifestFile>, Set<Long>> validationHistory(TableMetadata base, Long startingSnapshotId, Set<String> matchingOperations, ManifestContent content, Snapshot parent) {
        List<ManifestFile> collected = Lists.newArrayList();
        Set<Long> matchedSnapshotIds = Sets.newHashSet();
        Snapshot lastVisited = null;

        for (Snapshot ancestor : SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId, base::snapshot)) {
            lastVisited = ancestor;
            if (matchingOperations.contains(ancestor.operation())) {
                matchedSnapshotIds.add(ancestor.snapshotId());
                Iterable<ManifestFile> candidates = content == ManifestContent.DATA
                    ? ancestor.dataManifests(ops().io())
                    : ancestor.deleteManifests(ops().io());
                for (ManifestFile candidate : candidates) {
                    // keep only manifests written by this very snapshot
                    if (candidate.snapshotId().longValue() == ancestor.snapshotId()) {
                        collected.add(candidate);
                    }
                }
            }
        }

        boolean historyComplete = lastVisited == null || Objects.equals(lastVisited.parentId(), startingSnapshotId);
        ValidationException.check(historyComplete, "Cannot determine history between starting snapshot %s and the last known ancestor %s", startingSnapshotId, lastVisited != null ? Long.valueOf(lastVisited.snapshotId()) : null);
        return Pair.of(collected, matchedSnapshotIds);
    }

    /**
     * Builds the snapshot summary after applying the partition summary limit read from the
     * {@code write.summary.partition-limit} table property (fallback 0).
     */
    @Override
    protected Map<String, String> summary() {
        int partitionLimit = ops().current().propertyAsInt("write.summary.partition-limit", 0);
        summaryBuilder.setPartitionSummaryLimit(partitionLimit);
        return summaryBuilder.build();
    }

    /**
     * Produces the manifest list for the new snapshot: filters the existing data and delete
     * manifests, adds manifests for newly added files, rebuilds the summary, and merges the
     * results.
     *
     * @param base table metadata the update is applied to
     * @param snapshot snapshot whose manifests are filtered; may be null
     * @return merged data manifests followed by merged delete manifests
     */
    @Override
    public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
        List<ManifestFile> filtered = this.filterManager.filterManifests(SnapshotUtil.schemaFor(base, this.targetBranch()), snapshot != null ? snapshot.dataManifests(this.ops().io()) : null);
        // -1 marks a min sequence number that is excluded from the minimum.
        long minDataSequenceNumber = filtered.stream().map(ManifestFile::minSequenceNumber).filter(seq -> seq != -1L).reduce(base.lastSequenceNumber(), Math::min);
        this.deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
        List<ManifestFile> filteredDeletes = this.deleteFilterManager.filterManifests(SnapshotUtil.schemaFor(base, this.targetBranch()), snapshot != null ? snapshot.deleteManifests(this.ops().io()) : null);
        // Keep manifests that have added or existing files, or that carry this commit's snapshot id.
        // The predicate is parameterized so the lambda parameter is a ManifestFile, not Object.
        Predicate<ManifestFile> shouldKeep = manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId().longValue() == this.snapshotId();
        Iterable<ManifestFile> unmergedManifests = Iterables.filter(Iterables.concat(this.prepareNewDataManifests(), filtered), shouldKeep);
        Iterable<ManifestFile> unmergedDeleteManifests = Iterables.filter(Iterables.concat(this.prepareDeleteManifests(), filteredDeletes), shouldKeep);
        // Rebuild the summary from scratch on every call.
        this.summaryBuilder.clear();
        this.summaryBuilder.merge(this.addedFilesSummary);
        this.summaryBuilder.merge(this.appendedManifestsSummary);
        this.summaryBuilder.merge(this.filterManager.buildSummary(filtered));
        this.summaryBuilder.merge(this.deleteFilterManager.buildSummary(filteredDeletes));
        List<ManifestFile> manifests = Lists.newArrayList();
        Iterables.addAll(manifests, this.mergeManager.mergeManifests(unmergedManifests));
        Iterables.addAll(manifests, this.deleteMergeManager.mergeManifests(unmergedDeleteManifests));
        return manifests;
    }

    /**
     * Builds the {@link CreateSnapshotEvent} for the snapshot produced by this update. When
     * the snapshot cannot be found after a refresh, the event carries sequence number -1 and
     * the locally built summary.
     */
    @Override
    public Object updateEvent() {
        long committedId = snapshotId();
        Snapshot committed = ops().refresh().snapshot(committedId);

        long committedSeq;
        Map<String, String> eventSummary;
        if (committed != null) {
            committedSeq = committed.sequenceNumber();
            eventSummary = committed.summary();
        } else {
            committedSeq = -1L;
            LOG.warn("Failed to load committed snapshot: omitting sequence number from notifications");
            eventSummary = summary();
        }

        return new CreateSnapshotEvent(tableName, operation(), committedId, committedSeq, eventSummary);
    }

    /**
     * Deletes manifest files created or appended by this update that are not in the committed
     * set. Each manifest cache is cleared when at least one of its manifests was deleted.
     * Manifests appended as-is are only deleted when {@code committed} is not empty.
     */
    private void cleanUncommittedAppends(Set<ManifestFile> committed) {
        boolean removedDataManifest = false;
        for (ManifestFile dataManifest : cachedNewDataManifests) {
            if (!committed.contains(dataManifest)) {
                deleteFile(dataManifest.path());
                removedDataManifest = true;
            }
        }
        if (removedDataManifest) {
            cachedNewDataManifests.clear();
        }

        boolean removedDeleteManifest = false;
        for (ManifestFile deleteManifest : cachedNewDeleteManifests) {
            if (!committed.contains(deleteManifest)) {
                deleteFile(deleteManifest.path());
                removedDeleteManifest = true;
            }
        }
        if (removedDeleteManifest) {
            cachedNewDeleteManifests.clear();
        }

        for (ManifestFile rewritten : rewrittenAppendManifests) {
            if (!committed.contains(rewritten)) {
                deleteFile(rewritten.path());
            }
        }

        if (committed.isEmpty()) {
            // as-is appended manifests are left alone when nothing was committed
            return;
        }
        for (ManifestFile appended : appendManifests) {
            if (!committed.contains(appended)) {
                deleteFile(appended.path());
            }
        }
    }

    /**
     * Cleans up uncommitted manifests held by the merge and filter managers, then those
     * created or appended directly by this producer.
     *
     * @param committed manifests that were part of the committed snapshot
     */
    @Override
    protected void cleanUncommitted(Set<ManifestFile> committed) {
        // data managers first, then delete managers, then this producer's own manifests
        mergeManager.cleanUncommitted(committed);
        filterManager.cleanUncommitted(committed);
        deleteMergeManager.cleanUncommitted(committed);
        deleteFilterManager.cleanUncommitted(committed);
        cleanUncommittedAppends(committed);
    }

    /**
     * Returns the new data manifests for this update (manifests for added data files, if any,
     * followed by appended and rewritten manifests), each copied with this update's snapshot id.
     */
    private Iterable<ManifestFile> prepareNewDataManifests() {
        Iterable<ManifestFile> pending;
        if (newDataFilesBySpec.isEmpty()) {
            pending = Iterables.concat(appendManifests, rewrittenAppendManifests);
        } else {
            List<ManifestFile> writtenForNewFiles = newDataFilesAsManifests();
            pending = Iterables.concat(writtenForNewFiles, appendManifests, rewrittenAppendManifests);
        }

        return Iterables.transform(
            pending,
            manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
    }

    /**
     * Returns the cached manifests for the pending new data files, writing them when needed.
     *
     * <p>If {@code hasNewDataFiles} is set while the cache is populated, the cached manifest files
     * are deleted and the cache is emptied. An empty cache is then refilled by writing manifests
     * for every entry of {@code newDataFilesBySpec}, after which {@code hasNewDataFiles} is reset.
     *
     * @return the cache list itself, not a copy
     */
    private List<ManifestFile> newDataFilesAsManifests() {
        boolean cacheIsStale = hasNewDataFiles && !cachedNewDataManifests.isEmpty();
        if (cacheIsStale) {
            for (ManifestFile stale : cachedNewDataManifests) {
                deleteFile(stale.path());
            }
            cachedNewDataManifests.clear();
        }

        if (!cachedNewDataManifests.isEmpty()) {
            // Cache is populated and still valid: reuse it.
            return cachedNewDataManifests;
        }

        newDataFilesBySpec.forEach((specId, dataFiles) -> {
            cachedNewDataManifests.addAll(
                    writeDataManifests(
                            (Collection<DataFile>) dataFiles,
                            newDataFilesDataSequenceNumber,
                            spec((int) specId)));
        });
        hasNewDataFiles = false;
        return cachedNewDataManifests;
    }

    /**
     * Returns the manifests for the pending new delete files.
     *
     * @return an empty immutable list when no delete files are pending, otherwise the result of
     *     {@code newDeleteFilesAsManifests()}
     */
    private Iterable<ManifestFile> prepareDeleteManifests() {
        return newDeleteFilesBySpec.isEmpty() ? ImmutableList.of() : newDeleteFilesAsManifests();
    }

    /**
     * Returns the cached manifests for the pending new delete files, writing them when needed.
     *
     * <p>If {@code hasNewDeleteFiles} is set while the cache is populated, the cached manifest
     * files are deleted and the cache is emptied. An empty cache is then refilled by writing
     * manifests for every entry of {@code newDeleteFilesBySpec}, using the partition spec looked
     * up from the current table metadata; afterwards {@code hasNewDeleteFiles} is reset.
     *
     * @return the cache list itself, not a copy
     */
    private List<ManifestFile> newDeleteFilesAsManifests() {
        boolean cacheIsStale = hasNewDeleteFiles && !cachedNewDeleteManifests.isEmpty();
        if (cacheIsStale) {
            cachedNewDeleteManifests.forEach(stale -> deleteFile(stale.path()));
            cachedNewDeleteManifests.clear();
        }

        if (!cachedNewDeleteManifests.isEmpty()) {
            // Cache is populated and still valid: reuse it.
            return cachedNewDeleteManifests;
        }

        newDeleteFilesBySpec.forEach((specId, deleteFiles) -> {
            PartitionSpec deleteSpec = ops().current().spec((int) specId);
            cachedNewDeleteManifests.addAll(
                    writeDeleteManifests((Collection<DeleteFile>) deleteFiles, deleteSpec));
        });
        hasNewDeleteFiles = false;
        return cachedNewDeleteManifests;
    }

    /**
     * Merge manager for {@code DeleteFile} manifests whose hooks all forward to the enclosing
     * {@code MergingSnapshotProducer}.
     */
    private class DeleteFileMergeManager extends ManifestMergeManager<DeleteFile> {
        /**
         * Hands the merge settings to the superclass together with a method reference to the
         * enclosing producer's {@code workerPool}.
         */
        DeleteFileMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
            super(targetSizeBytes, minCountToMerge, mergeEnabled, MergingSnapshotProducer.this::workerPool);
        }

        /** Opens {@code manifest} with the enclosing producer's delete-manifest reader factory. */
        @Override
        protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
            return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
        }

        /** Creates a writer for {@code manifestSpec} with the enclosing producer's delete-manifest writer factory. */
        @Override
        protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpec) {
            return MergingSnapshotProducer.this.newDeleteManifestWriter(manifestSpec);
        }

        /** Looks up the partition spec with id {@code specId} in the current table metadata. */
        @Override
        protected PartitionSpec spec(int specId) {
            PartitionSpec resolved = MergingSnapshotProducer.this.ops().current().spec(specId);
            return resolved;
        }

        /** Returns the snapshot id reported by the enclosing producer. */
        @Override
        protected long snapshotId() {
            return MergingSnapshotProducer.this.snapshotId();
        }

        /** Deletes {@code location} through the enclosing producer. */
        @Override
        protected void deleteFile(String location) {
            MergingSnapshotProducer.this.deleteFile(location);
        }
    }

    /**
     * Filter manager for {@code DeleteFile} manifests whose hooks all forward to the enclosing
     * {@code MergingSnapshotProducer}.
     */
    private class DeleteFileFilterManager extends ManifestFilterManager<DeleteFile> {
        /**
         * Hands the superclass the {@code specsById()} value of the current table metadata and a
         * method reference to the enclosing producer's {@code workerPool}.
         */
        private DeleteFileFilterManager() {
            super(MergingSnapshotProducer.this.ops().current().specsById(), MergingSnapshotProducer.this::workerPool);
        }

        /** Returns a fresh set obtained from {@code DeleteFileSet.create()}. */
        @Override
        protected Set<DeleteFile> newFileSet() {
            return DeleteFileSet.create();
        }

        /** Opens {@code manifest} with the enclosing producer's delete-manifest reader factory. */
        @Override
        protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
            return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
        }

        /** Creates a writer for {@code manifestSpec} with the enclosing producer's delete-manifest writer factory. */
        @Override
        protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpec) {
            return MergingSnapshotProducer.this.newDeleteManifestWriter(manifestSpec);
        }

        /** Deletes {@code location} through the enclosing producer. */
        @Override
        protected void deleteFile(String location) {
            MergingSnapshotProducer.this.deleteFile(location);
        }
    }

    /**
     * Merge manager for {@code DataFile} manifests whose hooks all forward to the enclosing
     * {@code MergingSnapshotProducer}.
     */
    private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
        /**
         * Hands the merge settings to the superclass together with a method reference to the
         * enclosing producer's {@code workerPool}.
         */
        DataFileMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
            super(targetSizeBytes, minCountToMerge, mergeEnabled, MergingSnapshotProducer.this::workerPool);
        }

        /** Opens {@code manifest} with the enclosing producer's manifest reader factory. */
        @Override
        protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
            return MergingSnapshotProducer.this.newManifestReader(manifest);
        }

        /** Creates a writer for {@code manifestSpec} with the enclosing producer's manifest writer factory. */
        @Override
        protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
            return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
        }

        /** Looks up the partition spec with id {@code specId} in the current table metadata. */
        @Override
        protected PartitionSpec spec(int specId) {
            PartitionSpec resolved = MergingSnapshotProducer.this.ops().current().spec(specId);
            return resolved;
        }

        /** Returns the snapshot id reported by the enclosing producer. */
        @Override
        protected long snapshotId() {
            return MergingSnapshotProducer.this.snapshotId();
        }

        /** Deletes {@code location} through the enclosing producer. */
        @Override
        protected void deleteFile(String location) {
            MergingSnapshotProducer.this.deleteFile(location);
        }
    }

    /**
     * Filter manager for {@code DataFile} manifests whose hooks all forward to the enclosing
     * {@code MergingSnapshotProducer}.
     */
    private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
        /**
         * Hands the superclass the {@code specsById()} value of the current table metadata and a
         * method reference to the enclosing producer's {@code workerPool}.
         */
        private DataFileFilterManager() {
            super(MergingSnapshotProducer.this.ops().current().specsById(), MergingSnapshotProducer.this::workerPool);
        }

        /** Returns a fresh set obtained from {@code DataFileSet.create()}. */
        @Override
        protected Set<DataFile> newFileSet() {
            return DataFileSet.create();
        }

        /** Opens {@code manifest} with the enclosing producer's manifest reader factory. */
        @Override
        protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
            return MergingSnapshotProducer.this.newManifestReader(manifest);
        }

        /** Creates a writer for {@code manifestSpec} with the enclosing producer's manifest writer factory. */
        @Override
        protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
            return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
        }

        /** Deletes {@code location} through the enclosing producer. */
        @Override
        protected void deleteFile(String location) {
            MergingSnapshotProducer.this.deleteFile(location);
        }
    }
}

