/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util.queue;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class BoundedInMemoryExecutor<I, O, E> {
    private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
    private final ExecutorService producerExecutorService;
    private final ExecutorService consumerExecutorService;
    private final BoundedInMemoryQueue<I, O> queue;
    private final List<BoundedInMemoryQueueProducer<I>> producers;
    private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
    private final Runnable preExecuteRunnable;

    public BoundedInMemoryExecutor(long bufferLimitInBytes, Iterator<I> inputItr, BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, Runnable preExecuteRunnable) {
        this(bufferLimitInBytes, new IteratorBasedQueueProducer<I>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable);
    }

    public BoundedInMemoryExecutor(long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer, Option<BoundedInMemoryQueueConsumer<O, E>> consumer, Function<I, O> transformFunction) {
        this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop());
    }

    public BoundedInMemoryExecutor(long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer, Option<BoundedInMemoryQueueConsumer<O, E>> consumer, Function<I, O> transformFunction, Runnable preExecuteRunnable) {
        this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator(), preExecuteRunnable);
    }

    public BoundedInMemoryExecutor(long bufferLimitInBytes, List<BoundedInMemoryQueueProducer<I>> producers, Option<BoundedInMemoryQueueConsumer<O, E>> consumer, Function<I, O> transformFunction, SizeEstimator<O> sizeEstimator, Runnable preExecuteRunnable) {
        this.producers = producers;
        this.consumer = consumer;
        this.preExecuteRunnable = preExecuteRunnable;
        this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("producer"));
        this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer"));
        this.queue = new BoundedInMemoryQueue<I, O>(bufferLimitInBytes, transformFunction, sizeEstimator);
    }

    public ExecutorCompletionService<Boolean> startProducers() {
        CountDownLatch latch = new CountDownLatch(this.producers.size());
        ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(this.producerExecutorService);
        this.producers.stream().map(producer -> completionService.submit(() -> {
            try {
                this.preExecuteRunnable.run();
                producer.produce(this.queue);
            }
            catch (Throwable e) {
                LOG.error((Object)"error producing records", e);
                this.queue.markAsFailed(e);
                throw e;
            }
            finally {
                CountDownLatch countDownLatch = latch;
                synchronized (countDownLatch) {
                    latch.countDown();
                    if (latch.getCount() == 0L) {
                        this.queue.close();
                    }
                }
            }
            return true;
        })).collect(Collectors.toList());
        return completionService;
    }

    private Future<E> startConsumer() {
        return this.consumer.map(consumer -> this.consumerExecutorService.submit(() -> {
            LOG.info((Object)"starting consumer thread");
            this.preExecuteRunnable.run();
            try {
                Object result = consumer.consume(this.queue);
                LOG.info((Object)"Queue Consumption is done; notifying producer threads");
                return result;
            }
            catch (Exception e) {
                LOG.error((Object)"error consuming records", (Throwable)e);
                this.queue.markAsFailed(e);
                throw e;
            }
        })).orElse(CompletableFuture.completedFuture(null));
    }

    public E execute() {
        try {
            this.startProducers();
            Future<E> future = this.startConsumer();
            return future.get();
        }
        catch (InterruptedException ie) {
            this.shutdownNow();
            Thread.currentThread().interrupt();
            throw new HoodieException(ie);
        }
        catch (Exception e) {
            throw new HoodieException(e);
        }
    }

    public boolean isRemaining() {
        return this.queue.iterator().hasNext();
    }

    public void shutdownNow() {
        this.producerExecutorService.shutdownNow();
        this.consumerExecutorService.shutdownNow();
    }

    public BoundedInMemoryQueue<I, O> getQueue() {
        return this.queue;
    }
}

