/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class RSProcedureDispatcher
extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
implements ServerListener {
    /** Logger shared by the dispatcher and its nested remote-call tasks. */
    private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);
    /** Configuration key read by the constructor to initialise {@link #rsStartupWaitTime}. */
    public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY = "hbase.regionserver.rpc.startup.waittime";
    /** Default startup wait time (60000); equals the fallback the constructor uses when the key is unset. */
    private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
    /** Master services handle supplied at construction; used to reach the server manager and procedure executor. */
    protected final MasterServices master;
    /**
     * How long a remote call keeps retrying a server that answers with
     * {@code ServerNotRunningYetException}; added to the current time to form the retry deadline
     * (the remaining time is logged as milliseconds).
     */
    private final long rsStartupWaitTime;
    /** Procedure environment; assigned in {@link #start()} and {@code null} before that. */
    private MasterProcedureEnv procedureEnv;

    /**
     * Creates a dispatcher bound to the given master.
     *
     * <p>The master's configuration is handed to the superclass and is also consulted for
     * {@link #RS_RPC_STARTUP_WAIT_TIME_CONF_KEY}, falling back to
     * {@code DEFAULT_RS_RPC_STARTUP_WAIT_TIME} when the key is not set.
     *
     * @param master master services used for configuration, server lookup and procedure execution
     */
    public RSProcedureDispatcher(MasterServices master) {
        super(master.getConfiguration());
        this.master = master;
        this.rsStartupWaitTime = master.getConfiguration()
            .getLong(RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
    }

    /**
     * Supplies the handler for exceptions that escape the dispatcher's threads.
     *
     * @return a handler that only logs the throwable at ERROR level; nothing is re-thrown or retried
     */
    @Override
    protected Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return (thread, error) ->
            LOG.error("Unexpected error caught, this may cause the procedure to hang forever", error);
    }

    /**
     * Starts the dispatcher and wires it to the master.
     *
     * <p>When the superclass start succeeds, this registers the dispatcher as a server listener,
     * captures the procedure environment, and adds a node for every server currently online.
     *
     * @return {@code false} if the superclass refused to start (nothing else is done then),
     *         {@code true} otherwise
     */
    @Override
    public boolean start() {
        final boolean started = super.start();
        if (started) {
            master.getServerManager().registerListener(this);
            procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
            // Seed the dispatcher with the servers that were already online before we registered.
            master.getServerManager().getOnlineServersList().forEach(this::addNode);
        }
        return started;
    }

    /**
     * Stops the dispatcher and detaches it from the server manager.
     *
     * @return {@code false} if the superclass stop returned {@code false} (the listener stays
     *         registered in that case), {@code true} otherwise
     */
    @Override
    public boolean stop() {
        final boolean stopped = super.stop();
        if (stopped) {
            master.getServerManager().unregisterListener(this);
        }
        return stopped;
    }

    /**
     * Submits a task that delivers the given procedures to a server.
     *
     * <p>If the server manager reports the server as online, an
     * {@link ExecuteProceduresRemoteCall} is submitted; otherwise a {@code DeadRSRemoteCall},
     * which fails every procedure without sending anything.
     *
     * @param serverName target region server
     * @param remoteProcedures procedures queued for that server
     */
    @Override
    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedureDispatcher.RemoteProcedure> remoteProcedures) {
        final boolean online = master.getServerManager().isServerOnline(serverName);
        final ExecuteProceduresRemoteCall call = online
            ? new ExecuteProceduresRemoteCall(serverName, remoteProcedures)
            : new DeadRSRemoteCall(serverName, remoteProcedures);
        submitTask(call);
    }

    /**
     * Fails every pending operation for a server with a single shared
     * {@link DoNotRetryIOException} ("server not online ...").
     *
     * @param serverName server the operations were queued for
     * @param operations pending procedures to notify
     */
    @Override
    protected void abortPendingOperations(ServerName serverName, Set<RemoteProcedureDispatcher.RemoteProcedure> operations) {
        final DoNotRetryIOException notOnline = new DoNotRetryIOException("server not online " + serverName);
        operations.forEach(pending -> pending.remoteCallFailed(procedureEnv, serverName, notOnline));
    }

    /**
     * Listener callback: adds a dispatch node for the newly added server.
     *
     * @param serverName server that was added
     */
    @Override
    public void serverAdded(ServerName serverName) {
        addNode(serverName);
    }

    /**
     * Listener callback: removes the dispatch node of a server that went away.
     *
     * @param serverName server that was removed
     */
    @Override
    public void serverRemoved(ServerName serverName) {
        removeNode(serverName);
    }

    /**
     * Groups the queued procedures by operation type and hands each non-empty group to the
     * resolver, in the order: region opens, region closes, server operations.
     *
     * <p>Anything still left in the grouped map afterwards is reported with a warning.
     * NOTE(review): this relies on {@code fetchType} removing the entries it returns from the
     * map — confirm against the superclass.
     *
     * @param serverName server the operations are destined for
     * @param operations procedures to convert into remote operations
     * @param resolver receiver of the per-type operation lists
     */
    public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedureDispatcher.RemoteProcedure> operations, RemoteProcedureResolver resolver) {
        final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
        final ArrayListMultimap<Class<?>, RemoteProcedureDispatcher.RemoteOperation> grouped =
            buildAndGroupRequestByType(env, serverName, operations);

        final List<RegionOpenOperation> opens = fetchType(grouped, RegionOpenOperation.class);
        if (!opens.isEmpty()) {
            resolver.dispatchOpenRequests(env, opens);
        }

        final List<RegionCloseOperation> closes = fetchType(grouped, RegionCloseOperation.class);
        if (!closes.isEmpty()) {
            resolver.dispatchCloseRequests(env, closes);
        }

        final List<ServerOperation> serverOps = fetchType(grouped, ServerOperation.class);
        if (!serverOps.isEmpty()) {
            resolver.dispatchServerOperations(env, serverOps);
        }

        if (!grouped.isEmpty()) {
            LOG.warn("unknown request type in the queue: " + grouped);
        }
    }

    /**
     * Builds one {@code OpenRegionRequest} covering all the given open operations.
     *
     * <p>The request carries the target server's start code and the master's current time,
     * followed by one open-info entry per operation in list order.
     *
     * @param env procedure environment passed to each operation when building its open info
     * @param serverName server the regions are opened on
     * @param operations open operations to include
     * @return the assembled request
     */
    private static AdminProtos.OpenRegionRequest buildOpenRegionRequest(MasterProcedureEnv env, ServerName serverName, List<RegionOpenOperation> operations) {
        final AdminProtos.OpenRegionRequest.Builder openRequest = AdminProtos.OpenRegionRequest.newBuilder()
            .setServerStartCode(serverName.getStartcode())
            .setMasterSystemTime(EnvironmentEdgeManager.currentTime());
        operations.forEach(open -> openRequest.addOpenInfo(open.buildRegionOpenInfoRequest(env)));
        return openRequest.build();
    }

    /**
     * Remote operation asking a region server to close a region, optionally naming the server
     * the region is headed to.
     */
    public static class RegionCloseOperation extends RegionOperation {
        private final ServerName destinationServer;

        /**
         * @param remoteProcedure procedure this operation belongs to
         * @param regionInfo region to close
         * @param procId id of the owning procedure
         * @param destinationServer server passed along in the close request as the destination
         */
        public RegionCloseOperation(RemoteProcedureDispatcher.RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId, ServerName destinationServer) {
            super(remoteProcedure, regionInfo, procId);
            this.destinationServer = destinationServer;
        }

        /** @return the destination server given at construction */
        public ServerName getDestinationServer() {
            return destinationServer;
        }

        /**
         * Builds the close request for this region.
         *
         * @param serverName server that should close the region
         * @return close request carrying the region name, destination server and procedure id
         */
        public AdminProtos.CloseRegionRequest buildCloseRegionRequest(ServerName serverName) {
            final byte[] regionName = regionInfo.getRegionName();
            final ServerName destination = getDestinationServer();
            return ProtobufUtil.buildCloseRegionRequest(serverName, regionName, destination, procId);
        }
    }

    /** Remote operation asking a region server to open a region. */
    public static class RegionOpenOperation extends RegionOperation {
        /**
         * @param remoteProcedure procedure this operation belongs to
         * @param regionInfo region to open
         * @param procId id of the owning procedure
         */
        public RegionOpenOperation(RemoteProcedureDispatcher.RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
            super(remoteProcedure, regionInfo, procId);
        }

        /**
         * Builds the per-region entry of an open request.
         *
         * @param env environment whose assignment manager supplies the region's favored nodes
         * @return open info carrying the region, its favored nodes and the procedure id
         */
        public AdminProtos.OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(MasterProcedureEnv env) {
            final List<ServerName> favoredNodes = env.getAssignmentManager().getFavoredNodes(regionInfo);
            return RequestConverter.buildRegionOpenInfo(regionInfo, favoredNodes, procId);
        }
    }

    /** Base for remote operations that target a single region on behalf of a procedure. */
    public abstract static class RegionOperation extends RemoteProcedureDispatcher.RemoteOperation {
        /** Region the operation acts on. */
        protected final RegionInfo regionInfo;
        /** Id of the procedure that issued the operation. */
        protected final long procId;

        /**
         * @param remoteProcedure procedure this operation belongs to
         * @param regionInfo region the operation acts on
         * @param procId id of the owning procedure
         */
        protected RegionOperation(RemoteProcedureDispatcher.RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
            super(remoteProcedure);
            this.procId = procId;
            this.regionInfo = regionInfo;
        }
    }

    public static final class ServerOperation
    extends RemoteProcedureDispatcher.RemoteOperation {
        private final long procId;
        private final Class<?> rsProcClass;
        private final byte[] rsProcData;

        public ServerOperation(RemoteProcedureDispatcher.RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass, byte[] rsProcData) {
            super(remoteProcedure);
            this.procId = procId;
            this.rsProcClass = rsProcClass;
            this.rsProcData = rsProcData;
        }

        public AdminProtos.RemoteProcedureRequest buildRequest() {
            return AdminProtos.RemoteProcedureRequest.newBuilder().setProcId(this.procId).setProcClass(this.rsProcClass.getName()).setProcData(ByteString.copyFrom(this.rsProcData)).build();
        }
    }

    /**
     * Task that packs a set of remote procedures into a single {@code ExecuteProceduresRequest},
     * sends it to one region server, and on failure either re-submits itself after a short delay
     * or reports the failure to every procedure.
     *
     * <p>The task is also the {@link RemoteProcedureResolver} used while splitting the procedures
     * by type: each {@code dispatch*} callback appends to the request builder created in
     * {@link #run()}.
     */
    protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
        /** Delay, in milliseconds, before a failed call is re-submitted. */
        private static final long RETRY_DELAY_MS = 100L;

        private final ServerName serverName;
        private final Set<RemoteProcedureDispatcher.RemoteProcedure> remoteProcedures;
        /** Number of retries scheduled so far; 0 while the first attempt is in flight. */
        private int numberOfAttemptsSoFar = 0;
        /** Absolute deadline for "server not running yet" retries; negative until first needed. */
        private long maxWaitTime = -1L;
        /** Request under construction; replaced with a fresh builder on every {@link #run()}. */
        private AdminProtos.ExecuteProceduresRequest.Builder request = null;

        /**
         * @param serverName server the request is sent to
         * @param remoteProcedures procedures to deliver; also the ones notified on failure
         */
        public ExecuteProceduresRemoteCall(ServerName serverName, Set<RemoteProcedureDispatcher.RemoteProcedure> remoteProcedures) {
            this.serverName = serverName;
            this.remoteProcedures = remoteProcedures;
        }

        /**
         * Looks up the admin RPC stub for the target server.
         *
         * @return the stub, never {@code null}
         * @throws IOException if the lookup fails or yields no stub
         */
        private AdminProtos.AdminService.BlockingInterface getRsAdmin() throws IOException {
            final AdminProtos.AdminService.BlockingInterface admin =
                RSProcedureDispatcher.this.master.getServerManager().getRsAdmin(serverName);
            if (admin == null) {
                // This stub is used for the combined executeProcedures call (opens, closes and
                // server operations), so the message must not claim it was an OPEN RPC.
                throw new IOException("Attempting to send executeProcedures RPC to server " + getServerName()
                    + " failed because no RPC connection found to this server");
            }
            return admin;
        }

        /** @return the server this call targets */
        protected final ServerName getServerName() {
            return serverName;
        }

        /**
         * Decides whether a failed send is retried and, if so, re-submits this task.
         *
         * <p>Decision order:
         * <ol>
         *   <li>{@code ServerNotRunningYetException}: retry until the startup deadline from
         *       {@link #getMaxWaitTime()} has passed, then give up.</li>
         *   <li>{@code DoNotRetryIOException}: give up.</li>
         *   <li>{@code CallQueueTooBigException} on the very first attempt: give up. On later
         *       attempts the call falls through and is retried (presumably because an earlier
         *       attempt may already have reached the server — confirm).</li>
         *   <li>Server no longer online: give up.</li>
         *   <li>Anything else, including aborted/stopped servers that are still listed online:
         *       retry.</li>
         * </ol>
         *
         * @param e the already unwrapped failure
         * @return {@code true} if a retry was scheduled, {@code false} if the caller must fail
         *         the procedures
         */
        private boolean scheduleForRetry(IOException e) {
            LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
            if (e instanceof ServerNotRunningYetException) {
                final long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
                if (remainingTime <= 0L) {
                    LOG.warn("server {} is not up for a while; try a new one", serverName);
                    return false;
                }
                LOG.warn("waiting a little before trying on the same server={}, try={}, can wait up to {}ms",
                    serverName, numberOfAttemptsSoFar, remainingTime);
                return retryAfterDelay();
            }
            if (e instanceof DoNotRetryIOException) {
                LOG.warn("server {} tells us do not retry due to {}, try={}, give up",
                    serverName, e.toString(), numberOfAttemptsSoFar);
                return false;
            }
            if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
                LOG.warn("request to {} failed due to {}, try={}, this usually because server is overloaded, give up",
                    serverName, e.toString(), numberOfAttemptsSoFar);
                return false;
            }
            if (!RSProcedureDispatcher.this.master.getServerManager().isServerOnline(serverName)) {
                LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up",
                    serverName, e.toString(), numberOfAttemptsSoFar);
                return false;
            }
            if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
                LOG.warn("server {} is aborted or stopped, for safety we still need to wait until it is fully dead, try={}",
                    serverName, numberOfAttemptsSoFar);
            } else {
                LOG.warn("request to server {} failed due to {}, try={}, retrying...",
                    serverName, e.toString(), numberOfAttemptsSoFar);
            }
            return retryAfterDelay();
        }

        /** Counts one more attempt and re-submits this task after {@link #RETRY_DELAY_MS}. */
        private boolean retryAfterDelay() {
            ++numberOfAttemptsSoFar;
            RSProcedureDispatcher.this.submitTask(this, RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
            return true;
        }

        /**
         * Returns the deadline for "server not running yet" retries, fixing it on first use as
         * the current time plus the dispatcher's startup wait time.
         */
        private long getMaxWaitTime() {
            if (maxWaitTime < 0L) {
                maxWaitTime = EnvironmentEdgeManager.currentTime() + RSProcedureDispatcher.this.rsStartupWaitTime;
            }
            return maxWaitTime;
        }

        /** Unwraps a {@code RemoteException} into the exception it carries; others pass through. */
        private IOException unwrapException(IOException e) {
            if (e instanceof RemoteException) {
                return ((RemoteException) e).unwrapRemoteException();
            }
            return e;
        }

        /**
         * Builds a fresh request from the queued procedures and sends it. On an
         * {@code IOException} the call is either re-scheduled or every procedure is told that
         * the remote call failed.
         */
        @Override
        public void run() {
            request = AdminProtos.ExecuteProceduresRequest.newBuilder();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Building request with operations count=" + remoteProcedures.size());
            }
            // Populates `request` through the dispatch* callbacks below.
            RSProcedureDispatcher.this.splitAndResolveOperation(getServerName(), remoteProcedures, this);
            try {
                sendRequest(getServerName(), request.build());
            } catch (IOException e) {
                final IOException cause = unwrapException(e);
                if (!scheduleForRetry(cause)) {
                    remoteCallFailed(RSProcedureDispatcher.this.procedureEnv, cause);
                }
            }
        }

        /** Adds one open-region request covering all the given operations. */
        @Override
        public void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations) {
            request.addOpenRegion(RSProcedureDispatcher.buildOpenRegionRequest(env, getServerName(), operations));
        }

        /** Adds one close-region request per operation. */
        @Override
        public void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations) {
            for (RegionCloseOperation op : operations) {
                request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
            }
        }

        /** Adds one remote-procedure request per server operation, in list order. */
        @Override
        public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
            for (ServerOperation op : operations) {
                request.addProc(op.buildRequest());
            }
        }

        /**
         * Sends the request over the admin stub of this call's server.
         *
         * @param serverName not consulted by this implementation; the stub is looked up from the
         *        server given at construction
         * @param request request to send
         * @return the server's response
         * @throws IOException if no stub is available or the RPC fails
         */
        @VisibleForTesting
        protected AdminProtos.ExecuteProceduresResponse sendRequest(ServerName serverName, AdminProtos.ExecuteProceduresRequest request) throws IOException {
            try {
                return getRsAdmin().executeProcedures(null, request);
            } catch (ServiceException se) {
                throw ProtobufUtil.getRemoteException(se);
            }
        }

        /**
         * Notifies every procedure of this call that the remote call failed.
         *
         * @param env procedure environment handed to each procedure
         * @param e failure to report
         */
        protected final void remoteCallFailed(MasterProcedureEnv env, IOException e) {
            for (RemoteProcedureDispatcher.RemoteProcedure proc : remoteProcedures) {
                proc.remoteCallFailed(env, getServerName(), e);
            }
        }
    }

    /**
     * Remote call used for a server that is not online: nothing is sent, every procedure is
     * failed straight away with a {@link RegionServerStoppedException}.
     */
    private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
        /**
         * @param serverName server that is not online
         * @param remoteProcedures procedures to fail
         */
        public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedureDispatcher.RemoteProcedure> remoteProcedures) {
            super(serverName, remoteProcedures);
        }

        /** Reports the "not online" failure to all procedures without contacting the server. */
        @Override
        public void run() {
            final RegionServerStoppedException notOnline =
                new RegionServerStoppedException("Server " + getServerName() + " is not online");
            remoteCallFailed(RSProcedureDispatcher.this.procedureEnv, notOnline);
        }
    }

    private static interface RemoteProcedureResolver {
        public void dispatchOpenRequests(MasterProcedureEnv var1, List<RegionOpenOperation> var2);

        public void dispatchCloseRequests(MasterProcedureEnv var1, List<RegionCloseOperation> var2);

        public void dispatchServerOperations(MasterProcedureEnv var1, List<ServerOperation> var2);
    }
}

