/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.procedure;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hudi.com.google.common.collect.MapMaker;
import org.apache.hudi.org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hudi.org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hudi.org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hudi.org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hudi.org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hudi.org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;

@InterfaceAudience.Private
public class ProcedureCoordinator {
    private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
    static final long KEEP_ALIVE_MILLIS_DEFAULT = 5000L;
    static final long TIMEOUT_MILLIS_DEFAULT = 60000L;
    static final long WAKE_MILLIS_DEFAULT = 500L;
    private final ProcedureCoordinatorRpcs rpcs;
    private final ExecutorService pool;
    private final long wakeTimeMillis;
    private final long timeoutMillis;
    private final ConcurrentMap<String, Procedure> procedures = new MapMaker().concurrencyLevel(4).weakValues().makeMap();

    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
        this(rpcs, pool, 60000L, 500L);
    }

    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool, long timeoutMillis, long wakeTimeMillis) {
        this.timeoutMillis = timeoutMillis;
        this.wakeTimeMillis = wakeTimeMillis;
        this.rpcs = rpcs;
        this.pool = pool;
        this.rpcs.start(this);
    }

    public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
        return ProcedureCoordinator.defaultPool(coordName, opThreads, 5000L);
    }

    public static ThreadPoolExecutor defaultPool(String coordName, int opThreads, long keepAliveMillis) {
        return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
    }

    public void close() throws IOException {
        this.pool.shutdownNow();
        this.rpcs.close();
    }

    boolean submitProcedure(Procedure proc) {
        String procName;
        block9: {
            if (proc == null) {
                return false;
            }
            procName = proc.getName();
            Procedure oldProc = (Procedure)this.procedures.get(procName);
            if (oldProc != null) {
                try {
                    if (!oldProc.isCompleted()) {
                        LOG.warn((Object)("Procedure " + procName + " currently running.  Rejecting new request"));
                        return false;
                    }
                    LOG.debug((Object)("Procedure " + procName + " was in running list but was completed.  Accepting new attempt."));
                    if (!this.procedures.remove(procName, oldProc)) {
                        LOG.warn((Object)("Procedure " + procName + " has been resubmitted by another thread. Rejecting this request."));
                        return false;
                    }
                }
                catch (ForeignException e) {
                    LOG.debug((Object)("Procedure " + procName + " was in running list but has exception.  Accepting new attempt."));
                    if (this.procedures.remove(procName, oldProc)) break block9;
                    LOG.warn((Object)("Procedure " + procName + " has been resubmitted by another thread. Rejecting this request."));
                    return false;
                }
            }
        }
        try {
            if (this.procedures.putIfAbsent(procName, proc) == null) {
                LOG.debug((Object)("Submitting procedure " + procName));
                this.pool.submit(proc);
                return true;
            }
            LOG.error((Object)("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt."));
            return false;
        }
        catch (RejectedExecutionException e) {
            LOG.warn((Object)("Procedure " + procName + " rejected by execution pool.  Propagating error."), (Throwable)e);
            this.procedures.remove(procName, proc);
            proc.receive(new ForeignException(procName, e));
            return false;
        }
    }

    void rpcConnectionFailure(String message, IOException cause) {
        Collection toNotify = this.procedures.values();
        boolean isTraceEnabled = LOG.isTraceEnabled();
        LOG.debug((Object)("received connection failure: " + message), (Throwable)cause);
        for (Procedure proc : toNotify) {
            if (proc == null) continue;
            if (isTraceEnabled) {
                LOG.trace((Object)("connection failure - notify procedure: " + proc.getName()));
            }
            proc.receive(new ForeignException(proc.getName(), cause));
        }
    }

    public void abortProcedure(String procName, ForeignException reason) {
        LOG.debug((Object)("abort procedure " + procName), (Throwable)reason);
        Procedure proc = (Procedure)this.procedures.get(procName);
        if (proc == null) {
            return;
        }
        proc.receive(reason);
    }

    Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, List<String> expectedMembers) {
        return new Procedure(this, fed, this.wakeTimeMillis, this.timeoutMillis, procName, procArgs, expectedMembers);
    }

    public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, List<String> expectedMembers) {
        Procedure proc = this.createProcedure(fed, procName, procArgs, expectedMembers);
        if (!this.submitProcedure(proc)) {
            LOG.error((Object)("Failed to submit procedure '" + procName + "'"));
            return null;
        }
        return proc;
    }

    void memberAcquiredBarrier(String procName, String member) {
        Procedure proc = (Procedure)this.procedures.get(procName);
        if (proc == null) {
            LOG.warn((Object)("Member '" + member + "' is trying to acquire an unknown procedure '" + procName + "'"));
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("Member '" + member + "' acquired procedure '" + procName + "'"));
        }
        proc.barrierAcquiredByMember(member);
    }

    void memberFinishedBarrier(String procName, String member, byte[] dataFromMember) {
        Procedure proc = (Procedure)this.procedures.get(procName);
        if (proc == null) {
            LOG.warn((Object)("Member '" + member + "' is trying to release an unknown procedure '" + procName + "'"));
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("Member '" + member + "' released procedure '" + procName + "'"));
        }
        proc.barrierReleasedByMember(member, dataFromMember);
    }

    ProcedureCoordinatorRpcs getRpcs() {
        return this.rpcs;
    }

    public Procedure getProcedure(String name) {
        return (Procedure)this.procedures.get(name);
    }

    public Set<String> getProcedureNames() {
        return new HashSet<String>(this.procedures.keySet());
    }
}

