/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util.collection;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.DiskMap;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.LazyFileIterable;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public final class BitCaskDiskMap<T extends Serializable, R extends Serializable>
extends DiskMap<T, R> {
    public static final int BUFFER_SIZE = 131072;
    private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
    private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF = ThreadLocal.withInitial(CompressionHandler::new);
    private final Map<T, ValueMetadata> valueMetadataMap;
    private final boolean isCompressionEnabled;
    private final File writeOnlyFile;
    private final SizeAwareDataOutputStream writeOnlyFileHandle;
    private final FileOutputStream fileOutputStream;
    private final AtomicLong filePosition;
    private final String filePath;
    private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal();
    private final Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<BufferedRandomAccessFile>();
    private final List<ClosableIterator<R>> iterators = new ArrayList<ClosableIterator<R>>();

    public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException {
        super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name());
        this.valueMetadataMap = new ConcurrentHashMap<T, ValueMetadata>();
        this.isCompressionEnabled = isCompressionEnabled;
        this.writeOnlyFile = new File(this.diskMapPath, UUID.randomUUID().toString());
        this.filePath = this.writeOnlyFile.getPath();
        this.initFile(this.writeOnlyFile);
        this.fileOutputStream = new FileOutputStream(this.writeOnlyFile, true);
        this.writeOnlyFileHandle = new SizeAwareDataOutputStream(this.fileOutputStream, 131072);
        this.filePosition = new AtomicLong(0L);
    }

    public BitCaskDiskMap(String baseFilePath) throws IOException {
        this(baseFilePath, false);
    }

    private BufferedRandomAccessFile getRandomAccessFile() {
        try {
            BufferedRandomAccessFile readHandle = this.randomAccessFile.get();
            if (readHandle == null) {
                readHandle = new BufferedRandomAccessFile(this.filePath, "r", 131072);
                readHandle.seek(0L);
                this.randomAccessFile.set(readHandle);
                this.openedAccessFiles.offer(readHandle);
            }
            return readHandle;
        }
        catch (IOException ioe) {
            throw new HoodieException(ioe);
        }
    }

    private void initFile(File writeOnlyFile) throws IOException {
        if (writeOnlyFile.exists()) {
            writeOnlyFile.delete();
        }
        if (!writeOnlyFile.getParentFile().exists()) {
            writeOnlyFile.getParentFile().mkdir();
        }
        writeOnlyFile.createNewFile();
        LOG.debug((Object)("Spilling to file location " + writeOnlyFile.getAbsolutePath()));
        writeOnlyFile.deleteOnExit();
    }

    private void flushToDisk() {
        try {
            this.writeOnlyFileHandle.flush();
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to flush to BitCaskDiskMap file", e);
        }
    }

    @Override
    public Iterator<R> iterator() {
        Iterator iterator2 = new LazyFileIterable(this.filePath, this.valueMetadataMap, this.isCompressionEnabled).iterator();
        this.iterators.add((ClosableIterator<R>)iterator2);
        return iterator2;
    }

    @Override
    public long sizeOfFileOnDiskInBytes() {
        return this.filePosition.get();
    }

    @Override
    public int size() {
        return this.valueMetadataMap.size();
    }

    @Override
    public boolean isEmpty() {
        return this.valueMetadataMap.isEmpty();
    }

    @Override
    public boolean containsKey(Object key) {
        return this.valueMetadataMap.containsKey(key);
    }

    @Override
    public boolean containsValue(Object value) {
        throw new HoodieNotSupportedException("unable to compare values in map");
    }

    @Override
    public R get(Object key) {
        ValueMetadata entry = this.valueMetadataMap.get(key);
        if (entry == null) {
            return null;
        }
        return this.get(entry);
    }

    private R get(ValueMetadata entry) {
        return (R)((Serializable)BitCaskDiskMap.get(entry, this.getRandomAccessFile(), this.isCompressionEnabled));
    }

    public static <R> R get(ValueMetadata entry, RandomAccessFile file, boolean isCompressionEnabled) {
        try {
            byte[] bytesFromDisk = SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue());
            if (isCompressionEnabled) {
                return (R)SerializationUtils.deserialize(BitCaskDiskMap.DISK_COMPRESSION_REF.get().decompressBytes(bytesFromDisk));
            }
            return (R)SerializationUtils.deserialize(bytesFromDisk);
        }
        catch (IOException e) {
            throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
        }
    }

    private synchronized R put(T key, R value, boolean flush) {
        try {
            byte[] val = this.isCompressionEnabled ? BitCaskDiskMap.DISK_COMPRESSION_REF.get().compressBytes(SerializationUtils.serialize(value)) : SerializationUtils.serialize(value);
            Integer valueSize = val.length;
            Long timestamp = System.currentTimeMillis();
            this.valueMetadataMap.put(key, new ValueMetadata(this.filePath, valueSize, this.filePosition.get(), timestamp));
            byte[] serializedKey = SerializationUtils.serialize(key);
            this.filePosition.set(SpillableMapUtils.spillToDisk(this.writeOnlyFileHandle, new FileEntry(BinaryUtil.generateChecksum(val), serializedKey.length, valueSize, serializedKey, val, timestamp)));
            if (flush) {
                this.flushToDisk();
            }
        }
        catch (IOException io) {
            throw new HoodieIOException("Unable to store data in Disk Based map", io);
        }
        return value;
    }

    @Override
    public R put(T key, R value) {
        return this.put(key, value, true);
    }

    @Override
    public R remove(Object key) {
        Object value = this.get(key);
        this.valueMetadataMap.remove(key);
        return (R)value;
    }

    @Override
    public void putAll(Map<? extends T, ? extends R> m) {
        for (Map.Entry<T, R> entry : m.entrySet()) {
            this.put((Serializable)entry.getKey(), (Serializable)entry.getValue(), false);
        }
        this.flushToDisk();
    }

    @Override
    public void clear() {
        this.valueMetadataMap.clear();
    }

    @Override
    public void close() {
        this.valueMetadataMap.clear();
        try {
            if (this.writeOnlyFileHandle != null) {
                this.writeOnlyFileHandle.flush();
                this.fileOutputStream.getChannel().force(false);
                this.writeOnlyFileHandle.close();
            }
            while (!this.openedAccessFiles.isEmpty()) {
                BufferedRandomAccessFile file = this.openedAccessFiles.poll();
                if (null == file) continue;
                try {
                    file.close();
                }
                catch (IOException iOException) {}
            }
            this.writeOnlyFile.delete();
            this.iterators.forEach(ClosableIterator::close);
        }
        catch (Exception e) {
            this.writeOnlyFile.delete();
        }
        finally {
            super.close();
        }
    }

    @Override
    public Set<T> keySet() {
        return this.valueMetadataMap.keySet();
    }

    @Override
    public Collection<R> values() {
        throw new HoodieException("Unsupported Operation Exception");
    }

    @Override
    public Stream<R> valueStream() {
        BufferedRandomAccessFile file = this.getRandomAccessFile();
        return ((Stream)this.valueMetadataMap.values().stream().sorted().sequential()).map(valueMetaData -> (Serializable)BitCaskDiskMap.get(valueMetaData, file, this.isCompressionEnabled));
    }

    @Override
    public Set<Map.Entry<T, R>> entrySet() {
        HashSet<Map.Entry<T, R>> entrySet = new HashSet<Map.Entry<T, R>>();
        for (Serializable key : this.valueMetadataMap.keySet()) {
            entrySet.add(new AbstractMap.SimpleEntry<Serializable, Object>(key, this.get(key)));
        }
        return entrySet;
    }

    private static class CompressionHandler
    implements Serializable {
        private static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 0x100000;
        private static final int DECOMPRESS_INTERMEDIATE_BUFFER_SIZE = 8192;
        private final ByteArrayOutputStream compressBaos = new ByteArrayOutputStream(0x100000);
        private final ByteArrayOutputStream decompressBaos = new ByteArrayOutputStream(0x100000);
        private final byte[] decompressIntermediateBuffer = new byte[8192];

        CompressionHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private byte[] compressBytes(byte[] value) throws IOException {
            this.compressBaos.reset();
            Deflater deflater = new Deflater(9);
            DeflaterOutputStream dos = new DeflaterOutputStream((OutputStream)this.compressBaos, deflater);
            try {
                dos.write(value);
            }
            finally {
                dos.close();
                deflater.end();
            }
            return this.compressBaos.toByteArray();
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        private byte[] decompressBytes(byte[] bytes) throws IOException {
            this.decompressBaos.reset();
            try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes));){
                int len;
                while ((len = ((InputStream)in).read(this.decompressIntermediateBuffer)) > 0) {
                    this.decompressBaos.write(this.decompressIntermediateBuffer, 0, len);
                }
                byte[] byArray = this.decompressBaos.toByteArray();
                return byArray;
            }
            catch (IOException e) {
                throw new HoodieIOException("IOException while decompressing bytes", e);
            }
        }
    }

    public static final class ValueMetadata
    implements Comparable<ValueMetadata> {
        private String filePath;
        private Integer sizeOfValue;
        private Long offsetOfValue;
        private Long timestamp;

        protected ValueMetadata(String filePath, int sizeOfValue, long offsetOfValue, long timestamp) {
            this.filePath = filePath;
            this.sizeOfValue = sizeOfValue;
            this.offsetOfValue = offsetOfValue;
            this.timestamp = timestamp;
        }

        public String getFilePath() {
            return this.filePath;
        }

        public int getSizeOfValue() {
            return this.sizeOfValue;
        }

        public Long getOffsetOfValue() {
            return this.offsetOfValue;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        @Override
        public int compareTo(ValueMetadata o) {
            return Long.compare(this.offsetOfValue, o.offsetOfValue);
        }
    }

    public static final class FileEntry {
        private Long crc;
        private Integer sizeOfKey;
        private Integer sizeOfValue;
        private byte[] key;
        private byte[] value;
        private Long timestamp;

        public FileEntry(long crc, int sizeOfKey, int sizeOfValue, byte[] key, byte[] value, long timestamp) {
            this.crc = crc;
            this.sizeOfKey = sizeOfKey;
            this.sizeOfValue = sizeOfValue;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        public long getCrc() {
            return this.crc;
        }

        public int getSizeOfKey() {
            return this.sizeOfKey;
        }

        public int getSizeOfValue() {
            return this.sizeOfValue;
        }

        public byte[] getKey() {
            return this.key;
        }

        public byte[] getValue() {
            return this.value;
        }

        public long getTimestamp() {
            return this.timestamp;
        }
    }
}

