/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util.queue;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.queue.BaseHoodieQueueBasedExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieMessageQueue;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class BoundedInMemoryExecutor<I, O, E>
extends BaseHoodieQueueBasedExecutor<I, O, E> {
    private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);

    public BoundedInMemoryExecutor(long bufferLimitInBytes, Iterator<I> inputItr, HoodieConsumer<O, E> consumer2, Function<I, O> transformFunction, Runnable preExecuteRunnable) {
        this(bufferLimitInBytes, Collections.singletonList(new IteratorBasedQueueProducer<I>(inputItr)), Option.of(consumer2), transformFunction, new DefaultSizeEstimator(), preExecuteRunnable);
    }

    public BoundedInMemoryExecutor(long bufferLimitInBytes, List<HoodieProducer<I>> producers, Option<HoodieConsumer<O, E>> consumer2, Function<I, O> transformFunction, SizeEstimator<O> sizeEstimator, Runnable preExecuteRunnable) {
        super(producers, consumer2, new BoundedInMemoryQueue<I, O>(bufferLimitInBytes, transformFunction, sizeEstimator), preExecuteRunnable);
    }

    @Override
    protected void doConsume(HoodieMessageQueue<I, O> queue, HoodieConsumer<O, E> consumer2) {
        LOG.info((Object)"Starting consumer, consuming records from the queue");
        try {
            Iterator it = ((BoundedInMemoryQueue)queue).iterator();
            while (it.hasNext()) {
                consumer2.consume(it.next());
            }
            LOG.info((Object)"All records from the queue have been consumed");
        }
        catch (Exception e) {
            LOG.error((Object)"Failed consuming records", (Throwable)e);
            queue.markAsFailed(e);
            throw new HoodieException(e);
        }
    }

    public Iterator<O> getRecordIterator() {
        return ((BoundedInMemoryQueue)this.queue).iterator();
    }
}

