/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.log;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.BoundedFsDataInputStream;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormatVersion;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodieLogFileReader
implements HoodieLogFormat.Reader {
    /** Default read buffer size in bytes: 0x1000000 = 16 MiB. */
    public static final int DEFAULT_BUFFER_SIZE = 0x1000000;
    /** 0x100000 = 1 MiB; the same size as the scan window allocated in scanForNextAvailableBlockOffset(). */
    private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 0x100000;
    private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
    /** File system the log file is opened on; its scheme is consulted by isBlockCorrupted(). */
    private final FileSystem fs;
    /** Configuration obtained from {@link #fs}; handed to the content location of every block read. */
    private final Configuration hadoopConf;
    /** Stream over the log file; opened in the constructor and closed by close(). */
    private final FSDataInputStream inputStream;
    /** The log file being read, rebuilt in the constructor with a qualified path. */
    private final HoodieLogFile logFile;
    /** 6-byte scratch buffer that hasNextMagic() fills and compares against HoodieLogFormat.MAGIC. */
    private final byte[] magicBuffer = new byte[6];
    /** Reader schema handed to the data blocks created by readBlock(). */
    private final Schema readerSchema;
    /** Never null after construction: a null constructor argument is replaced by the empty internal schema. */
    private InternalSchema internalSchema;
    /** Key field name passed to the Avro, Parquet and CDC data blocks. */
    private final String keyField;
    /** Passed to HoodieLogBlock.tryReadContent and to the block constructors as the lazy-read flag. */
    private boolean readBlockLazily;
    /** Cursor used by hasPrev()/prev()/moveToPrev(); starts at the file size when reverse reading is enabled. */
    private long reverseLogFilePosition;
    /** Companion of {@link #reverseLogFilePosition}; hasPrev() and moveToPrev() start from this value. */
    private long lastReverseLogFilePosition;
    /** Whether reverse reading was requested; prev() and moveToPrev() throw when it is false. */
    private boolean reverseReader;
    /** Passed to the HFile data blocks created by readBlock(). */
    private boolean enableRecordLookups;
    /** Set to true by close() so that repeated close() calls do nothing. */
    private boolean closed = false;
    /** JVM shutdown hook registered by addShutDownHook() and deregistered by close(). */
    private transient Thread shutdownThread = null;

    /**
     * Creates a reader with reverse reading disabled.
     *
     * <p>Delegates to the six-argument constructor with {@code reverseReader = false}.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public HoodieLogFileReader(
            FileSystem fs,
            HoodieLogFile logFile,
            Schema readerSchema,
            int bufferSize,
            boolean readBlockLazily) throws IOException {
        this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
    }

    /**
     * Creates a reader with record lookups disabled and the key field set to
     * {@code HoodieRecord.RECORD_KEY_METADATA_FIELD}.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public HoodieLogFileReader(
            FileSystem fs,
            HoodieLogFile logFile,
            Schema readerSchema,
            int bufferSize,
            boolean readBlockLazily,
            boolean reverseReader) throws IOException {
        this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader,
                false, HoodieRecord.RECORD_KEY_METADATA_FIELD);
    }

    /**
     * Creates a reader that uses the empty internal schema
     * ({@code InternalSchema.getEmptyInternalSchema()}).
     *
     * @throws IOException propagated from the delegated constructor
     */
    public HoodieLogFileReader(
            FileSystem fs,
            HoodieLogFile logFile,
            Schema readerSchema,
            int bufferSize,
            boolean readBlockLazily,
            boolean reverseReader,
            boolean enableRecordLookups,
            String keyField) throws IOException {
        this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader,
                enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
    }

    /**
     * Primary constructor: qualifies the log file path, opens the input stream and registers a
     * JVM shutdown hook that closes this reader.
     *
     * @param fs                  file system used to qualify the path and open the file
     * @param logFile             log file to read; only its path and file size are used
     * @param readerSchema        schema handed to the data blocks that are read
     * @param bufferSize          buffer size passed when opening the stream
     * @param readBlockLazily     lazy-read flag forwarded to the blocks
     * @param reverseReader       when true, the reverse cursors start at the file size
     * @param enableRecordLookups forwarded to HFile data blocks
     * @param keyField            key field name forwarded to data blocks
     * @param internalSchema      internal schema; {@code null} is replaced by the empty internal schema
     * @throws IOException if opening the stream fails
     */
    public HoodieLogFileReader(
            FileSystem fs,
            HoodieLogFile logFile,
            Schema readerSchema,
            int bufferSize,
            boolean readBlockLazily,
            boolean reverseReader,
            boolean enableRecordLookups,
            String keyField,
            InternalSchema internalSchema) throws IOException {
        this.fs = fs;
        this.hadoopConf = fs.getConf();
        this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
        this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
        this.readerSchema = readerSchema;
        this.readBlockLazily = readBlockLazily;
        this.reverseReader = reverseReader;
        this.enableRecordLookups = enableRecordLookups;
        this.keyField = keyField;
        if (internalSchema == null) {
            this.internalSchema = InternalSchema.getEmptyInternalSchema();
        } else {
            this.internalSchema = internalSchema;
        }
        if (this.reverseReader) {
            // Reverse iteration starts from the end of the file.
            long fileSize = this.logFile.getFileSize();
            this.lastReverseLogFilePosition = fileSize;
            this.reverseLogFilePosition = fileSize;
        }
        addShutDownHook();
    }

    /**
     * @return the log file this reader operates on, as rebuilt by the constructor
     */
    @Override
    public HoodieLogFile getLogFile() {
        return logFile;
    }

    /**
     * Registers a JVM shutdown hook that closes this reader, and remembers the hook thread in
     * {@code shutdownThread} so that close() can deregister it.
     */
    private void addShutDownHook() {
        Thread hook = new Thread(this::closeFromShutdownHook);
        this.shutdownThread = hook;
        Runtime.getRuntime().addShutdownHook(hook);
    }

    /** Body of the shutdown hook: closes the reader and logs, rather than propagates, any failure. */
    private void closeFromShutdownHook() {
        try {
            close();
        } catch (Exception e) {
            LOG.warn("unable to close input stream for log file " + this.logFile, e);
        }
    }

    /**
     * Reads one log block starting at the current stream position.
     *
     * <p>The fields are consumed in this order: block size (long), format version (int), block
     * type (int, only when the version is not 0), header metadata (when the version has a header),
     * content length (long, only when the version is not 0), content, footer metadata (when the
     * version has a footer) and the trailing block length (long, when the version has one).
     *
     * <p>If the block size cannot be read, or {@code isBlockCorrupted} reports corruption, a
     * corrupt block is returned instead.
     *
     * <p>NOTE(review): {@code tryReadBlockType} returns {@code null} for version 0, so
     * {@code Objects.requireNonNull} below throws before the version-0 branch under
     * {@code AVRO_DATA_BLOCK} can run - confirm whether version-0 blocks are still expected.
     *
     * @return the block that was read, or a corrupt block
     * @throws IOException on read failures other than the handled end-of-file case
     */
    private HoodieLogBlock readBlock() throws IOException {
        long blockStartPos = this.inputStream.getPos();

        int blockSize;
        try {
            blockSize = (int) this.inputStream.readLong();
        } catch (EOFException | CorruptedLogFileException e) {
            return createCorruptBlock(blockStartPos);
        }
        if (isBlockCorrupted(blockSize)) {
            return createCorruptBlock(blockStartPos);
        }

        HoodieLogFormat.LogFormatVersion version = readVersion();
        HoodieLogBlock.HoodieLogBlockType blockType = tryReadBlockType(version);

        Map<HoodieLogBlock.HeaderMetadataType, String> header = null;
        if (version.hasHeader()) {
            header = HoodieLogBlock.getLogMetadata((DataInputStream) this.inputStream);
        }

        // Version 0 carries no separate content length; the block size is used instead.
        int contentLength;
        if (version.getVersion() != 0) {
            contentLength = (int) this.inputStream.readLong();
        } else {
            contentLength = blockSize;
        }
        long contentPosition = this.inputStream.getPos();
        boolean lazyRead = this.readBlockLazily && version.getVersion() != 0;
        Option<byte[]> content = HoodieLogBlock.tryReadContent(this.inputStream, contentLength, lazyRead);

        Map<HoodieLogBlock.HeaderMetadataType, String> footer = null;
        if (version.hasFooter()) {
            footer = HoodieLogBlock.getLogMetadata((DataInputStream) this.inputStream);
        }

        if (version.hasLogBlockLength()) {
            // The trailing block length is consumed but its value is not used here.
            this.inputStream.readLong();
        }
        long blockEndPos = this.inputStream.getPos();

        HoodieLogBlock.HoodieLogBlockContentLocation contentLocation =
                new HoodieLogBlock.HoodieLogBlockContentLocation(
                        this.hadoopConf, this.logFile, contentPosition, contentLength, blockEndPos);

        switch (Objects.requireNonNull(blockType)) {
            case AVRO_DATA_BLOCK:
                if (version.getVersion() == 0) {
                    return HoodieAvroDataBlock.getBlock(content.get(), this.readerSchema, this.internalSchema);
                }
                return new HoodieAvroDataBlock(this.inputStream, content, this.readBlockLazily, contentLocation,
                        getTargetReaderSchemaForBlock(), header, footer, this.keyField);
            case HFILE_DATA_BLOCK:
                ValidationUtils.checkState(version.getVersion() != 0,
                        String.format("HFile block could not be of version (%d)", 0));
                return new HoodieHFileDataBlock(this.inputStream, content, this.readBlockLazily, contentLocation,
                        Option.ofNullable(this.readerSchema), header, footer, this.enableRecordLookups,
                        this.logFile.getPath());
            case PARQUET_DATA_BLOCK:
                ValidationUtils.checkState(version.getVersion() != 0,
                        String.format("Parquet block could not be of version (%d)", 0));
                return new HoodieParquetDataBlock(this.inputStream, content, this.readBlockLazily, contentLocation,
                        getTargetReaderSchemaForBlock(), header, footer, this.keyField);
            case DELETE_BLOCK:
                return new HoodieDeleteBlock(content, this.inputStream, this.readBlockLazily,
                        Option.of(contentLocation), header, footer);
            case COMMAND_BLOCK:
                return new HoodieCommandBlock(content, this.inputStream, this.readBlockLazily,
                        Option.of(contentLocation), header, footer);
            case CDC_DATA_BLOCK:
                return new HoodieCDCDataBlock(this.inputStream, content, this.readBlockLazily, contentLocation,
                        this.readerSchema, header, this.keyField);
            default:
                throw new HoodieNotSupportedException("Unsupported Block " + blockType);
        }
    }

    /**
     * Chooses the reader schema handed to Avro and Parquet data blocks.
     *
     * @return the (possibly null) reader schema wrapped in an Option when the internal schema is
     *         empty; otherwise an empty Option
     */
    private Option<Schema> getTargetReaderSchemaForBlock() {
        return this.internalSchema.isEmptySchema()
                ? Option.ofNullable(this.readerSchema)
                : Option.empty();
    }

    /**
     * Reads the block type ordinal from the stream and maps it to a block type.
     *
     * <p>Nothing is read for version 0, for which {@code null} is returned.
     *
     * <p>The ordinal is validated against both bounds. A negative value (possible when the bytes
     * are corrupt) would otherwise pass the upper-bound check and fail on the array access with
     * an {@code ArrayIndexOutOfBoundsException} instead of the descriptive validation error.
     *
     * @param blockVersion format version of the block being read
     * @return the block type, or {@code null} when the version is 0
     * @throws IOException if the ordinal cannot be read
     */
    @Nullable
    private HoodieLogBlock.HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blockVersion) throws IOException {
        if (blockVersion.getVersion() == 0) {
            return null;
        }
        int type = this.inputStream.readInt();
        // values() returns a fresh array on every call, so fetch it once.
        HoodieLogBlock.HoodieLogBlockType[] knownTypes = HoodieLogBlock.HoodieLogBlockType.values();
        ValidationUtils.checkArgument(type >= 0 && type < knownTypes.length, "Invalid block byte type found " + type);
        return knownTypes[type];
    }

    /**
     * Builds a corrupt block covering the bytes from {@code blockStartPos} up to the offset
     * returned by {@code scanForNextAvailableBlockOffset()}.
     *
     * <p>The stream is rewound to {@code blockStartPos} both before the scan and before the
     * corrupted bytes are read.
     *
     * @param blockStartPos stream position where the corrupted block starts
     * @return a corrupt block with empty header and footer maps
     * @throws IOException on seek or read failure
     */
    private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
        LOG.info("Log " + this.logFile + " has a corrupted block at " + blockStartPos);

        this.inputStream.seek(blockStartPos);
        long nextBlockOffset = scanForNextAvailableBlockOffset();

        // Rewind so the corrupted bytes themselves can be read.
        this.inputStream.seek(blockStartPos);
        LOG.info("Next available block in " + this.logFile + " starts at " + nextBlockOffset);

        int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
        long contentPosition = this.inputStream.getPos();
        Option<byte[]> corruptedBytes =
                HoodieLogBlock.tryReadContent(this.inputStream, corruptedBlockSize, this.readBlockLazily);
        HoodieLogBlock.HoodieLogBlockContentLocation contentLocation =
                new HoodieLogBlock.HoodieLogBlockContentLocation(
                        this.hadoopConf, this.logFile, contentPosition, corruptedBlockSize, nextBlockOffset);

        Map<HoodieLogBlock.HeaderMetadataType, String> emptyHeader = new HashMap<>();
        Map<HoodieLogBlock.HeaderMetadataType, String> emptyFooter = new HashMap<>();
        return new HoodieCorruptBlock(corruptedBytes, this.inputStream, this.readBlockLazily,
                Option.of(contentLocation), emptyHeader, emptyFooter);
    }

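    /*
     * WARNING (CFR decompiler output): a try block that caught its own exception was removed
     * from the method below during decompilation, so its behaviour may differ from the
     * original source.
     */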
    /**
     * Checks whether the block whose size was just read from the header looks corrupted.
     *
     * <p>The check is skipped (returns {@code false}) when
     * {@code StorageSchemes.isWriteTransactional} is true for the file system scheme. Otherwise
     * the block is reported corrupted when any of the following holds:
     * <ul>
     *   <li>reading the trailing size at {@code currentPos + blocksize - 8} runs past end of file;</li>
     *   <li>the trailing size minus the magic length differs from {@code blocksize};</li>
     *   <li>{@code readMagic()} throws {@code CorruptedLogFileException} right after the trailing size.</li>
     * </ul>
     * On each of those paths the stream is moved back to the position it had on entry.
     *
     * @param blocksize block size read from the block header
     * @return {@code true} if the block is considered corrupted
     * @throws IOException on seek or read failures other than the handled end-of-file case
     */
    private boolean isBlockCorrupted(int blocksize) throws IOException {
        if (StorageSchemes.isWriteTransactional(this.fs.getScheme())) {
            return false;
        }

        long resumePos = this.inputStream.getPos();
        long sizeFromFooter;
        try {
            this.inputStream.seek(resumePos + (long) blocksize - 8L);
            sizeFromFooter = this.inputStream.readLong() - (long) this.magicBuffer.length;
        } catch (EOFException e) {
            LOG.info("Found corrupted block in file " + this.logFile + " with block size(" + blocksize + ") running past EOF");
            this.inputStream.seek(resumePos);
            return true;
        }

        if ((long) blocksize != sizeFromFooter) {
            LOG.info("Found corrupted block in file " + this.logFile + ". Header block size(" + blocksize + ") did not match the footer block size(" + sizeFromFooter + ")");
            this.inputStream.seek(resumePos);
            return true;
        }

        try {
            // Either the next magic marker or end of file must follow the trailing size.
            readMagic();
            return false;
        } catch (CorruptedLogFileException e) {
            LOG.info("Found corrupted block in file " + this.logFile + ". No magic hash found right after footer block size entry");
            return true;
        } finally {
            this.inputStream.seek(resumePos);
        }
    }

    /**
     * Scans forward from the current stream position for the next occurrence of
     * {@code HoodieLogFormat.MAGIC}.
     *
     * <p>The file is read in windows of 0x100000 bytes (the same value as
     * {@code BLOCK_SCAN_READ_BUFFER_SIZE}). Consecutive windows overlap by the magic length, so
     * a marker that straddles a window boundary is still found.
     *
     * @return the absolute offset of the next magic marker, or the stream position reached at
     *         end of file when no marker is found
     * @throws IOException on read or seek failures other than the handled end-of-file case
     */
    private long scanForNextAvailableBlockOffset() throws IOException {
        byte[] window = new byte[0x100000];
        for (;;) {
            long windowStart = this.inputStream.getPos();
            boolean reachedEof = false;
            try {
                // Zero the window first so bytes left over from a previous read cannot match.
                Arrays.fill(window, (byte) 0);
                this.inputStream.readFully(window, 0, window.length);
            } catch (EOFException e) {
                reachedEof = true;
            }

            long magicIndex = Bytes.indexOf(window, HoodieLogFormat.MAGIC);
            if (magicIndex >= 0L) {
                return windowStart + magicIndex;
            }
            if (reachedEof) {
                return this.inputStream.getPos();
            }
            this.inputStream.seek(windowStart + (long) window.length - (long) HoodieLogFormat.MAGIC.length);
        }
    }

    /**
     * Closes the underlying stream and deregisters the JVM shutdown hook. Calls after the first
     * one do nothing.
     *
     * <p>The reader is marked closed before the stream is closed, and the hook is removed in a
     * {@code finally} block. A failing {@code inputStream.close()} therefore no longer leaves the
     * hook registered, which would keep this reader reachable until the JVM exits.
     *
     * <p>When this method runs from the shutdown hook itself,
     * {@code Runtime.removeShutdownHook} throws {@code IllegalStateException}. That case is
     * ignored here: the stream is already closed and the hook is running anyway.
     *
     * @throws IOException if closing the underlying stream fails
     */
    @Override
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            this.inputStream.close();
        } finally {
            if (this.shutdownThread != null) {
                try {
                    Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
                } catch (IllegalStateException ignored) {
                    // JVM shutdown is already in progress; the hook cannot and need not be removed.
                }
            }
        }
    }

    /**
     * Reports whether another block follows by reading the magic marker at the current position.
     *
     * @return the result of {@code readMagic()}
     * @throws HoodieIOException wrapping any IOException raised while reading
     */
    @Override
    public boolean hasNext() {
        boolean magicFound;
        try {
            magicFound = readMagic();
        } catch (IOException e) {
            throw new HoodieIOException("IOException when reading logfile " + this.logFile, e);
        }
        return magicFound;
    }

    /**
     * Reads the next int from the stream and wraps it in a {@code HoodieLogFormatVersion}.
     *
     * @throws IOException if the int cannot be read
     */
    private HoodieLogFormat.LogFormatVersion readVersion() throws IOException {
        int versionNumber = this.inputStream.readInt();
        return new HoodieLogFormatVersion(versionNumber);
    }

    /**
     * Consumes the magic marker at the current stream position.
     *
     * @return {@code true} when the marker was read and matches; {@code false} when end of file
     *         was hit while reading it
     * @throws CorruptedLogFileException when bytes were read but do not match the marker
     * @throws IOException on read failures other than end of file
     */
    private boolean readMagic() throws IOException {
        boolean magicMatches;
        try {
            magicMatches = hasNextMagic();
        } catch (EOFException e) {
            return false;
        }
        if (!magicMatches) {
            throw new CorruptedLogFileException(this.logFile + " could not be read. Did not find the magic bytes at the start of the block");
        }
        return true;
    }

    /**
     * Reads the next {@code magicBuffer.length} (6) bytes and compares them with
     * {@code HoodieLogFormat.MAGIC}.
     *
     * @return {@code true} when the bytes equal the magic marker
     * @throws IOException if the bytes cannot be read fully
     */
    private boolean hasNextMagic() throws IOException {
        byte[] candidate = this.magicBuffer;
        this.inputStream.readFully(candidate, 0, candidate.length);
        return Arrays.equals(candidate, HoodieLogFormat.MAGIC);
    }

    /**
     * Reads and returns the block at the current stream position.
     *
     * @return the block produced by {@code readBlock()}
     * @throws HoodieIOException wrapping any IOException raised while reading
     */
    @Override
    public HoodieLogBlock next() {
        HoodieLogBlock block;
        try {
            block = readBlock();
        } catch (IOException io) {
            throw new HoodieIOException("IOException when reading logblock from log file " + this.logFile, io);
        }
        return block;
    }

    /**
     * Moves the reverse cursor back by one long (8 bytes) and seeks the stream there, so that
     * {@code prev()} can read a block size from that position.
     *
     * <p>Returns {@code false} when reverse reading is not enabled. The previous code threw a
     * {@code HoodieNotSupportedException} for this case inside its own catch-all, so callers
     * always received {@code false}; the explicit guard keeps that result without using an
     * exception for control flow.
     *
     * <p>Each call moves the cursor, including a call whose seek fails.
     *
     * @return {@code true} if the seek succeeded; {@code false} if reverse reading is disabled
     *         or the seek failed
     */
    @Override
    public boolean hasPrev() {
        if (!this.reverseReader) {
            return false;
        }
        long footerPos = this.lastReverseLogFilePosition - Long.BYTES;
        this.reverseLogFilePosition = footerPos;
        this.lastReverseLogFilePosition = footerPos;
        try {
            this.inputStream.seek(footerPos);
        } catch (Exception ignored) {
            // Deliberate best effort: any seek failure is reported as "no previous block".
            return false;
        }
        return true;
    }

    /**
     * Reads the block that ends at the reverse cursor.
     *
     * <p>A block size is read at the current stream position, then the stream seeks to
     * {@code reverseLogFilePosition - blockSize}. The magic marker is consumed via
     * {@code hasNext()}, both reverse cursors are moved to that offset, and the block is read
     * with {@code next()}.
     *
     * <p>If the seek fails, the stream is moved back to where it was after reading the size and
     * a {@code CorruptedLogFileException} is thrown. The seek failure is attached to it as a
     * suppressed exception so the underlying reason is not lost.
     *
     * @return the previous block
     * @throws HoodieNotSupportedException if reverse reading was not enabled
     * @throws CorruptedLogFileException if the start of the previous block cannot be reached
     * @throws IOException on other read or seek failures
     */
    @Override
    public HoodieLogBlock prev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        long blockSize = this.inputStream.readLong();
        long blockEndPos = this.inputStream.getPos();
        try {
            this.inputStream.seek(this.reverseLogFilePosition - blockSize);
        } catch (Exception e) {
            this.inputStream.seek(blockEndPos);
            CorruptedLogFileException corrupted = new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, fallback to forward reading of logfile");
            // Only the message constructor is used in this file, so keep the reason as suppressed.
            corrupted.addSuppressed(e);
            throw corrupted;
        }
        // Called only to consume the magic marker; the result is intentionally not used.
        this.hasNext();
        this.reverseLogFilePosition -= blockSize;
        this.lastReverseLogFilePosition = this.reverseLogFilePosition;
        return this.next();
    }

    /**
     * Moves the reverse cursors back over one block without reading the block.
     *
     * <p>A block size is read at {@code lastReverseLogFilePosition}; the stream then seeks to
     * {@code reverseLogFilePosition - blockSize}, and both cursors are set to that offset.
     *
     * @return the new reverse cursor position
     * @throws HoodieNotSupportedException if reverse reading was not enabled
     * @throws IOException on read or seek failure
     */
    public long moveToPrev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        this.inputStream.seek(this.lastReverseLogFilePosition);
        long blockSize = this.inputStream.readLong();
        long target = this.reverseLogFilePosition - blockSize;
        this.inputStream.seek(target);
        this.reverseLogFilePosition = target;
        this.lastReverseLogFilePosition = target;
        return target;
    }

    /**
     * Not supported by this reader.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        String reason = "Remove not supported for HoodieLogFileReader";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Opens the log file and wraps the resulting stream according to the file system.
     *
     * <ul>
     *   <li>GCS: the result of {@code getFSDataInputStreamForGCS} wrapped in a
     *       {@code SchemeAwareFSDataInputStream}.</li>
     *   <li>CHD: a {@code BoundedFsDataInputStream} over the opened stream.</li>
     *   <li>Otherwise, when the wrapped stream is an {@code FSInputStream}: a
     *       {@code TimedFSDataInputStream} over a {@code BufferedFSInputStream}.</li>
     *   <li>Otherwise: the opened stream as is.</li>
     * </ul>
     *
     * @param fs         file system to open the file on
     * @param logFile    log file whose path is opened
     * @param bufferSize buffer size used for opening and for the buffered wrapper
     * @throws IOException if the file cannot be opened
     */
    private static FSDataInputStream getFSDataInputStream(FileSystem fs, HoodieLogFile logFile, int bufferSize) throws IOException {
        FSDataInputStream opened = fs.open(logFile.getPath(), bufferSize);

        if (FSUtils.isGCSFileSystem(fs)) {
            FSDataInputStream gcsStream = getFSDataInputStreamForGCS(opened, logFile, bufferSize);
            return new SchemeAwareFSDataInputStream((InputStream) gcsStream, true);
        }
        if (FSUtils.isCHDFileSystem(fs)) {
            return new BoundedFsDataInputStream(fs, logFile.getPath(), (InputStream) opened);
        }

        InputStream wrapped = opened.getWrappedStream();
        if (!(wrapped instanceof FSInputStream)) {
            return opened;
        }
        return new TimedFSDataInputStream(
                logFile.getPath(),
                new FSDataInputStream((InputStream) new BufferedFSInputStream((FSInputStream) wrapped, bufferSize)));
    }

    /**
     * Adds buffering and timing around a stream opened on GCS.
     *
     * <p>The underlying {@code FSInputStream} is looked for directly inside
     * {@code fsDataInputStream}, or one level deeper when the wrapped stream is itself an
     * {@code FSDataInputStream}. If one is found it is wrapped in a {@code BufferedFSInputStream}
     * and a {@code TimedFSDataInputStream}; otherwise the argument is returned unchanged.
     *
     * @param fsDataInputStream stream returned by opening the log file
     * @param logFile           log file whose path is given to the timed wrapper
     * @param bufferSize        buffer size for the buffered wrapper
     */
    private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, HoodieLogFile logFile, int bufferSize) {
        InputStream wrapped = fsDataInputStream.getWrappedStream();

        FSInputStream underlying = null;
        if (wrapped instanceof FSInputStream) {
            underlying = (FSInputStream) wrapped;
        } else if (wrapped instanceof FSDataInputStream) {
            InputStream inner = ((FSDataInputStream) wrapped).getWrappedStream();
            if (inner instanceof FSInputStream) {
                underlying = (FSInputStream) inner;
            }
        }

        if (underlying == null) {
            return fsDataInputStream;
        }
        return new TimedFSDataInputStream(
                logFile.getPath(),
                new FSDataInputStream((InputStream) new BufferedFSInputStream(underlying, bufferSize)));
    }
}

