/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.tez;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapProtocolClientProxy
extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolClientProxy.class);
    private final ConcurrentMap<String, LlapProtocolBlockingPB> hostProxies = new ConcurrentHashMap<String, LlapProtocolBlockingPB>();
    private final RequestManager requestManager;
    private final RetryPolicy retryPolicy;
    private final SocketFactory socketFactory;
    private final ListeningExecutorService requestManagerExecutor;
    private volatile ListenableFuture<Void> requestManagerFuture;
    private final Token<LlapTokenIdentifier> llapToken;

    public LlapProtocolClientProxy(int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) {
        super(LlapProtocolClientProxy.class.getSimpleName());
        this.socketFactory = NetUtils.getDefaultSocketFactory((Configuration)conf);
        this.llapToken = llapToken;
        long connectionTimeout = HiveConf.getTimeVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, (TimeUnit)TimeUnit.MILLISECONDS);
        long retrySleep = HiveConf.getTimeVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, (TimeUnit)TimeUnit.MILLISECONDS);
        this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep((long)connectionTimeout, (long)retrySleep, (TimeUnit)TimeUnit.MILLISECONDS);
        this.requestManager = new RequestManager(numThreads);
        ExecutorService localExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
        this.requestManagerExecutor = MoreExecutors.listeningDecorator((ExecutorService)localExecutor);
        LOG.info("Setting up taskCommunicator withnumThreads=" + numThreads + "retryTime(millis)=" + connectionTimeout + "retrySleep(millis)=" + retrySleep);
    }

    public void serviceStart() {
        this.requestManagerFuture = this.requestManagerExecutor.submit((Callable)this.requestManager);
        Futures.addCallback(this.requestManagerFuture, (FutureCallback)new FutureCallback<Void>(){

            public void onSuccess(Void result) {
                LOG.info("RequestManager shutdown");
            }

            public void onFailure(Throwable t) {
                LOG.warn("RequestManager shutdown with error", t);
            }
        });
    }

    public void serviceStop() {
        if (this.requestManagerFuture != null) {
            this.requestManager.shutdown();
            this.requestManagerFuture.cancel(true);
        }
        this.requestManagerExecutor.shutdown();
    }

    public void sendSubmitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, String host, int port, ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto> callback) {
        LlapNodeId nodeId = LlapNodeId.getInstance((String)host, (int)port);
        this.requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback));
    }

    public void sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request, LlapNodeId nodeId, ExecuteRequestCallback<LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> callback) {
        this.requestManager.queueRequest(new SendSourceStateUpdateCallable(nodeId, request, callback));
    }

    public void sendQueryComplete(LlapDaemonProtocolProtos.QueryCompleteRequestProto request, String host, int port, ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto> callback) {
        LlapNodeId nodeId = LlapNodeId.getInstance((String)host, (int)port);
        this.requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, callback));
    }

    public void sendTerminateFragment(LlapDaemonProtocolProtos.TerminateFragmentRequestProto request, String host, int port, ExecuteRequestCallback<LlapDaemonProtocolProtos.TerminateFragmentResponseProto> callback) {
        LlapNodeId nodeId = LlapNodeId.getInstance((String)host, (int)port);
        this.requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback));
    }

    private LlapProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
        String hostId = this.getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
        LlapProtocolBlockingPB proxy = (LlapProtocolBlockingPB)this.hostProxies.get(hostId);
        if (proxy == null) {
            if (this.llapToken == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Creating a client without a token for " + nodeId);
                }
                proxy = new LlapProtocolClientImpl(this.getConfig(), nodeId.getHostname(), nodeId.getPort(), this.retryPolicy, this.socketFactory);
            } else {
                UserGroupInformation ugi;
                try {
                    ugi = UserGroupInformation.getCurrentUser();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                Token nodeToken = new Token(this.llapToken);
                SecurityUtil.setTokenService((Token)nodeToken, (InetSocketAddress)NetUtils.createSocketAddrForHost((String)nodeId.getHostname(), (int)nodeId.getPort()));
                ugi.addToken(nodeToken);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Creating a client for " + nodeId + "; the token is " + nodeToken);
                }
                proxy = (LlapProtocolBlockingPB)ugi.doAs((PrivilegedAction)new PrivilegedAction<LlapProtocolBlockingPB>(){

                    @Override
                    public LlapProtocolBlockingPB run() {
                        return new LlapProtocolClientImpl(LlapProtocolClientProxy.this.getConfig(), nodeId.getHostname(), nodeId.getPort(), LlapProtocolClientProxy.this.retryPolicy, LlapProtocolClientProxy.this.socketFactory);
                    }
                });
            }
            LlapProtocolBlockingPB proxyOld = this.hostProxies.putIfAbsent(hostId, proxy);
            if (proxyOld != null) {
                proxy = proxyOld;
            }
        }
        return proxy;
    }

    private String getHostIdentifier(String hostname, int port) {
        return hostname + ":" + port;
    }

    public static interface ExecuteRequestCallback<T extends Message> {
        public void setResponse(T var1);

        public void indicateError(Throwable var1);
    }

    private class SendTerminateFragmentCallable
    extends CallableRequest<LlapDaemonProtocolProtos.TerminateFragmentRequestProto, LlapDaemonProtocolProtos.TerminateFragmentResponseProto> {
        protected SendTerminateFragmentCallable(LlapNodeId nodeId, LlapDaemonProtocolProtos.TerminateFragmentRequestProto terminateFragmentRequestProto, ExecuteRequestCallback<LlapDaemonProtocolProtos.TerminateFragmentResponseProto> callback) {
            super(nodeId, terminateFragmentRequestProto, callback);
        }

        @Override
        public LlapDaemonProtocolProtos.TerminateFragmentResponseProto call() throws Exception {
            return LlapProtocolClientProxy.this.getProxy(this.nodeId).terminateFragment(null, (LlapDaemonProtocolProtos.TerminateFragmentRequestProto)this.request);
        }
    }

    private class SendQueryCompleteCallable
    extends CallableRequest<LlapDaemonProtocolProtos.QueryCompleteRequestProto, LlapDaemonProtocolProtos.QueryCompleteResponseProto> {
        protected SendQueryCompleteCallable(LlapNodeId nodeId, LlapDaemonProtocolProtos.QueryCompleteRequestProto queryCompleteRequestProto, ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto> callback) {
            super(nodeId, queryCompleteRequestProto, callback);
        }

        @Override
        public LlapDaemonProtocolProtos.QueryCompleteResponseProto call() throws Exception {
            return LlapProtocolClientProxy.this.getProxy(this.nodeId).queryComplete(null, (LlapDaemonProtocolProtos.QueryCompleteRequestProto)this.request);
        }
    }

    private class SendSourceStateUpdateCallable
    extends CallableRequest<LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto, LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> {
        public SendSourceStateUpdateCallable(LlapNodeId nodeId, LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request, ExecuteRequestCallback<LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> callback) {
            super(nodeId, request, callback);
        }

        @Override
        public LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto call() throws Exception {
            return LlapProtocolClientProxy.this.getProxy(this.nodeId).sourceStateUpdated(null, (LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto)this.request);
        }
    }

    private class SubmitWorkCallable
    extends CallableRequest<LlapDaemonProtocolProtos.SubmitWorkRequestProto, LlapDaemonProtocolProtos.SubmitWorkResponseProto> {
        protected SubmitWorkCallable(LlapNodeId nodeId, LlapDaemonProtocolProtos.SubmitWorkRequestProto submitWorkRequestProto, ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto> callback) {
            super(nodeId, submitWorkRequestProto, callback);
        }

        @Override
        public LlapDaemonProtocolProtos.SubmitWorkResponseProto call() throws Exception {
            return LlapProtocolClientProxy.this.getProxy(this.nodeId).submitWork(null, (LlapDaemonProtocolProtos.SubmitWorkRequestProto)this.request);
        }
    }

    @VisibleForTesting
    static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message>
    implements Callable {
        final LlapNodeId nodeId;
        final ExecuteRequestCallback<RESPONSE> callback;
        final REQUEST request;

        protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) {
            this.nodeId = nodeId;
            this.request = request;
            this.callback = callback;
        }

        public LlapNodeId getNodeId() {
            return this.nodeId;
        }

        public ExecuteRequestCallback<RESPONSE> getCallback() {
            return this.callback;
        }

        public abstract RESPONSE call() throws Exception;
    }

    private static final class ResponseCallback<TYPE extends Message>
    implements FutureCallback<TYPE> {
        private final ExecuteRequestCallback<TYPE> callback;
        private final LlapNodeId nodeId;
        private final RequestManager requestManager;

        public ResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId, RequestManager requestManager) {
            this.callback = callback;
            this.nodeId = nodeId;
            this.requestManager = requestManager;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSuccess(TYPE result) {
            try {
                this.callback.setResponse(result);
            }
            finally {
                this.requestManager.requestFinished(this.nodeId);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onFailure(Throwable t) {
            try {
                this.callback.indicateError(t);
            }
            finally {
                this.requestManager.requestFinished(this.nodeId);
            }
        }
    }

    @VisibleForTesting
    static class RequestManager
    implements Callable<Void> {
        private final Lock lock = new ReentrantLock();
        private final AtomicBoolean isShutdown = new AtomicBoolean(false);
        private final Condition queueCondition = this.lock.newCondition();
        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
        private final int maxConcurrentRequestsPerNode = 1;
        private final ListeningExecutorService executor;
        private final LinkedList<CallableRequest> newRequestList = new LinkedList();
        private final LinkedList<CallableRequest> pendingRequests = new LinkedList();
        private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = new ConcurrentHashMap<LlapNodeId, AtomicInteger>();
        private final LinkedList<LlapNodeId> completedNodes = new LinkedList();
        @VisibleForTesting
        Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<LlapNodeId>();
        @VisibleForTesting
        List<CallableRequest> currentLoopSkippedRequests = new LinkedList<CallableRequest>();

        public RequestManager(int numThreads) {
            ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build());
            this.executor = MoreExecutors.listeningDecorator((ExecutorService)localExecutor);
        }

        @Override
        public Void call() {
            while (!this.isShutdown.get()) {
                this.lock.lock();
                try {
                    boolean shouldBreak;
                    if (!this.shouldRun.get()) {
                        this.queueCondition.await();
                    }
                    if (!(shouldBreak = this.process())) continue;
                    break;
                }
                catch (InterruptedException e) {
                    if (this.isShutdown.get()) break;
                    LOG.warn("RunLoop interrupted without being shutdown first");
                    throw new RuntimeException(e);
                }
                finally {
                    this.lock.unlock();
                }
            }
            LOG.info("CallScheduler loop exiting");
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void queueRequest(CallableRequest request) {
            LinkedList<CallableRequest> linkedList = this.newRequestList;
            synchronized (linkedList) {
                this.newRequestList.add(request);
                this.shouldRun.set(true);
            }
            this.notifyRunLoop();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void requestFinished(LlapNodeId nodeId) {
            LinkedList<LlapNodeId> linkedList = this.completedNodes;
            synchronized (linkedList) {
                this.completedNodes.add(nodeId);
                this.shouldRun.set(true);
            }
            this.notifyRunLoop();
        }

        public void shutdown() {
            if (!this.isShutdown.getAndSet(true)) {
                this.executor.shutdownNow();
                this.notifyRunLoop();
            }
        }

        @VisibleForTesting
        void submitToExecutor(CallableRequest request, LlapNodeId nodeId) {
            ListenableFuture future = this.executor.submit((Callable)request);
            Futures.addCallback((ListenableFuture)future, new ResponseCallback(request.getCallback(), nodeId, this));
        }

        @VisibleForTesting
        boolean process() {
            if (this.isShutdown.get()) {
                return true;
            }
            this.currentLoopDisabledNodes.clear();
            this.currentLoopSkippedRequests.clear();
            this.shouldRun.compareAndSet(true, false);
            this.drainNewRequestList();
            this.drainCompletedNodes();
            Iterator iterator = this.pendingRequests.iterator();
            while (iterator.hasNext()) {
                CallableRequest request = (CallableRequest)iterator.next();
                iterator.remove();
                LlapNodeId nodeId = request.getNodeId();
                if (this.canRunForNode(nodeId, this.currentLoopDisabledNodes)) {
                    this.submitToExecutor(request, nodeId);
                    continue;
                }
                this.currentLoopDisabledNodes.add(nodeId);
                this.currentLoopSkippedRequests.add(request);
            }
            this.pendingRequests.addAll(0, this.currentLoopSkippedRequests);
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void drainNewRequestList() {
            LinkedList<CallableRequest> linkedList = this.newRequestList;
            synchronized (linkedList) {
                if (!this.newRequestList.isEmpty()) {
                    this.pendingRequests.addAll(this.newRequestList);
                    this.newRequestList.clear();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void drainCompletedNodes() {
            LinkedList<LlapNodeId> linkedList = this.completedNodes;
            synchronized (linkedList) {
                if (!this.completedNodes.isEmpty()) {
                    for (LlapNodeId nodeId : this.completedNodes) {
                        ((AtomicInteger)this.runningRequests.get(nodeId)).decrementAndGet();
                    }
                }
                this.completedNodes.clear();
            }
        }

        private boolean canRunForNode(LlapNodeId nodeId, Set<LlapNodeId> currentRunDisabledNodes) {
            if (currentRunDisabledNodes.contains(nodeId)) {
                return false;
            }
            AtomicInteger count = (AtomicInteger)this.runningRequests.get(nodeId);
            if (count == null) {
                count = new AtomicInteger(0);
                AtomicInteger old = this.runningRequests.putIfAbsent(nodeId, count);
                AtomicInteger atomicInteger = count = old != null ? old : count;
            }
            if (count.incrementAndGet() <= 1) {
                return true;
            }
            count.decrementAndGet();
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void notifyRunLoop() {
            this.lock.lock();
            try {
                this.queueCondition.signal();
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

