/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.async;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.client.AbstractCompactor;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class AsyncCompactService
extends HoodieAsyncService {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
    public static final String COMPACT_POOL_NAME = "hoodiecompact";
    private final int maxConcurrentCompaction;
    private transient AbstractCompactor compactor;
    protected transient HoodieEngineContext context;
    private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<HoodieInstant>();
    private transient ReentrantLock queueLock = new ReentrantLock();
    private transient Condition consumed = this.queueLock.newCondition();

    public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
        this(context, client, false);
    }

    public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client, boolean runInDaemonMode) {
        super(runInDaemonMode);
        this.context = context;
        this.compactor = this.createCompactor(client);
        this.maxConcurrentCompaction = 1;
    }

    protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient var1);

    public void enqueuePendingCompaction(HoodieInstant instant) {
        this.pendingCompactions.add(instant);
    }

    public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
        try {
            this.queueLock.lock();
            while (!this.isShutdown() && this.pendingCompactions.size() > numPendingCompactions) {
                this.consumed.await();
            }
        }
        finally {
            this.queueLock.unlock();
        }
    }

    private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
        LOG.info((Object)"Compactor waiting for next instant for compaction upto 60 seconds");
        HoodieInstant instant = this.pendingCompactions.poll(10L, TimeUnit.SECONDS);
        if (instant != null) {
            try {
                this.queueLock.lock();
                this.consumed.signal();
            }
            finally {
                this.queueLock.unlock();
            }
        }
        return instant;
    }

    @Override
    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService executor = Executors.newFixedThreadPool(this.maxConcurrentCompaction, r -> {
            Thread t = new Thread(r, "async_compact_thread");
            t.setDaemon(this.isRunInDaemonMode());
            return t;
        });
        return Pair.of(CompletableFuture.allOf((CompletableFuture[])IntStream.range(0, this.maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
            try {
                LOG.info((Object)"Setting pool name for compaction to hoodiecompact");
                this.context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME);
                while (!this.isShutdownRequested()) {
                    HoodieInstant instant = this.fetchNextCompactionInstant();
                    if (null == instant) continue;
                    LOG.info((Object)("Starting Compaction for instant " + instant));
                    this.compactor.compact(instant);
                    LOG.info((Object)("Finished Compaction for instant " + instant));
                }
                LOG.info((Object)"Compactor shutting down properly!!");
            }
            catch (InterruptedException ie) {
                LOG.warn((Object)"Compactor executor thread got interrupted exception. Stopping", (Throwable)ie);
            }
            catch (IOException e) {
                LOG.error((Object)"Compactor executor failed", (Throwable)e);
                throw new HoodieIOException(e.getMessage(), e);
            }
            return true;
        }, executor)).toArray(CompletableFuture[]::new)), executor);
    }

    protected boolean shouldStopCompactor() {
        return false;
    }

    public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
        this.compactor.updateWriteClient(writeClient);
    }
}

