/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft.internals;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.serialization.RecordSerde;

/**
 * Iterator over the batches of a {@link Records} instance that decodes every data record with a
 * {@link RecordSerde}.
 *
 * <p>{@link MemoryRecords} are iterated in place. {@link FileRecords} are copied chunk by chunk
 * into a buffer obtained from the {@link BufferSupplier}; that buffer is handed back to the
 * supplier by {@link #close()}. Any other {@code Records} implementation is rejected with an
 * {@link IllegalStateException} when iteration reaches it.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> type of the decoded records
 */
public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
    private final Records records;
    private final RecordSerde<T> serde;
    private final BufferSupplier bufferSupplier;
    private final int batchSize;
    // When true, ensureValid() is called on every batch before it is decoded.
    private final boolean doCrcValidation;

    private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
    private Optional<Batch<T>> nextBatch = Optional.empty();
    // Backing store for nextBatches; only populated when reading FileRecords.
    private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
    // Number of bytes of `records` consumed so far.
    private int bytesRead = 0;
    private boolean isClosed = false;

    /**
     * @param records         the records to iterate over
     * @param serde           deserializer applied to the value of every data record
     * @param bufferSupplier  source of the scratch buffers used while reading and decoding
     * @param batchSize       preferred size of the read buffer for file-backed records; values
     *                        below {@link Records#HEADER_SIZE_UP_TO_MAGIC} are raised to it
     * @param doCrcValidation whether to call {@code ensureValid()} on every batch
     */
    public RecordsIterator(
        Records records,
        RecordSerde<T> serde,
        BufferSupplier bufferSupplier,
        int batchSize,
        boolean doCrcValidation
    ) {
        this.records = records;
        this.serde = serde;
        this.bufferSupplier = bufferSupplier;
        this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
        this.doCrcValidation = doCrcValidation;
    }

    /**
     * Decodes the next batch eagerly (if one is not already cached) and reports whether it exists.
     *
     * @throws IllegalStateException if the iterator has been closed
     */
    @Override
    public boolean hasNext() {
        ensureOpen();

        if (!nextBatch.isPresent()) {
            nextBatch = nextBatch();
        }

        return nextBatch.isPresent();
    }

    /**
     * @return the next decoded batch
     * @throws NoSuchElementException if there are no more batches
     * @throws IllegalStateException  if the iterator has been closed
     */
    @Override
    public Batch<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException("Batch iterator doesn't have any more elements");
        }

        Batch<T> batch = nextBatch.get();
        nextBatch = Optional.empty();

        return batch;
    }

    /**
     * Marks the iterator as closed and returns the read buffer, if any, to the supplier.
     * Calling it again is harmless because the buffer reference is cleared.
     */
    @Override
    public void close() {
        isClosed = true;
        allocatedBuffer.ifPresent(bufferSupplier::release);
        allocatedBuffer = Optional.empty();
    }

    private void ensureOpen() {
        if (isClosed) {
            throw new IllegalStateException("Serde record batch iterator was closed");
        }
    }

    /**
     * Reads from {@code fileRecords}, at offset {@code bytesRead}, into {@code buffer} and wraps
     * the result as {@link MemoryRecords}.
     */
    private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer buffer) {
        int start = buffer.position();
        try {
            fileRecords.readInto(buffer, bytesRead);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read records into memory", e);
        }

        // NOTE(review): this accounting relies on readInto leaving the buffer flipped, so that
        // limit() marks the end of the data and (limit - start) is the number of new bytes.
        bytesRead += buffer.limit() - start;
        return MemoryRecords.readableRecords(buffer.slice());
    }

    /**
     * Fills the read buffer with the next chunk of {@code fileRecords}, growing the buffer when
     * the first batch does not fit in it.
     *
     * @throws IllegalStateException if the size of the first batch cannot be determined
     */
    private MemoryRecords createMemoryRecords(FileRecords fileRecords) {
        final ByteBuffer buffer;
        if (allocatedBuffer.isPresent()) {
            buffer = allocatedBuffer.get();
            // Keep the bytes that were read but not yet consumed at the front of the buffer.
            buffer.compact();
        } else {
            buffer = bufferSupplier.get(Math.min(batchSize, records.sizeInBytes()));
            allocatedBuffer = Optional.of(buffer);
        }

        MemoryRecords memoryRecords = readFileRecords(fileRecords, buffer);

        // firstBatchSize() returns a boxed Integer. Fail with a descriptive error instead of an
        // NPE from auto-unboxing if it is ever null (presumably when too few bytes are buffered
        // to contain a batch header - TODO confirm against MemoryRecords).
        Integer firstBatchSize = memoryRecords.firstBatchSize();
        if (firstBatchSize == null) {
            throw new IllegalStateException(
                "Unable to determine the size of the next batch with " + buffer.remaining() + " bytes buffered"
            );
        }

        if (firstBatchSize <= buffer.remaining()) {
            return memoryRecords;
        }

        // The first batch does not fit: move what was read into a buffer large enough for the
        // whole batch, release the old buffer and read the remainder.
        ByteBuffer newBuffer = bufferSupplier.get(firstBatchSize);
        allocatedBuffer = Optional.of(newBuffer);

        newBuffer.put(buffer);
        bufferSupplier.release(buffer);

        return readFileRecords(fileRecords, newBuffer);
    }

    /**
     * Returns an iterator over the next group of raw batches, or an empty iterator once
     * {@code bytesRead} has reached the size of {@code records}.
     */
    private Iterator<MutableRecordBatch> nextBatches() {
        int recordSize = records.sizeInBytes();
        if (bytesRead >= recordSize) {
            return Collections.emptyIterator();
        }

        final MemoryRecords memoryRecords;
        if (records instanceof MemoryRecords) {
            // In-memory records are consumed in one go.
            bytesRead = recordSize;
            memoryRecords = (MemoryRecords) records;
        } else if (records instanceof FileRecords) {
            memoryRecords = createMemoryRecords((FileRecords) records);
        } else {
            throw new IllegalStateException(String.format("Unexpected Records type %s", records.getClass()));
        }

        return memoryRecords.batchIterator();
    }

    /**
     * Decodes the next batch, refilling {@code nextBatches} first when it is exhausted.
     *
     * @return the decoded batch, or empty when no batch is available
     */
    private Optional<Batch<T>> nextBatch() {
        if (!nextBatches.hasNext()) {
            nextBatches = nextBatches();
        }

        if (!nextBatches.hasNext()) {
            return Optional.empty();
        }

        MutableRecordBatch batch = nextBatches.next();
        // Advance the read buffer past this batch so a later compact() discards it.
        allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + batch.sizeInBytes()));

        if (!(batch instanceof DefaultRecordBatch)) {
            throw new IllegalStateException(
                String.format("DefaultRecordBatch expected by record type was %s", batch.getClass())
            );
        }

        return Optional.of(readBatch((DefaultRecordBatch) batch));
    }

    /**
     * Converts a raw batch into a {@link Batch}: control batches carry no decoded records, data
     * batches have each of their records decoded with the serde.
     *
     * @throws IllegalStateException if a data batch does not report a record count
     */
    private Batch<T> readBatch(DefaultRecordBatch batch) {
        if (doCrcValidation) {
            batch.ensureValid();
        }

        final Batch<T> result;
        if (batch.isControlBatch()) {
            result = Batch.control(
                batch.baseOffset(),
                batch.partitionLeaderEpoch(),
                batch.maxTimestamp(),
                batch.sizeInBytes(),
                batch.lastOffset()
            );
        } else {
            Integer numRecords = batch.countOrNull();
            if (numRecords == null) {
                throw new IllegalStateException("Expected a record count for the records batch");
            }

            List<T> decoded = new ArrayList<>(numRecords);
            DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
            try {
                for (int i = 0; i < numRecords; i++) {
                    decoded.add(readRecord(input, batch.sizeInBytes()));
                }
            } finally {
                Utils.closeQuietly(input, "DataInputStream");
            }

            result = Batch.data(
                batch.baseOffset(),
                batch.partitionLeaderEpoch(),
                batch.maxTimestamp(),
                batch.sizeInBytes(),
                decoded
            );
        }

        return result;
    }

    /**
     * Reads one length-prefixed record from {@code stream} and decodes its value with the serde.
     *
     * <p>The record must have a timestamp delta of 0, no key (key size -1), a value of at least
     * one byte and no headers.
     *
     * @param stream         stream positioned at the varint size prefix of the record
     * @param totalBatchSize size of the enclosing batch, used as an upper bound for the record size
     * @throws UncheckedIOException     if reading from the stream fails
     * @throws RuntimeException         if the size prefix is invalid or the stream ends early
     * @throws IllegalArgumentException if the record violates one of the constraints above
     */
    private T readRecord(DataInputStream stream, int totalBatchSize) {
        // Size of the record body in bytes. The cast pins the DataInput overload.
        int size;
        try {
            size = ByteUtils.readVarint((DataInput) stream);
        } catch (IOException e) {
            throw new UncheckedIOException("Unable to read record size", e);
        }
        if (size <= 0) {
            throw new RuntimeException("Invalid non-positive frame size: " + size);
        }
        if (size > totalBatchSize) {
            throw new RuntimeException("Specified frame size, " + size + ", is larger than the entire size of the " +
                "batch, which is " + totalBatchSize);
        }

        // NOTE(review): the code below assumes the supplier hands out array-backed buffers
        // positioned at 0 with a zero array offset.
        ByteBuffer buf = bufferSupplier.get(size);
        // Everything that touches buf is inside the try so the buffer is released on every path.
        try {
            // The last byte of the record holds the header count and is checked separately below,
            // so it is kept out of reach of the accessor.
            buf.limit(size - 1);

            // InputStream.read may return fewer bytes than requested; keep reading until the
            // whole record has arrived or the stream ends.
            int totalRead = 0;
            try {
                while (totalRead < size) {
                    int count = stream.read(buf.array(), totalRead, size - totalRead);
                    if (count < 0) {
                        break;
                    }
                    totalRead += count;
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to read record bytes", e);
            }
            if (totalRead != size) {
                throw new RuntimeException("Unable to read " + size + " bytes, only read " + totalRead);
            }

            ByteBufferAccessor input = new ByteBufferAccessor(buf);

            // Skip the attributes byte.
            input.readByte();

            long timestampDelta = input.readVarlong();
            if (timestampDelta != 0L) {
                throw new IllegalArgumentException("Got timestamp delta of " + timestampDelta + ", but this is invalid because it " +
                    "is not 0 as expected.");
            }

            // Skip the offset delta.
            input.readVarint();

            int keySize = input.readVarint();
            if (keySize != -1) {
                throw new IllegalArgumentException("Got key size of " + keySize + ", but this is invalid because it " +
                    "is not -1 as expected.");
            }

            int valueSize = input.readVarint();
            if (valueSize < 1) {
                throw new IllegalArgumentException("Got payload size of " + valueSize + ", but this is invalid because " +
                    "it is less than 1.");
            }

            T record = serde.read(input, valueSize);

            // The header count must be a single byte set to 0.
            byte numHeaders = buf.array()[size - 1];
            if (numHeaders != 0) {
                throw new IllegalArgumentException("Got numHeaders of " + numHeaders + ", but this is invalid because " +
                    "it is not 0 as expected.");
            }
            return record;
        } finally {
            bufferSupplier.release(buf);
        }
    }
}

