/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStoreCommitImpl
implements FileStoreCommit {
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
    private final FileIO fileIO;
    private final SchemaManager schemaManager;
    private final String commitUser;
    private final RowType partitionType;
    private final RowDataToObjectArrayConverter partitionObjectConverter;
    private final FileStorePathFactory pathFactory;
    private final SnapshotManager snapshotManager;
    private final ManifestFile manifestFile;
    private final ManifestList manifestList;
    private final FileStoreScan scan;
    private final int numBucket;
    private final MemorySize manifestTargetSize;
    private final int manifestMergeMinCount;
    private final boolean dynamicPartitionOverwrite;
    @Nullable
    private final Comparator<InternalRow> keyComparator;
    @Nullable
    private Lock lock;
    private boolean ignoreEmptyCommit;

    public FileStoreCommitImpl(FileIO fileIO, SchemaManager schemaManager, String commitUser, RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, FileStoreScan scan, int numBucket, MemorySize manifestTargetSize, int manifestMergeMinCount, boolean dynamicPartitionOverwrite, @Nullable Comparator<InternalRow> keyComparator) {
        this.fileIO = fileIO;
        this.schemaManager = schemaManager;
        this.commitUser = commitUser;
        this.partitionType = partitionType;
        this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
        this.pathFactory = pathFactory;
        this.snapshotManager = snapshotManager;
        this.manifestFile = manifestFileFactory.create();
        this.manifestList = manifestListFactory.create();
        this.scan = scan;
        this.numBucket = numBucket;
        this.manifestTargetSize = manifestTargetSize;
        this.manifestMergeMinCount = manifestMergeMinCount;
        this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
        this.keyComparator = keyComparator;
        this.lock = null;
        this.ignoreEmptyCommit = true;
    }

    @Override
    public FileStoreCommit withLock(Lock lock) {
        this.lock = lock;
        return this;
    }

    @Override
    public FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit) {
        this.ignoreEmptyCommit = ignoreEmptyCommit;
        return this;
    }

    @Override
    public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
        if (commitIdentifiers.isEmpty()) {
            return commitIdentifiers;
        }
        Optional<Snapshot> latestSnapshot = this.snapshotManager.latestSnapshotOfUser(this.commitUser);
        if (latestSnapshot.isPresent()) {
            HashSet<Long> result = new HashSet<Long>();
            for (Long identifier : commitIdentifiers) {
                if (identifier <= latestSnapshot.get().commitIdentifier()) continue;
                result.add(identifier);
            }
            return result;
        }
        return commitIdentifiers;
    }

    @Override
    public void commit(ManifestCommittable committable, Map<String, String> properties) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit\n" + committable.toString());
        }
        Long safeLatestSnapshotId = null;
        ArrayList<ManifestEntry> baseEntries = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> appendTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> appendChangelog = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactChangelog = new ArrayList<ManifestEntry>();
        this.collectChanges(committable.fileCommittables(), appendTableFiles, appendChangelog, compactTableFiles, compactChangelog);
        if (!(this.ignoreEmptyCommit && appendTableFiles.isEmpty() && appendChangelog.isEmpty())) {
            Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
            if (latestSnapshotId != null) {
                baseEntries.addAll(this.readAllEntriesFromChangedPartitions(latestSnapshotId, appendTableFiles, compactTableFiles));
                this.noConflictsOrFail(baseEntries, appendTableFiles);
                safeLatestSnapshotId = latestSnapshotId;
            }
            this.tryCommit(appendTableFiles, appendChangelog, committable.identifier(), committable.watermark(), committable.logOffsets(), Snapshot.CommitKind.APPEND, safeLatestSnapshotId);
        }
        if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
            if (safeLatestSnapshotId != null) {
                baseEntries.addAll(appendTableFiles);
                this.noConflictsOrFail(baseEntries, compactTableFiles);
                safeLatestSnapshotId = safeLatestSnapshotId + 1L;
            }
            this.tryCommit(compactTableFiles, compactChangelog, committable.identifier(), committable.watermark(), committable.logOffsets(), Snapshot.CommitKind.COMPACT, safeLatestSnapshotId);
        }
    }

    @Override
    public void overwrite(Map<String, String> partition, ManifestCommittable committable, Map<String, String> properties) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to overwrite partition {}\nManifestCommittable: {}\nProperties: {}", new Object[]{partition, committable, properties});
        }
        ArrayList<ManifestEntry> appendTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> appendChangelog = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactChangelog = new ArrayList<ManifestEntry>();
        this.collectChanges(committable.fileCommittables(), appendTableFiles, appendChangelog, compactTableFiles, compactChangelog);
        if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
            StringBuilder warnMessage = new StringBuilder("Overwrite mode currently does not commit any changelog.\nPlease make sure that the partition you're overwriting is not being consumed by a streaming reader.\nIgnored changelog files are:\n");
            for (ManifestEntry entry : appendChangelog) {
                warnMessage.append("  * ").append(entry.toString()).append("\n");
            }
            for (ManifestEntry entry : compactChangelog) {
                warnMessage.append("  * ").append(entry.toString()).append("\n");
            }
            LOG.warn(warnMessage.toString());
        }
        boolean skipOverwrite = false;
        Predicate partitionFilter = null;
        if (this.dynamicPartitionOverwrite) {
            if (appendTableFiles.isEmpty()) {
                skipOverwrite = true;
            } else {
                partitionFilter = appendTableFiles.stream().map(ManifestEntry::partition).distinct().map(p -> PredicateBuilder.equalPartition(p, this.partitionType)).reduce((xva$0, xva$1) -> PredicateBuilder.or(xva$0, xva$1)).orElseThrow(() -> new RuntimeException("Failed to get dynamic partition filter. This is unexpected."));
            }
        } else {
            partitionFilter = PredicateBuilder.partition(partition, this.partitionType);
            if (partitionFilter != null) {
                for (ManifestEntry entry : appendTableFiles) {
                    if (partitionFilter.test(this.partitionObjectConverter.convert(entry.partition()))) continue;
                    throw new IllegalArgumentException("Trying to overwrite partition " + partition + ", but the changes in " + this.pathFactory.getPartitionString(entry.partition()) + " does not belong to this partition");
                }
            }
        }
        if (!skipOverwrite) {
            this.tryOverwrite(partitionFilter, appendTableFiles, committable.identifier(), committable.watermark(), committable.logOffsets());
        }
        if (!compactTableFiles.isEmpty()) {
            this.tryCommit(compactTableFiles, Collections.emptyList(), committable.identifier(), committable.watermark(), committable.logOffsets(), Snapshot.CommitKind.COMPACT, null);
        }
    }

    @Override
    public void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier) {
        Preconditions.checkArgument(!partitions.isEmpty(), "Partitions list cannot be empty.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to drop partitions {}", (Object)partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
        }
        Predicate partitionFilter = partitions.stream().map(partition -> PredicateBuilder.partition(partition, this.partitionType)).reduce((xva$0, xva$1) -> PredicateBuilder.or(xva$0, xva$1)).orElseThrow(() -> new RuntimeException("Failed to get partition filter."));
        this.tryOverwrite(partitionFilter, Collections.emptyList(), commitIdentifier, null, Collections.emptyMap());
    }

    @Override
    public void abort(List<CommitMessage> commitMessages) {
        HashMap<Pair, DataFilePathFactory> factoryMap = new HashMap<Pair, DataFilePathFactory>();
        for (CommitMessage message : commitMessages) {
            DataFilePathFactory pathFactory = factoryMap.computeIfAbsent(Pair.of(message.partition(), message.bucket()), k -> this.pathFactory.createDataFilePathFactory((BinaryRow)k.getKey(), (Integer)k.getValue()));
            CommitMessageImpl commitMessage = (CommitMessageImpl)message;
            ArrayList<DataFileMeta> toDelete = new ArrayList<DataFileMeta>();
            toDelete.addAll(commitMessage.newFilesIncrement().newFiles());
            toDelete.addAll(commitMessage.newFilesIncrement().changelogFiles());
            toDelete.addAll(commitMessage.compactIncrement().compactAfter());
            toDelete.addAll(commitMessage.compactIncrement().changelogFiles());
            for (DataFileMeta file : toDelete) {
                this.fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
            }
        }
    }

    private void collectChanges(List<CommitMessage> commitMessages, List<ManifestEntry> appendTableFiles, List<ManifestEntry> appendChangelog, List<ManifestEntry> compactTableFiles, List<ManifestEntry> compactChangelog) {
        for (CommitMessage message : commitMessages) {
            CommitMessageImpl commitMessage = (CommitMessageImpl)message;
            commitMessage.newFilesIncrement().newFiles().forEach(m -> appendTableFiles.add(this.makeEntry(FileKind.ADD, commitMessage, (DataFileMeta)m)));
            commitMessage.newFilesIncrement().changelogFiles().forEach(m -> appendChangelog.add(this.makeEntry(FileKind.ADD, commitMessage, (DataFileMeta)m)));
            commitMessage.compactIncrement().compactBefore().forEach(m -> compactTableFiles.add(this.makeEntry(FileKind.DELETE, commitMessage, (DataFileMeta)m)));
            commitMessage.compactIncrement().compactAfter().forEach(m -> compactTableFiles.add(this.makeEntry(FileKind.ADD, commitMessage, (DataFileMeta)m)));
            commitMessage.compactIncrement().changelogFiles().forEach(m -> compactChangelog.add(this.makeEntry(FileKind.ADD, commitMessage, (DataFileMeta)m)));
        }
    }

    private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, DataFileMeta file) {
        return new ManifestEntry(kind, commitMessage.partition(), commitMessage.bucket(), this.numBucket, file);
    }

    private void tryCommit(List<ManifestEntry> tableFiles, List<ManifestEntry> changelogFiles, long identifier, @Nullable Long watermark, Map<Integer, Long> logOffsets, Snapshot.CommitKind commitKind, Long safeLatestSnapshotId) {
        Long latestSnapshotId;
        while (!this.tryCommitOnce(tableFiles, changelogFiles, identifier, watermark, logOffsets, commitKind, latestSnapshotId = this.snapshotManager.latestSnapshotId(), safeLatestSnapshotId)) {
        }
    }

    private void tryOverwrite(Predicate partitionFilter, List<ManifestEntry> changes, long identifier, @Nullable Long watermark, Map<Integer, Long> logOffsets) {
        Long latestSnapshotId;
        ArrayList<ManifestEntry> changesWithOverwrite;
        do {
            latestSnapshotId = this.snapshotManager.latestSnapshotId();
            changesWithOverwrite = new ArrayList<ManifestEntry>();
            if (latestSnapshotId != null) {
                List<ManifestEntry> currentEntries = this.scan.withSnapshot(latestSnapshotId).withPartitionFilter(partitionFilter).plan().files();
                for (ManifestEntry entry : currentEntries) {
                    changesWithOverwrite.add(new ManifestEntry(FileKind.DELETE, entry.partition(), entry.bucket(), entry.totalBuckets(), entry.file()));
                }
            }
            changesWithOverwrite.addAll(changes);
        } while (!this.tryCommitOnce(changesWithOverwrite, Collections.emptyList(), identifier, watermark, logOffsets, Snapshot.CommitKind.OVERWRITE, latestSnapshotId, null));
    }

    @VisibleForTesting
    public boolean tryCommitOnce(List<ManifestEntry> tableFiles, List<ManifestEntry> changelogFiles, long identifier, @Nullable Long watermark, Map<Integer, Long> logOffsets, Snapshot.CommitKind commitKind, Long latestSnapshotId, Long safeLatestSnapshotId) {
        boolean success;
        Snapshot newSnapshot;
        long newSnapshotId = latestSnapshotId == null ? 1L : latestSnapshotId + 1L;
        Path newSnapshotPath = this.snapshotManager.snapshotPath(newSnapshotId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
            for (ManifestEntry entry : tableFiles) {
                LOG.debug("  * " + entry.toString());
            }
            LOG.debug("Ready to commit changelog to snapshot #" + newSnapshotId);
            for (ManifestEntry entry : changelogFiles) {
                LOG.debug("  * " + entry.toString());
            }
        }
        Snapshot latestSnapshot = null;
        if (latestSnapshotId != null) {
            if (!latestSnapshotId.equals(safeLatestSnapshotId)) {
                this.noConflictsOrFail(latestSnapshotId, tableFiles);
            }
            latestSnapshot = this.snapshotManager.snapshot(latestSnapshotId);
        }
        String previousChangesListName = null;
        String newChangesListName = null;
        String changelogListName = null;
        ArrayList<ManifestFileMeta> oldMetas = new ArrayList<ManifestFileMeta>();
        ArrayList<ManifestFileMeta> newMetas = new ArrayList<ManifestFileMeta>();
        ArrayList<ManifestFileMeta> changelogMetas = new ArrayList<ManifestFileMeta>();
        try {
            long previousTotalRecordCount = 0L;
            Long currentWatermark = watermark;
            if (latestSnapshot != null) {
                previousTotalRecordCount = latestSnapshot.totalRecordCount(this.scan);
                List<ManifestFileMeta> previousManifests = latestSnapshot.dataManifests(this.manifestList);
                oldMetas.addAll(previousManifests);
                latestSnapshot.logOffsets().forEach(logOffsets::putIfAbsent);
                Long latestWatermark = latestSnapshot.watermark();
                if (latestWatermark != null) {
                    currentWatermark = currentWatermark == null ? latestWatermark : Math.max(currentWatermark, latestWatermark);
                }
            }
            newMetas.addAll(ManifestFileMeta.merge(oldMetas, this.manifestFile, this.manifestTargetSize.getBytes(), this.manifestMergeMinCount));
            previousChangesListName = this.manifestList.write(newMetas);
            long deltaRecordCount = Snapshot.recordCount(tableFiles);
            List<ManifestFileMeta> newChangesManifests = this.manifestFile.write(tableFiles);
            newMetas.addAll(newChangesManifests);
            newChangesListName = this.manifestList.write(newChangesManifests);
            if (!changelogFiles.isEmpty()) {
                changelogMetas.addAll(this.manifestFile.write(changelogFiles));
                changelogListName = this.manifestList.write(changelogMetas);
            }
            newSnapshot = new Snapshot(newSnapshotId, this.schemaManager.latest().get().id(), previousChangesListName, newChangesListName, changelogListName, this.commitUser, identifier, commitKind, System.currentTimeMillis(), logOffsets, previousTotalRecordCount + deltaRecordCount, deltaRecordCount, Snapshot.recordCount(changelogFiles), currentWatermark);
        }
        catch (Throwable e) {
            this.cleanUpTmpManifests(previousChangesListName, newChangesListName, changelogListName, oldMetas, newMetas, changelogMetas);
            throw new RuntimeException(String.format("Exception occurs when preparing snapshot #%d (path %s) by user %s with hash %s and kind %s. Clean up.", newSnapshotId, newSnapshotPath.toString(), this.commitUser, identifier, commitKind.name()), e);
        }
        try {
            Callable<Boolean> callable = () -> {
                boolean committed = this.fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
                if (committed) {
                    this.snapshotManager.commitLatestHint(newSnapshotId);
                }
                return committed;
            };
            success = this.lock != null ? this.lock.runWithLock(() -> !this.fileIO.exists(newSnapshotPath) && (Boolean)callable.call() != false).booleanValue() : callable.call().booleanValue();
        }
        catch (Throwable e) {
            throw new RuntimeException(String.format("Exception occurs when committing snapshot #%d (path %s) by user %s with identifier %s and kind %s. Cannot clean up because we can't determine the success.", newSnapshotId, newSnapshotPath, this.commitUser, identifier, commitKind.name()), e);
        }
        if (success) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Successfully commit snapshot #%d (path %s) by user %s with identifier %s and kind %s.", newSnapshotId, newSnapshotPath, this.commitUser, identifier, commitKind.name()));
            }
            return true;
        }
        LOG.warn(String.format("Atomic commit failed for snapshot #%d (path %s) by user %s with identifier %s and kind %s. Clean up and try again.", newSnapshotId, newSnapshotPath, this.commitUser, identifier, commitKind.name()));
        this.cleanUpTmpManifests(previousChangesListName, newChangesListName, changelogListName, oldMetas, newMetas, changelogMetas);
        return false;
    }

    @SafeVarargs
    private final List<ManifestEntry> readAllEntriesFromChangedPartitions(long snapshotId, List<ManifestEntry> ... changes) {
        List<BinaryRow> changedPartitions = Arrays.stream(changes).flatMap(Collection::stream).map(ManifestEntry::partition).distinct().collect(Collectors.toList());
        try {
            return this.scan.withSnapshot(snapshotId).withPartitionFilter(changedPartitions).plan().files();
        }
        catch (Throwable e) {
            throw new RuntimeException("Cannot read manifest entries from changed partitions.", e);
        }
    }

    private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
        this.noConflictsOrFail(this.readAllEntriesFromChangedPartitions(snapshotId, changes), changes);
    }

    private void noConflictsOrFail(List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
        Collection<ManifestEntry> mergedEntries;
        ArrayList<ManifestEntry> allEntries = new ArrayList<ManifestEntry>(baseEntries);
        allEntries.addAll(changes);
        try {
            mergedEntries = ManifestEntry.mergeEntries(allEntries);
            ManifestEntry.assertNoDelete(mergedEntries);
        }
        catch (Throwable e) {
            LOG.warn("File deletion conflicts detected! Give up committing.", e);
            throw this.createConflictException("File deletion conflicts detected! Give up committing.", baseEntries, changes);
        }
        if (this.keyComparator == null) {
            return;
        }
        HashMap<LevelIdentifier, List> levels = new HashMap<LevelIdentifier, List>();
        for (ManifestEntry entry : mergedEntries) {
            int level = entry.file().level();
            if (level < 1) continue;
            levels.computeIfAbsent(new LevelIdentifier(entry.partition(), entry.bucket(), level), lv -> new ArrayList()).add(entry);
        }
        for (List entries : levels.values()) {
            entries.sort((a, b) -> this.keyComparator.compare(a.file().minKey(), b.file().minKey()));
            int i = 0;
            while (i + 1 < entries.size()) {
                ManifestEntry a2 = (ManifestEntry)entries.get(i);
                ManifestEntry b2 = (ManifestEntry)entries.get(i + 1);
                if (this.keyComparator.compare(a2.file().maxKey(), b2.file().minKey()) >= 0) {
                    throw this.createConflictException("LSM conflicts detected! Give up committing. Conflict files are:\n" + a2.identifier().toString(this.pathFactory) + "\n" + b2.identifier().toString(this.pathFactory), baseEntries, changes);
                }
                ++i;
            }
        }
    }

    private RuntimeException createConflictException(String message, List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
        String possibleCauses = String.join((CharSequence)"\n", "Conflicts during commits are normal and this failure is intended to resolve the conflicts.", "Conflicts are mainly caused by the following scenarios:", "1. Multiple jobs are writing into the same partition at the same time, you can use https://paimon.apache.org/docs/master/maintenance/write-performance/#dedicated-compaction-job to support multiple writing.", "2. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.", "   The job will fail continuously in this scenario to protect metadata from corruption.", "   You can either recover from the latest savepoint, or you can revert the table to the snapshot corresponding to the old savepoint.");
        String baseEntriesString = "Base entries are:\n" + baseEntries.stream().map(ManifestEntry::toString).collect(Collectors.joining("\n"));
        String changesString = "Changes are:\n" + changes.stream().map(ManifestEntry::toString).collect(Collectors.joining("\n"));
        return new RuntimeException(message + "\n\n" + possibleCauses + "\n\n" + baseEntriesString + "\n\n" + changesString);
    }

    private void cleanUpTmpManifests(String previousChangesListName, String newChangesListName, String changelogListName, List<ManifestFileMeta> oldMetas, List<ManifestFileMeta> newMetas, List<ManifestFileMeta> changelogMetas) {
        if (previousChangesListName != null) {
            this.manifestList.delete(previousChangesListName);
        }
        if (newChangesListName != null) {
            this.manifestList.delete(newChangesListName);
        }
        if (changelogListName != null) {
            this.manifestList.delete(changelogListName);
        }
        HashSet<ManifestFileMeta> oldMetaSet = new HashSet<ManifestFileMeta>(oldMetas);
        for (ManifestFileMeta suspect : newMetas) {
            if (oldMetaSet.contains(suspect)) continue;
            this.manifestList.delete(suspect.fileName());
        }
        for (ManifestFileMeta meta : changelogMetas) {
            this.manifestList.delete(meta.fileName());
        }
    }

    private static class LevelIdentifier {
        private final BinaryRow partition;
        private final int bucket;
        private final int level;

        private LevelIdentifier(BinaryRow partition, int bucket, int level) {
            this.partition = partition;
            this.bucket = bucket;
            this.level = level;
        }

        public boolean equals(Object o) {
            if (!(o instanceof LevelIdentifier)) {
                return false;
            }
            LevelIdentifier that = (LevelIdentifier)o;
            return Objects.equals(this.partition, that.partition) && this.bucket == that.bucket && this.level == that.level;
        }

        public int hashCode() {
            return Objects.hash(this.partition, this.bucket, this.level);
        }
    }
}

