/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.storage.HoodieStorageUtils;

/**
 * Record reader that surfaces, through one iterator, both the rows of a wrapped base-file
 * reader and the records scanned from the split's delta log files.
 *
 * <p>Two producers are registered with a single {@link BoundedInMemoryExecutor}: one scans the
 * delta log files with a {@link HoodieUnMergedLogRecordScanner}, the other drains the wrapped
 * reader. {@link #next(NullWritable, ArrayWritable)} consumes whatever the executor's record
 * iterator yields; this class itself performs no merging or de-duplication between the two
 * sources.
 *
 * <p>NOTE(review): the relative order in which base-file rows and log records are returned
 * depends on the executor and is not established by this class.
 */
class RealtimeUnmergedRecordReader
extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
    /**
     * Fallback passed to {@code JobConf.getInt} when
     * {@code HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE} is not set: 0x100000 = 1,048,576
     * (presumably bytes, i.e. 1 MiB -- confirm against the config definition).
     */
    private static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 0x100000;

    /** Base-file reader, wrapped in a {@link SafeParquetRecordReaderWrapper}. */
    private final RecordReader<NullWritable, ArrayWritable> parquetReader;
    /** Iterator view over {@link #parquetReader}; used as one of the two queue producers. */
    private final RecordReaderValueIterator<NullWritable, ArrayWritable> parquetRecordsIterator;
    /** Executor that runs the log-scanner producer and the base-file producer. */
    private final BoundedInMemoryExecutor<ArrayWritable, ArrayWritable, ?> executor;
    /** Consumer-side iterator over the records buffered by {@link #executor}. */
    private final Iterator<ArrayWritable> iterator;

    /**
     * Wires up both producers and starts them immediately.
     *
     * @param split      realtime split supplying the path, base path, delta log paths and max
     *                   commit time used to configure the log scanner
     * @param job        job configuration; forwarded to the superclass
     * @param realReader reader for the base file; wrapped and used as the second producer
     */
    public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, RecordReader<NullWritable, ArrayWritable> realReader) {
        super(split, job);
        this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
        this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);

        // Only the builder is prepared here; the scanner itself is built inside the producer
        // (see getParallelProducers) because its callback needs the queue handed to the producer.
        HoodieUnMergedLogRecordScanner.Builder scannerBuilder =
            ((HoodieUnMergedLogRecordScanner.Builder)HoodieUnMergedLogRecordScanner.newBuilder()
                .withStorage(HoodieStorageUtils.getStorage(split.getPath().toString(), HadoopFSUtils.getStorageConf((Configuration)this.jobConf)))
                .withBasePath(split.getBasePath())
                .withLogFilePaths((List)split.getDeltaLogPaths()))
                .withReaderSchema(this.getReaderSchema())
                .withLatestInstantTime(split.getMaxCommitTime())
                .withReverseReader(false)
                .withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));

        // First argument is presumably the memory budget for the bounded queue -- confirm
        // against BoundedInMemoryExecutor. Records pass through untransformed (identity).
        this.executor = new BoundedInMemoryExecutor(
            HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf),
            this.getParallelProducers(scannerBuilder),
            Option.empty(),
            Function.identity(),
            new DefaultSizeEstimator(),
            Functions.noop());
        this.iterator = this.executor.getRecordIterator();
        // Producers are started from the constructor, before the first call to next().
        this.executor.startProducingAsync();
    }

    /**
     * Builds the two producers handed to the executor.
     *
     * <ol>
     *   <li>A function-based producer that finishes {@code scannerBuilder} with a callback
     *       converting each scanned log record to an {@link ArrayWritable} and inserting it
     *       into the queue, then runs the scan.</li>
     *   <li>An iterator-based producer over {@link #parquetRecordsIterator}.</li>
     * </ol>
     *
     * @param scannerBuilder pre-configured builder; this method adds the record callback
     * @return a fixed-size list holding the log producer followed by the base-file producer
     */
    private List<HoodieProducer<ArrayWritable>> getParallelProducers(HoodieUnMergedLogRecordScanner.Builder scannerBuilder) {
        return Arrays.asList(new FunctionBasedQueueProducer(queue -> {
            HoodieUnMergedLogRecordScanner scanner = scannerBuilder.withLogRecordScannerCallback(record -> {
                // Log record -> Avro GenericRecord -> Hive-shaped ArrayWritable, then enqueue.
                GenericRecord rec = (GenericRecord)record.toIndexedRecord(this.getReaderSchema(), this.payloadProps).get().getData();
                ArrayWritable aWritable = (ArrayWritable)HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, this.getHiveSchema(), this.isSupportTimestamp());
                queue.insertRecord(aWritable);
            }).build();
            // Records reach the queue via the callback above; the producer has no result.
            scanner.scan();
            return null;
        }), new IteratorBasedQueueProducer(this.parquetRecordsIterator));
    }

    /**
     * Copies the next buffered record into {@code value}.
     *
     * @param key   unused
     * @param value writable that receives the contents of the next record
     * @return {@code true} if a record was copied, {@code false} once the iterator is exhausted
     */
    public boolean next(NullWritable key, ArrayWritable value) {
        if (!this.iterator.hasNext()) {
            return false;
        }
        // Transfer the element array of the buffered record into the caller's writable.
        value.set(this.iterator.next().get());
        return true;
    }

    /** @return a key created by the wrapped base-file reader */
    public NullWritable createKey() {
        return this.parquetReader.createKey();
    }

    /** @return a value created by the wrapped base-file reader */
    public ArrayWritable createValue() {
        return this.parquetReader.createValue();
    }

    /** @return always {@code 0}; no position is tracked across the two producers */
    public long getPos() {
        return 0L;
    }

    /**
     * Closes the base-file iterator and stops the executor.
     *
     * <p>The executor is shut down in a {@code finally} block so it is stopped even when
     * closing the iterator throws; the exception from the iterator still propagates.
     *
     * @throws IOException if closing the base-file iterator fails
     */
    public void close() throws IOException {
        try {
            this.parquetRecordsIterator.close();
        } finally {
            this.executor.shutdownNow();
        }
    }

    /**
     * @return the progress reported by the wrapped base-file reader; progress of the log scan
     *         is not included
     * @throws IOException if the wrapped reader fails to report progress
     */
    public float getProgress() throws IOException {
        return this.parquetReader.getProgress();
    }
}

