/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.format.avro;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.avro.SeekableInputStreamWrapper;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.org.apache.avro.Schema;
import org.apache.paimon.shade.org.apache.avro.file.DataFileReader;
import org.apache.paimon.shade.org.apache.avro.generic.GenericDatumReader;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.IteratorResultIterator;
import org.apache.paimon.utils.Pool;

public abstract class AbstractAvroBulkFormat<A>
implements FormatReaderFactory {
    private static final long serialVersionUID = 1L;
    protected final Schema readerSchema;

    protected AbstractAvroBulkFormat(Schema readerSchema) {
        this.readerSchema = readerSchema;
    }

    public AvroReader createReader(FileIO fileIO, Path file) throws IOException {
        return this.createReader(fileIO, file, this.createReusedAvroRecord(), this.createConverter());
    }

    private AvroReader createReader(FileIO fileIO, Path file, A reuse, Function<A, InternalRow> converter) throws IOException {
        return new AvroReader(fileIO, file, 0L, fileIO.getFileSize(file), -1L, 0L, reuse, converter);
    }

    protected abstract A createReusedAvroRecord();

    protected abstract Function<A, InternalRow> createConverter();

    private class AvroBlockIterator
    implements Iterator<InternalRow> {
        private long numRecordsRemaining;
        private final DataFileReader<A> reader;
        private final A reuse;
        private final Function<A, InternalRow> converter;

        private AvroBlockIterator(long numRecordsRemaining, DataFileReader<A> reader, A reuse, Function<A, InternalRow> converter) {
            this.numRecordsRemaining = numRecordsRemaining;
            this.reader = reader;
            this.reuse = reuse;
            this.converter = converter;
        }

        @Override
        public boolean hasNext() {
            return this.numRecordsRemaining > 0L;
        }

        @Override
        public InternalRow next() {
            try {
                --this.numRecordsRemaining;
                return this.converter.apply(this.reader.next(this.reuse));
            }
            catch (IOException e) {
                throw new RuntimeException("Encountered exception when reading from avro format file", e);
            }
        }
    }

    private class AvroReader
    implements RecordReader<InternalRow> {
        private final FileIO fileIO;
        private final DataFileReader<A> reader;
        private final Function<A, InternalRow> converter;
        private final long end;
        private final Pool<A> pool;
        private long currentRecordsToSkip;

        private AvroReader(FileIO fileIO, Path path, long offset, long end, long blockStart, long recordsToSkip, A reuse, Function<A, InternalRow> converter) throws IOException {
            this.fileIO = fileIO;
            this.reader = this.createReaderFromPath(path);
            if (blockStart >= 0L) {
                this.reader.seek(blockStart);
            } else {
                this.reader.sync(offset);
            }
            int i = 0;
            while ((long)i < recordsToSkip) {
                this.reader.next(reuse);
                ++i;
            }
            this.converter = converter;
            this.end = end;
            this.pool = new Pool(1);
            this.pool.add(reuse);
            this.currentRecordsToSkip = recordsToSkip;
        }

        private DataFileReader<A> createReaderFromPath(Path path) throws IOException {
            GenericDatumReader datumReader = new GenericDatumReader(null, AbstractAvroBulkFormat.this.readerSchema);
            SeekableInputStreamWrapper in = new SeekableInputStreamWrapper(this.fileIO.newInputStream(path), this.fileIO.getFileSize(path));
            try {
                return (DataFileReader)DataFileReader.openReader(in, datumReader);
            }
            catch (Throwable e) {
                IOUtils.closeQuietly(in);
                throw e;
            }
        }

        @Override
        @Nullable
        public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
            Object reuse;
            try {
                reuse = this.pool.pollEntry();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for the previous batch to be consumed", e);
            }
            if (!this.readNextBlock()) {
                this.pool.recycler().recycle(reuse);
                return null;
            }
            AvroBlockIterator iterator = new AvroBlockIterator(this.reader.getBlockCount() - this.currentRecordsToSkip, this.reader, reuse, this.converter);
            this.currentRecordsToSkip = 0L;
            return new IteratorResultIterator<InternalRow>(iterator, () -> this.pool.recycler().recycle(reuse));
        }

        private boolean readNextBlock() throws IOException {
            return this.reader.hasNext() && !this.reader.pastSync(this.end);
        }

        @Override
        public void close() throws IOException {
            this.reader.close();
        }
    }
}

