/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;

/**
 * Record reader that serves records from two sources through a single bounded in-memory queue:
 * the wrapped base-file (parquet) reader and the delta log files of the split.
 *
 * <p>Two producers feed the queue: one drives {@link HoodieUnMergedLogRecordScanner#scan()}, whose
 * callback converts every log record to an {@link ArrayWritable} and inserts it into the queue,
 * and one iterates the values of the parquet reader. {@link #next(NullWritable, ArrayWritable)}
 * drains the queue. This class performs no key-based merging of the two sources.
 */
class RealtimeUnmergedRecordReader
extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
    /** Scans the delta log files and hands each record to the callback registered in the constructor. */
    private final HoodieUnMergedLogRecordScanner logRecordScanner;
    /** Base-file reader, wrapped in {@link SafeParquetRecordReaderWrapper}. */
    private final RecordReader<NullWritable, ArrayWritable> parquetReader;
    /** Iterator view over the values produced by {@link #parquetReader}. */
    private final RecordReaderValueIterator<NullWritable, ArrayWritable> parquetRecordsIterator;
    /** Runs the producers and owns the queue that buffers records from both sources. */
    private final BoundedInMemoryExecutor<ArrayWritable, ArrayWritable, ?> executor;
    /** Consumer-side iterator over the executor's queue. */
    private final Iterator<ArrayWritable> iterator;

    /**
     * Wires up the parquet iterator, the buffering executor and the log scanner, then starts the
     * producers.
     *
     * @param split      the realtime split providing the base path, delta log paths and max commit time
     * @param job        the job configuration, passed to the superclass
     * @param realReader the underlying reader for the base file
     */
    public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, RecordReader<NullWritable, ArrayWritable> realReader) {
        super(split, job);
        this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
        this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);
        // The executor must exist before the scanner is built: the scanner callback below inserts
        // into the executor's queue. The producer created in getParallelProducers() only touches
        // logRecordScanner when it is run, i.e. after startProducers() at the end of this constructor.
        this.executor = new BoundedInMemoryExecutor<>(
            HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf),
            this.getParallelProducers(),
            Option.empty(),
            x -> x,
            new DefaultSizeEstimator<>());
        this.iterator = this.executor.getQueue().iterator();
        this.logRecordScanner = new HoodieUnMergedLogRecordScanner(
            FSUtils.getFs(split.getPath().toString(), this.jobConf),
            split.getBasePath(),
            split.getDeltaLogPaths(),
            this.getReaderSchema(),
            split.getMaxCommitTime(),
            Boolean.parseBoolean(this.jobConf.get("compaction.lazy.block.read.enabled", "true")),
            false,
            this.jobConf.getInt("hoodie.memory.dfs.buffer.max.size", 0x100000),
            record -> {
                // Convert the log record to its Hadoop writable form and buffer it in the queue.
                GenericRecord rec = (GenericRecord) record.getData().getInsertValue(this.getReaderSchema()).get();
                ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, this.getHiveSchema());
                this.executor.getQueue().insertRecord(aWritable);
            });
        this.executor.startProducers();
    }

    /**
     * Builds the two producers that feed the queue: the log scan and the parquet value iterator.
     *
     * @return the producers, log-scan producer first
     */
    private List<BoundedInMemoryQueueProducer<ArrayWritable>> getParallelProducers() {
        List<BoundedInMemoryQueueProducer<ArrayWritable>> producers = new ArrayList<>();
        producers.add(new FunctionBasedQueueProducer<>(buffer -> {
            // Records reach the queue through the scanner's callback, not through this return value.
            this.logRecordScanner.scan();
            return null;
        }));
        producers.add(new IteratorBasedQueueProducer<>(this.parquetRecordsIterator));
        return producers;
    }

    /**
     * Copies the next buffered record into {@code value}.
     *
     * @param key   unused
     * @param value receives the contents of the next queued record
     * @return {@code false} when the queue iterator has no more records, {@code true} otherwise
     */
    @Override
    public boolean next(NullWritable key, ArrayWritable value) {
        if (!this.iterator.hasNext()) {
            return false;
        }
        value.set(this.iterator.next().get());
        return true;
    }

    /** Delegates key creation to the wrapped parquet reader. */
    @Override
    public NullWritable createKey() {
        return this.parquetReader.createKey();
    }

    /** Delegates value creation to the wrapped parquet reader. */
    @Override
    public ArrayWritable createValue() {
        return this.parquetReader.createValue();
    }

    /** Always returns {@code 0}; this reader does not track a position. */
    @Override
    public long getPos() {
        return 0L;
    }

    /**
     * Closes the parquet iterator and shuts the executor down.
     *
     * <p>The executor is shut down in a {@code finally} block so that it is still stopped when
     * closing the parquet iterator throws.
     *
     * @throws IOException if closing the parquet iterator fails
     */
    @Override
    public void close() throws IOException {
        try {
            this.parquetRecordsIterator.close();
        } finally {
            this.executor.shutdownNow();
        }
    }

    /**
     * Reports the smaller of the parquet reader's progress and the log scanner's progress.
     *
     * @throws IOException if the parquet reader fails to report its progress
     */
    @Override
    public float getProgress() throws IOException {
        return Math.min(this.parquetReader.getProgress(), this.logRecordScanner.getProgress());
    }
}

