/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util.queue;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.common.util.queue.HoodieMessageQueue;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class BaseHoodieQueueBasedExecutor<I, O, E>
implements HoodieExecutor<E> {
    private static final long TERMINATE_WAITING_TIME_SECS = 60L;
    private final Logger logger = LogManager.getLogger(this.getClass());
    private final ExecutorService producerExecutorService;
    private final ExecutorService consumerExecutorService;
    protected final HoodieMessageQueue<I, O> queue;
    private final List<HoodieProducer<I>> producers;
    protected final Option<HoodieConsumer<O, E>> consumer;

    public BaseHoodieQueueBasedExecutor(List<HoodieProducer<I>> producers, Option<HoodieConsumer<O, E>> consumer2, HoodieMessageQueue<I, O> queue, Runnable preExecuteRunnable) {
        this.queue = queue;
        this.producers = producers;
        this.consumer = consumer2;
        this.producerExecutorService = Executors.newFixedThreadPool(Math.max(1, producers.size()), new CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable));
        this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("executor-queue-consumer", preExecuteRunnable));
    }

    protected void doProduce(HoodieMessageQueue<I, O> queue, HoodieProducer<I> producer) {
        this.logger.info((Object)"Starting producer, populating records into the queue");
        try {
            producer.produce(queue);
            this.logger.info((Object)"Finished producing records into the queue");
        }
        catch (Exception e) {
            this.logger.error((Object)"Failed to produce records", (Throwable)e);
            queue.markAsFailed(e);
            throw new HoodieException("Failed to produce records", e);
        }
    }

    protected abstract void doConsume(HoodieMessageQueue<I, O> var1, HoodieConsumer<O, E> var2);

    protected void setUp() {
    }

    public final CompletableFuture<Void> startProducingAsync() {
        return ((CompletableFuture)FutureUtils.allOf(this.producers.stream().map(producer -> CompletableFuture.supplyAsync(() -> {
            this.doProduce(this.queue, (HoodieProducer<I>)producer);
            return null;
        }, this.producerExecutorService)).collect(Collectors.toList())).thenApply(ignored -> null)).whenComplete((result, throwable) -> {
            this.producers.forEach(HoodieProducer::close);
            this.queue.seal();
        });
    }

    private CompletableFuture<Void> startConsumingAsync() {
        return this.consumer.map(consumer2 -> CompletableFuture.supplyAsync(() -> {
            this.doConsume(this.queue, (HoodieConsumer<O, E>)consumer2);
            return null;
        }, this.consumerExecutorService)).orElse(CompletableFuture.completedFuture(null));
    }

    @Override
    public final boolean awaitTermination() {
        boolean interruptedBefore = Thread.interrupted();
        boolean producerTerminated = false;
        boolean consumerTerminated = false;
        try {
            producerTerminated = this.producerExecutorService.awaitTermination(60L, TimeUnit.SECONDS);
            consumerTerminated = this.consumerExecutorService.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (interruptedBefore) {
            Thread.currentThread().interrupt();
        }
        return producerTerminated && consumerTerminated;
    }

    @Override
    public void shutdownNow() {
        this.producerExecutorService.shutdownNow();
        this.consumerExecutorService.shutdownNow();
    }

    public boolean isRunning() {
        return !this.queue.isEmpty();
    }

    @Override
    public E execute() {
        try {
            ValidationUtils.checkState(this.consumer.isPresent());
            this.setUp();
            CompletableFuture<Void> consuming = this.startConsumingAsync();
            CompletableFuture<Void> producing = this.startProducingAsync();
            return (E)((CompletableFuture)((CompletableFuture)((CompletableFuture)producing.thenCombine(consuming, (aVoid, anotherVoid) -> null)).whenComplete((ignored, throwable) -> this.queue.close())).thenApply(ignored -> this.consumer.get().finish())).get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new HoodieException(e);
        }
    }
}

