/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataOutputSerializer;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.lookup.LookupStoreWriter;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
import org.apache.paimon.shade.guava30.com.google.common.cache.RemovalNotification;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IOFunction;

public class LookupLevels
implements Levels.DropFileCallback,
Closeable {
    private final Levels levels;
    private final Comparator<InternalRow> keyComparator;
    private final RowCompactedSerializer keySerializer;
    private final RowCompactedSerializer valueSerializer;
    private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
    private final Supplier<File> localFileFactory;
    private final LookupStoreFactory lookupStoreFactory;
    private final Cache<String, LookupFile> lookupFiles;

    public LookupLevels(Levels levels, Comparator<InternalRow> keyComparator, RowType keyType, RowType valueType, IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory, Supplier<File> localFileFactory, LookupStoreFactory lookupStoreFactory, Duration fileRetention, MemorySize maxDiskSize) {
        this.levels = levels;
        this.keyComparator = keyComparator;
        this.keySerializer = new RowCompactedSerializer(keyType);
        this.valueSerializer = new RowCompactedSerializer(valueType);
        this.fileReaderFactory = fileReaderFactory;
        this.localFileFactory = localFileFactory;
        this.lookupStoreFactory = lookupStoreFactory;
        this.lookupFiles = CacheBuilder.newBuilder().expireAfterAccess(fileRetention).maximumWeight(maxDiskSize.getKibiBytes()).weigher(this::fileWeigh).removalListener(this::removalCallback).build();
        levels.addDropFileCallback(this);
    }

    @VisibleForTesting
    Cache<String, LookupFile> lookupFiles() {
        return this.lookupFiles;
    }

    @Override
    public void notifyDropFile(String file) {
        this.lookupFiles.invalidate(file);
    }

    @Nullable
    public KeyValue lookup(InternalRow key, int startLevel) throws IOException {
        SortedRun level;
        if (startLevel == 0) {
            throw new IllegalArgumentException("Start level can not be zero.");
        }
        KeyValue kv = null;
        for (int i = startLevel; i < this.levels.numberOfLevels() && (kv = this.lookup(key, level = this.levels.runOfLevel(i))) == null; ++i) {
        }
        return kv;
    }

    @Nullable
    private KeyValue lookup(InternalRow target, SortedRun level) throws IOException {
        List<DataFileMeta> files = level.files();
        int left = 0;
        int right = files.size() - 1;
        while (left < right) {
            int mid = (left + right) / 2;
            if (this.keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
                left = mid + 1;
                continue;
            }
            right = mid;
        }
        int index = right;
        if (index == files.size() - 1 && this.keyComparator.compare(files.get(index).maxKey(), target) < 0) {
            ++index;
        }
        return index < files.size() ? this.lookup(target, files.get(index)) : null;
    }

    @Nullable
    private KeyValue lookup(InternalRow key, DataFileMeta file) throws IOException {
        LookupFile lookupFile;
        try {
            lookupFile = this.lookupFiles.get(file.fileName(), () -> this.createLookupFile(file));
        }
        catch (ExecutionException e) {
            throw new IOException(e);
        }
        byte[] keyBytes = this.keySerializer.serializeToBytes(key);
        byte[] valueBytes = lookupFile.get(keyBytes);
        if (valueBytes == null) {
            return null;
        }
        InternalRow value = this.valueSerializer.deserialize(valueBytes);
        long sequenceNumber = MemorySegment.wrap(valueBytes).getLong(valueBytes.length - 9);
        RowKind rowKind = RowKind.fromByteValue(valueBytes[valueBytes.length - 1]);
        return new KeyValue().replace(key, sequenceNumber, rowKind, value).setLevel(lookupFile.remoteFile().level());
    }

    private int fileWeigh(String file, LookupFile lookupFile) {
        return lookupFile.fileKibiBytes();
    }

    private void removalCallback(RemovalNotification<String, LookupFile> notification) {
        LookupFile reader = (LookupFile)notification.getValue();
        if (reader != null) {
            try {
                reader.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    private LookupFile createLookupFile(DataFileMeta file) throws IOException {
        File localFile = this.localFileFactory.get();
        if (!localFile.createNewFile()) {
            throw new IOException("Can not create new file: " + localFile);
        }
        try (LookupStoreWriter kvWriter = this.lookupStoreFactory.createWriter(localFile);
             RecordReader<KeyValue> reader = this.fileReaderFactory.apply(file);){
            RecordReader.RecordIterator<KeyValue> batch;
            DataOutputSerializer valueOut = new DataOutputSerializer(32);
            while ((batch = reader.readBatch()) != null) {
                KeyValue kv;
                while ((kv = batch.next()) != null) {
                    byte[] keyBytes = this.keySerializer.serializeToBytes(kv.key());
                    valueOut.clear();
                    valueOut.write(this.valueSerializer.serializeToBytes(kv.value()));
                    valueOut.writeLong(kv.sequenceNumber());
                    valueOut.writeByte(kv.valueKind().toByteValue());
                    byte[] valueBytes = valueOut.getCopyOfBuffer();
                    kvWriter.put(keyBytes, valueBytes);
                }
                batch.releaseBatch();
            }
        }
        catch (IOException e) {
            FileIOUtils.deleteFileOrDirectory(localFile);
            throw e;
        }
        return new LookupFile(localFile, file, this.lookupStoreFactory.createReader(localFile));
    }

    @Override
    public void close() throws IOException {
        this.lookupFiles.invalidateAll();
    }

    private static class LookupFile
    implements Closeable {
        private final File localFile;
        private final DataFileMeta remoteFile;
        private final LookupStoreReader reader;

        public LookupFile(File localFile, DataFileMeta remoteFile, LookupStoreReader reader) {
            this.localFile = localFile;
            this.remoteFile = remoteFile;
            this.reader = reader;
        }

        @Nullable
        public byte[] get(byte[] key) throws IOException {
            return this.reader.lookup(key);
        }

        public int fileKibiBytes() {
            long kibiBytes = this.localFile.length() >> 10;
            if (kibiBytes > Integer.MAX_VALUE) {
                throw new RuntimeException("Lookup file is too big: " + MemorySize.ofKibiBytes(kibiBytes));
            }
            return (int)kibiBytes;
        }

        public DataFileMeta remoteFile() {
            return this.remoteFile;
        }

        @Override
        public void close() throws IOException {
            this.reader.close();
            FileIOUtils.deleteFileOrDirectory(this.localFile);
        }
    }
}

