/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.func;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.func.LazyIterableIterator;
import org.apache.hudi.func.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;

public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
    protected final HoodieWriteConfig hoodieConfig;
    protected final String commitTime;
    protected final HoodieTable<T> hoodieTable;
    protected final String idPrefix;
    protected int numFilesWritten;

    public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String idPrefix) {
        super(sortedRecordItr);
        this.hoodieConfig = config;
        this.commitTime = commitTime;
        this.hoodieTable = hoodieTable;
        this.idPrefix = idPrefix;
        this.numFilesWritten = 0;
    }

    static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(Schema schema) {
        return hoodieRecord -> new HoodieInsertValueGenResult<HoodieRecord>((HoodieRecord)hoodieRecord, schema);
    }

    @Override
    protected void start() {
    }

    @Override
    protected List<WriteStatus> computeNext() {
        SparkBoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
        try {
            Schema schema = new Schema.Parser().parse(this.hoodieConfig.getSchema());
            bufferedIteratorExecutor = new SparkBoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>>(this.hoodieConfig, this.inputItr, this.getInsertHandler(), CopyOnWriteLazyInsertIterable.getTransformFunction(schema));
            List result = (List)bufferedIteratorExecutor.execute();
            assert (result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining());
            List list = result;
            return list;
        }
        catch (Exception e) {
            throw new HoodieException(e);
        }
        finally {
            if (null != bufferedIteratorExecutor) {
                bufferedIteratorExecutor.shutdownNow();
            }
        }
    }

    @Override
    protected void end() {
    }

    protected String getNextFileId(String idPfx) {
        return String.format("%s-%d", idPfx, this.numFilesWritten++);
    }

    protected CopyOnWriteInsertHandler getInsertHandler() {
        return new CopyOnWriteInsertHandler();
    }

    protected class CopyOnWriteInsertHandler
    extends BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
        protected final List<WriteStatus> statuses = new ArrayList<WriteStatus>();
        protected HoodieWriteHandle handle;

        protected CopyOnWriteInsertHandler() {
        }

        @Override
        protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
            Object insertPayload = payload.record;
            if (this.handle == null) {
                this.handle = new HoodieCreateHandle(CopyOnWriteLazyInsertIterable.this.hoodieConfig, CopyOnWriteLazyInsertIterable.this.commitTime, CopyOnWriteLazyInsertIterable.this.hoodieTable, ((HoodieRecord)insertPayload).getPartitionPath(), CopyOnWriteLazyInsertIterable.this.getNextFileId(CopyOnWriteLazyInsertIterable.this.idPrefix));
            }
            if (this.handle.canWrite((HoodieRecord)payload.record)) {
                this.handle.write((HoodieRecord)insertPayload, payload.insertValue, payload.exception);
            } else {
                this.statuses.add(this.handle.close());
                this.handle = new HoodieCreateHandle(CopyOnWriteLazyInsertIterable.this.hoodieConfig, CopyOnWriteLazyInsertIterable.this.commitTime, CopyOnWriteLazyInsertIterable.this.hoodieTable, ((HoodieRecord)insertPayload).getPartitionPath(), CopyOnWriteLazyInsertIterable.this.getNextFileId(CopyOnWriteLazyInsertIterable.this.idPrefix));
                this.handle.write((HoodieRecord)insertPayload, payload.insertValue, payload.exception);
            }
        }

        @Override
        protected void finish() {
            if (this.handle != null) {
                this.statuses.add(this.handle.close());
            }
            this.handle = null;
            assert (this.statuses.size() > 0);
        }

        @Override
        protected List<WriteStatus> getResult() {
            return this.statuses;
        }
    }

    static class HoodieInsertValueGenResult<T extends HoodieRecord> {
        public T record;
        public Option<IndexedRecord> insertValue;
        public Option<Exception> exception = Option.empty();

        public HoodieInsertValueGenResult(T record, Schema schema) {
            this.record = record;
            try {
                this.insertValue = ((HoodieRecord)record).getData().getInsertValue(schema);
            }
            catch (Exception e) {
                this.exception = Option.of(e);
            }
        }
    }
}

