/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.system.hdfs.reader;

import java.io.IOException;
import java.net.URI;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.hdfs.reader.SingleFileHdfsReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SingleFileHdfsReader} that reads {@link GenericRecord}s from a single Avro
 * container file through an {@link AvroFSInput} wrapped in a {@link DataFileReader}.
 *
 * <p>Positions in the file are expressed as checkpoint strings of the form
 * {@code blockStart@recordOffset} (see {@link AvroFileCheckpoint}): the sync position
 * reported by the underlying reader plus the number of records already consumed
 * after that position.
 *
 * <p>The reader keeps mutable position state and performs no synchronization of its own.
 */
public class AvroFileHdfsReader implements SingleFileHdfsReader {
    private static final Logger LOG = LoggerFactory.getLogger(AvroFileHdfsReader.class);
    private final SystemStreamPartition systemStreamPartition;
    /** Underlying Avro reader; {@code null} until {@link #open} succeeds. */
    private DataFileReader<GenericRecord> fileReader;
    /** Sync position that the next record to be returned is counted from. */
    private long curBlockStart;
    /** Number of records already consumed after {@link #curBlockStart}. */
    private long curRecordOffset;

    /**
     * Creates a reader that is not yet attached to any file.
     *
     * @param systemStreamPartition partition attached to every envelope produced by
     *                              {@link #readNext()} and used in log messages
     */
    public AvroFileHdfsReader(SystemStreamPartition systemStreamPartition) {
        this.systemStreamPartition = systemStreamPartition;
        this.fileReader = null;
    }

    /**
     * Opens the Avro file at {@code pathStr} and positions the reader at
     * {@code singleFileOffset}.
     *
     * <p>If positioning fails, the freshly opened reader is closed again and this
     * instance is left without an open file, so a failed call does not leak the stream.
     *
     * @param pathStr          path of the Avro file to read
     * @param singleFileOffset checkpoint string understood by {@link AvroFileCheckpoint}
     * @throws SamzaException if the file cannot be opened, the checkpoint is invalid,
     *                        or seeking fails with an {@link IOException}
     */
    @Override
    public void open(String pathStr, String singleFileOffset) {
        LOG.info(String.format("%s: Open file [%s] with file offset [%s] for read", this.systemStreamPartition, pathStr, singleFileOffset));
        Path path = new Path(pathStr);
        DataFileReader<GenericRecord> reader;
        try {
            reader = AvroFileHdfsReader.newFileReader(path);
        } catch (IOException e) {
            throw new SamzaException(e);
        }
        this.fileReader = reader;
        try {
            this.seek(singleFileOffset);
        } catch (RuntimeException e) {
            // Do not keep (or leak) a reader whose position could not be established.
            this.fileReader = null;
            try {
                reader.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Builds the Avro reader for {@code path}, closing the underlying input again
     * if the reader itself cannot be constructed.
     */
    private static DataFileReader<GenericRecord> newFileReader(Path path) throws IOException {
        AvroFSInput input = new AvroFSInput(FileContext.getFileContext(path.toUri()), path);
        try {
            return new DataFileReader<GenericRecord>(input, new GenericDatumReader<GenericRecord>());
        } catch (IOException | RuntimeException e) {
            try {
                input.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Moves the reader to the position described by {@code singleFileOffset}.
     *
     * <p>A starting checkpoint (block start {@code 0}) syncs to the first sync point
     * found from position 0 and resets the record offset to 0. Any other checkpoint
     * seeks to its block start and then skips {@code recordOffset} records, stopping
     * early if the file runs out of records.
     *
     * @param singleFileOffset checkpoint string understood by {@link AvroFileCheckpoint}
     * @throws SamzaException if the checkpoint is invalid or the underlying reader
     *                        fails with an {@link IOException}
     */
    @Override
    public void seek(String singleFileOffset) {
        try {
            AvroFileCheckpoint checkpoint = new AvroFileCheckpoint(singleFileOffset);
            if (checkpoint.isStartingOffset()) {
                this.fileReader.sync(0L);
                this.curBlockStart = this.fileReader.previousSync();
                this.curRecordOffset = 0L;
                return;
            }
            this.fileReader.seek(checkpoint.getBlockStart());
            // The counter must be a long: the record offset is a long, and an int counter
            // would wrap around and never terminate for offsets above Integer.MAX_VALUE.
            long recordsToSkip = checkpoint.getRecordOffset();
            for (long skipped = 0L; skipped < recordsToSkip && this.fileReader.hasNext(); ++skipped) {
                this.fileReader.next();
            }
            this.curBlockStart = checkpoint.getBlockStart();
            this.curRecordOffset = checkpoint.getRecordOffset();
        } catch (IOException e) {
            throw new SamzaException(e);
        }
    }

    /**
     * Reads the next record and wraps it in an envelope whose offset is the
     * checkpoint of that record (the value of {@link #nextOffset()} before the read).
     *
     * <p>After the read the tracked position is advanced: if the reader's
     * {@code previousSync()} no longer equals the tracked block start, tracking moves
     * to that new sync position with record offset 0; otherwise the record offset is
     * incremented.
     *
     * @return envelope with a {@code null} key and the record as message
     */
    @Override
    public IncomingMessageEnvelope readNext() {
        // Capture the offset first so the envelope points at the record it carries.
        String checkpoint = this.nextOffset();
        GenericRecord record = this.fileReader.next();
        long syncAfterRead = this.fileReader.previousSync();
        if (syncAfterRead != this.curBlockStart) {
            this.curBlockStart = syncAfterRead;
            this.curRecordOffset = 0L;
        } else {
            ++this.curRecordOffset;
        }
        return new IncomingMessageEnvelope(this.systemStreamPartition, checkpoint, null, (Object) record);
    }

    /**
     * @return whether the underlying reader reports another record
     */
    @Override
    public boolean hasNext() {
        return this.fileReader.hasNext();
    }

    /**
     * Closes the underlying file reader. Does nothing beyond logging when no file
     * is currently open (for example because {@link #open} was never called or failed).
     *
     * @throws SamzaException if closing the reader fails with an {@link IOException}
     */
    @Override
    public void close() {
        LOG.info("About to close file reader for " + this.systemStreamPartition);
        if (this.fileReader != null) {
            try {
                this.fileReader.close();
            } catch (IOException e) {
                throw new SamzaException(e);
            }
        }
        LOG.info("File reader closed for " + this.systemStreamPartition);
    }

    /**
     * @return checkpoint string for the record the next {@link #readNext()} will return
     */
    @Override
    public String nextOffset() {
        return AvroFileCheckpoint.generateCheckpointStr(this.curBlockStart, this.curRecordOffset);
    }

    /**
     * Compares two checkpoint strings by block start, then by record offset.
     *
     * @param offset1 first checkpoint string
     * @param offset2 second checkpoint string
     * @return negative, zero or positive as {@code offset1} is before, equal to or
     *         after {@code offset2}
     * @throws SamzaException if either string is not a valid checkpoint
     */
    public static int offsetComparator(String offset1, String offset2) {
        AvroFileCheckpoint cp1 = new AvroFileCheckpoint(offset1);
        AvroFileCheckpoint cp2 = new AvroFileCheckpoint(offset2);
        return cp1.compareTo(cp2);
    }

    /**
     * Position inside an Avro file, serialized as {@code blockStart@recordOffset}.
     * A string without the {@code @recordOffset} part is read as record offset 0.
     */
    public static class AvroFileCheckpoint implements Comparable<AvroFileCheckpoint> {
        private static final String CP_DELIM = "@";
        private final long blockStart;
        private final long recordOffset;
        String checkpointStr;

        /**
         * @return the serialized form {@code blockStart@recordOffset}
         */
        public static String generateCheckpointStr(long blockStart, long recordOffset) {
            return blockStart + CP_DELIM + recordOffset;
        }

        /**
         * Parses a checkpoint string. Whitespace anywhere in the string is ignored
         * for parsing; the original string is retained for {@link #getCheckpointStr()}.
         *
         * @param checkpointStr string of the form {@code blockStart} or
         *                      {@code blockStart@recordOffset}
         * @throws SamzaException if the string is {@code null}, has more than two
         *                        {@code @}-separated parts, or contains a non-numeric part
         */
        public AvroFileCheckpoint(String checkpointStr) {
            if (checkpointStr == null) {
                // Report null like any other malformed checkpoint instead of a bare NPE.
                throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr);
            }
            String[] elements = checkpointStr.replaceAll("\\s", "").split(CP_DELIM);
            if (elements.length > 2 || elements.length < 1) {
                throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr);
            }
            try {
                this.blockStart = Long.parseLong(elements[0]);
                this.recordOffset = elements.length == 2 ? Long.parseLong(elements[1]) : 0L;
            } catch (NumberFormatException e) {
                throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr, e);
            }
            this.checkpointStr = checkpointStr;
        }

        /**
         * Creates a checkpoint from its two components; the string form is generated
         * with {@link #generateCheckpointStr(long, long)}.
         */
        public AvroFileCheckpoint(long blockStart, long recordOffset) {
            this.blockStart = blockStart;
            this.recordOffset = recordOffset;
            this.checkpointStr = AvroFileCheckpoint.generateCheckpointStr(blockStart, recordOffset);
        }

        public long getBlockStart() {
            return this.blockStart;
        }

        public long getRecordOffset() {
            return this.recordOffset;
        }

        public String getCheckpointStr() {
            return this.checkpointStr;
        }

        /**
         * @return {@code true} when the block start is 0, regardless of the record offset
         */
        public boolean isStartingOffset() {
            return this.blockStart == 0L;
        }

        /**
         * Orders checkpoints by block start, breaking ties by record offset.
         */
        @Override
        public int compareTo(AvroFileCheckpoint other) {
            int byBlock = Long.compare(this.blockStart, other.blockStart);
            return byBlock != 0 ? byBlock : Long.compare(this.recordOffset, other.recordOffset);
        }

        @Override
        public String toString() {
            return this.getCheckpointStr();
        }
    }
}

