/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.tezplugins;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.IPStackUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.ContainerFactory;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.llap.tezplugins.LlapUmbilicalPolicyProvider;
import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapTaskCommunicator
extends TezTaskCommunicatorImpl {
    private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
    private static final Joiner JOINER = Joiner.on((String)"");
    private static final Joiner PATH_JOINER = Joiner.on((String)"/");
    private final ConcurrentMap<LlapDaemonProtocolProtos.QueryIdentifierProto, ByteBuffer> credentialMap;
    private final EntityTracker entityTracker = new EntityTracker();
    private final SourceStateTracker sourceStateTracker;
    private final Set<LlapNodeId> nodesForQuery = new HashSet<LlapNodeId>();
    private LlapTaskSchedulerService scheduler;
    private LlapProtocolClientProxy communicator;
    private long deleteDelayOnDagComplete;
    private final LlapTaskUmbilicalProtocol umbilical;
    private final String user;
    private String amHost;
    private String timelineServerUri;
    private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<LlapNodeId, Long>();
    private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<LlapNodeId, PingingNodeInfo>();
    private final LlapRegistryService serviceRegistry;
    private volatile LlapDaemonProtocolProtos.QueryIdentifierProto currentQueryIdentifierProto;
    private volatile String currentHiveQueryId;
    static final Object pluginInitLock = new Object();
    static LlapTaskCommunicator instance = null;
    private static final String LLAP_TOKEN_NAME = LlapTokenIdentifier.KIND_NAME.toString();
    private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0L);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public LlapTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        super(taskCommunicatorContext);
        this.serviceRegistry = LlapRegistryService.getClient((Configuration)this.conf);
        this.umbilical = new LlapTaskUmbilicalProtocolImpl(this.getUmbilical());
        this.user = System.getenv(ApplicationConstants.Environment.USER.name());
        this.credentialMap = new ConcurrentHashMap<LlapDaemonProtocolProtos.QueryIdentifierProto, ByteBuffer>();
        this.sourceStateTracker = new SourceStateTracker(this.getContext(), this);
        Object object = pluginInitLock;
        synchronized (object) {
            LlapTaskSchedulerService peer = LlapTaskSchedulerService.instance;
            if (peer != null) {
                peer.setTaskCommunicator(this);
                this.setScheduler(peer);
                LlapTaskSchedulerService.instance = null;
            } else {
                instance = this;
            }
        }
    }

    private Token<LlapTokenIdentifier> getLlapToken() {
        Token token = null;
        Credentials credentials = this.getContext().getAMCredentials();
        if (credentials != null) {
            token = credentials.getToken(LlapTokenIdentifier.KIND_NAME);
        }
        Preconditions.checkState((token != null == UserGroupInformation.isSecurityEnabled() ? 1 : 0) != 0);
        if (token != null) {
            LOG.info("Task communicator with a token {}", (Object)token);
        }
        return token;
    }

    void setScheduler(LlapTaskSchedulerService peer) {
        this.scheduler = peer;
    }

    private void processSendError(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (!this.isInvalidTokenError(cause)) continue;
            this.handleInvalidToken();
            return;
        }
    }

    private boolean isInvalidTokenError(Throwable cause) {
        LOG.debug("Checking for invalid token error, cause: {}, cause.getCause(): {}", (Object)cause, (Object)cause.getCause());
        return cause instanceof SecretManager.InvalidToken && cause.getMessage() != null || cause instanceof RemoteException && cause.getCause() == null && cause.getMessage() != null && cause.getMessage().contains(LLAP_TOKEN_NAME) && (cause.getMessage().contains("InvalidToken") || cause.getMessage().contains("can't be found in cache"));
    }

    public void initialize() throws Exception {
        super.initialize();
        Configuration conf = this.getConf();
        int numThreads = HiveConf.getIntVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
        this.communicator = this.createLlapProtocolClientProxy(numThreads, conf);
        this.deleteDelayOnDagComplete = HiveConf.getTimeVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, (TimeUnit)TimeUnit.SECONDS);
        LOG.info("Running LlapTaskCommunicator with fileCleanupDelay=" + this.deleteDelayOnDagComplete + ", numCommunicatorThreads=" + numThreads);
        this.communicator.init(conf);
        String scheme = WebAppUtils.getHttpSchemePrefix((Configuration)conf);
        String ahsUrl = WebAppUtils.getAHSWebAppURLWithoutScheme((Configuration)conf);
        this.timelineServerUri = WebAppUtils.getURLWithScheme((String)scheme, (String)ahsUrl);
    }

    public void start() {
        super.start();
        this.communicator.start();
    }

    public void shutdown() {
        super.shutdown();
        if (this.communicator != null) {
            this.communicator.stop();
        }
    }

    protected void startRpcServer() {
        Configuration conf = this.getConf();
        try {
            JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(conf);
            jobTokenSecretManager.addTokenForJob(this.tokenIdentifier, this.sessionToken);
            int numHandlers = HiveConf.getIntVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT);
            String[] portRange = conf.get(HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname).split("-");
            boolean isHadoopSecurityAuthorizationEnabled = conf.getBoolean("hadoop.security.authorization", false);
            boolean portFound = false;
            BindException ioe = null;
            int minPort = Integer.parseInt(portRange[0]);
            if (portRange.length == 1) {
                this.startServerInternal(conf, minPort, numHandlers, jobTokenSecretManager, isHadoopSecurityAuthorizationEnabled);
                portFound = true;
                LOG.info("Successfully bound to port {}", (Object)minPort);
            } else {
                int maxPort = Integer.parseInt(portRange[1]);
                this.validatePortRange(portRange[0], portRange[1]);
                for (int i = minPort; i < maxPort; ++i) {
                    try {
                        this.startServerInternal(conf, i, numHandlers, jobTokenSecretManager, isHadoopSecurityAuthorizationEnabled);
                        portFound = true;
                        LOG.info("Successfully bound to port {}", (Object)i);
                        break;
                    }
                    catch (BindException be) {
                        LOG.warn("Unable to bind to port {}", (Object)i, (Object)be);
                        ioe = be;
                        continue;
                    }
                }
            }
            if (!portFound) {
                throw ioe;
            }
            this.address = NetUtils.getConnectAddress((Server)this.server);
            this.amHost = LlapUtil.getAmHostNameFromAddress((InetSocketAddress)this.address, (Configuration)conf);
            LOG.info("Started LlapUmbilical: " + this.umbilical.getClass().getName() + " at address: " + this.address + " with numHandlers=" + numHandlers + " using the host name " + this.amHost);
        }
        catch (IOException e) {
            throw new TezUncheckedException((Throwable)e);
        }
    }

    private void validatePortRange(String minPort, String maxPort) throws IOException {
        String valMax;
        Validator.RangeValidator rangeValidator = new Validator.RangeValidator((Object)1024L, (Object)65535L);
        String valMin = rangeValidator.validate(minPort);
        if (valMin == null & (valMax = rangeValidator.validate(maxPort)) == null) {
            throw new IOException("Invalid minimum range value: " + minPort + " and maximum range value: " + maxPort + " for " + HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname + ". The value should be between 1024 and 65535.");
        }
        if (valMin != null) {
            throw new IOException("Invalid minimum range value :" + minPort + " for " + HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname + ". The value should be between 1024 and 65535.");
        }
        if (valMax != null) {
            throw new IOException("Invalid maximum range value:" + maxPort + " for " + HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname + ". The value should be between 1024 and 65535.");
        }
    }

    private void startServerInternal(Configuration conf, int umbilicalPort, int numHandlers, JobTokenSecretManager jobTokenSecretManager, boolean isHadoopSecurityAuthorizationEnabled) throws IOException {
        this.server = new RPC.Builder(conf).setProtocol(LlapTaskUmbilicalProtocol.class).setBindAddress(IPStackUtils.resolveWildcardAddress()).setPort(umbilicalPort).setInstance((Object)this.umbilical).setNumHandlers(numHandlers).setSecretManager((SecretManager)jobTokenSecretManager).build();
        if (isHadoopSecurityAuthorizationEnabled) {
            this.server.refreshServiceAcl(conf, (PolicyProvider)new LlapUmbilicalPolicyProvider());
        }
        this.server.start();
    }

    @VisibleForTesting
    protected LlapProtocolClientProxy createLlapProtocolClientProxy(int numThreads, Configuration conf) {
        return new LlapProtocolClientProxy(numThreads, conf, this.getLlapToken());
    }

    public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
        super.registerRunningContainer(containerId, hostname, port);
        this.entityTracker.registerContainer(containerId, hostname, port);
    }

    public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
        super.registerContainerEnd(containerId, endReason, diagnostics);
        if (endReason == ContainerEndReason.INTERNAL_PREEMPTION) {
            LOG.info("Processing containerEnd for container {} caused by internal preemption", (Object)containerId);
            TezTaskAttemptID taskAttemptId = this.entityTracker.getTaskAttemptIdForContainer(containerId);
            if (taskAttemptId != null) {
                this.sendTaskTerminated(taskAttemptId, true);
            }
        }
        this.entityTracker.unregisterContainer(containerId);
    }

    public boolean registerDag(final LlapTaskSchedulerService.NodeInfo node, final OperationCallback<LlapDaemonProtocolProtos.QueryIdentifierProto, Void> callback) {
        LlapDaemonProtocolProtos.RegisterDagRequestProto.Builder builder = LlapDaemonProtocolProtos.RegisterDagRequestProto.newBuilder();
        if (this.currentQueryIdentifierProto == null) {
            return false;
        }
        try {
            LlapDaemonProtocolProtos.RegisterDagRequestProto request = builder.setQueryIdentifier(this.currentQueryIdentifierProto).setUser(this.user).setCredentialsBinary(this.getCredentials(this.getContext().getCurrentDagInfo().getCredentials())).build();
            this.communicator.registerDag(request, node.getHost(), node.getRpcPort(), (AsyncPbRpcProxy.ExecuteRequestCallback)new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.RegisterDagResponseProto>(){

                public void setResponse(LlapDaemonProtocolProtos.RegisterDagResponseProto response) {
                    callback.setDone(null, LlapTaskCommunicator.this.currentQueryIdentifierProto);
                }

                public void indicateError(Throwable t) {
                    LOG.info("Error registering dag with appId=" + LlapTaskCommunicator.this.currentQueryIdentifierProto.getApplicationIdString() + " dagId=" + LlapTaskCommunicator.this.currentQueryIdentifierProto.getDagIndex() + " to node " + node.getHost());
                    LlapTaskCommunicator.this.processSendError(t);
                    callback.setError(null, t);
                }
            });
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return true;
    }

    public <T> void startUpdateGuaranteed(final TezTaskAttemptID attemptId, LlapTaskSchedulerService.NodeInfo assignedNode, boolean newState, final OperationCallback<Boolean, T> callback, final T ctx) {
        LlapNodeId nodeId = this.entityTracker.getNodeIdForTaskAttempt(attemptId);
        if (nodeId == null) {
            if (assignedNode != null) {
                nodeId = LlapNodeId.getInstance((String)assignedNode.getHost(), (int)assignedNode.getRpcPort());
            }
            LOG.warn("Untracked node for " + attemptId + "; NodeInfo points to " + nodeId);
            if (nodeId == null) {
                callback.setDone(ctx, false);
                return;
            }
        }
        LlapDaemonProtocolProtos.UpdateFragmentRequestProto request = LlapDaemonProtocolProtos.UpdateFragmentRequestProto.newBuilder().setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString()).setQueryIdentifier(this.constructQueryIdentifierProto(attemptId.getDAGID().getId())).build();
        this.communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(), (AsyncPbRpcProxy.ExecuteRequestCallback)new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.UpdateFragmentResponseProto>(){

            public void setResponse(LlapDaemonProtocolProtos.UpdateFragmentResponseProto response) {
                callback.setDone(ctx, response.getResult());
            }

            public void indicateError(Throwable t) {
                LOG.warn("Failed to send update fragment request for {}", (Object)attemptId.toString());
                LlapTaskCommunicator.this.processSendError(t);
                callback.setError(ctx, t);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec, Map<String, LocalResource> additionalResources, Credentials credentials, boolean credentialsChanged, int priority) {
        LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto;
        LlapDaemonProtocolProtos.FragmentRuntimeInfo fragmentRuntimeInfo;
        int port;
        String host;
        TezTaskCommunicatorImpl.ContainerInfo containerInfo;
        super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority);
        int dagId = taskSpec.getDAGID().getId();
        if (this.currentQueryIdentifierProto == null || dagId != this.currentQueryIdentifierProto.getDagIndex()) {
            String hiveQueryId = this.extractQueryIdFromContext();
            try {
                hiveQueryId = hiveQueryId == null ? this.extractQueryId(taskSpec) : hiveQueryId;
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to extract query id from task spec: " + taskSpec, e);
            }
            Preconditions.checkNotNull((Object)hiveQueryId, (Object)"Unexpected null query id");
            this.resetCurrentDag(dagId, hiveQueryId);
        }
        if ((containerInfo = this.getContainerInfo(containerId)) != null) {
            TezTaskCommunicatorImpl.ContainerInfo containerInfo2 = containerInfo;
            synchronized (containerInfo2) {
                host = containerInfo.host;
                port = containerInfo.port;
            }
        } else {
            throw new RuntimeException("ContainerInfo not found for container: " + containerId + ", while trying to launch task: " + taskSpec.getTaskAttemptID());
        }
        LlapNodeId nodeId = LlapNodeId.getInstance((String)host, (int)port);
        this.registerKnownNode(nodeId);
        this.entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
        this.nodesForQuery.add(nodeId);
        this.sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
        try {
            fragmentRuntimeInfo = this.sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
        }
        catch (Exception e) {
            LOG.error("Error while trying to get runtimeFragmentInfo for fragmentId={}, containerId={}, currentQI={}, currentQueryId={}", new Object[]{taskSpec.getTaskAttemptID(), containerId, this.currentQueryIdentifierProto, this.currentHiveQueryId, e});
            if (e instanceof RuntimeException) {
                throw (RuntimeException)e;
            }
            throw new RuntimeException(e);
        }
        try {
            requestProto = this.constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo, this.currentHiveQueryId);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to construct request", e);
        }
        this.getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
        this.communicator.sendSubmitWork(requestProto, host, port, (AsyncPbRpcProxy.ExecuteRequestCallback)new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>(){

            public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
                if (response.hasSubmissionState()) {
                    LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState();
                    if (ss.equals((Object)LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
                        LOG.info("Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Service Busy");
                        LlapTaskCommunicator.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
                        return;
                    }
                } else {
                    throw new RuntimeException("SubmissionState in response is expected!");
                }
                if (response.hasUniqueNodeId()) {
                    LlapTaskCommunicator.this.entityTracker.registerTaskSubmittedToNode(taskSpec.getTaskAttemptID(), response.getUniqueNodeId());
                }
                LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
                LlapTaskCommunicator.this.scheduler.notifyStarted(taskSpec.getTaskAttemptID());
            }

            public void indicateError(Throwable t) {
                Throwable originalError = t;
                if (t instanceof ServiceException) {
                    ServiceException se = (ServiceException)t;
                    t = se.getCause();
                }
                if (t instanceof RemoteException) {
                    LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
                    LlapTaskCommunicator.this.processSendError(originalError);
                    LlapTaskCommunicator.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.toString());
                } else if (t instanceof IOException) {
                    LOG.info("Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Communication Error");
                    LlapTaskCommunicator.this.processSendError(originalError);
                    LlapTaskCommunicator.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
                } else {
                    LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
                    LlapTaskCommunicator.this.processSendError(originalError);
                    LlapTaskCommunicator.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.getMessage());
                }
            }
        });
    }

    private void handleInvalidToken() {
        this.communicator.refreshToken();
    }

    public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason endReason, String diagnostics) {
        super.unregisterRunningTaskAttempt(taskAttemptId, endReason, diagnostics);
        if (endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION) {
            LOG.info("Processing taskEnd for task {} caused by internal preemption", (Object)taskAttemptId);
            this.sendTaskTerminated(taskAttemptId, false);
        }
        this.entityTracker.unregisterTaskAttempt(taskAttemptId);
    }

    private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, boolean invokedByContainerEnd) {
        LOG.info("Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", (Object)taskAttemptId.toString(), (Object)(invokedByContainerEnd ? "containerEnd" : "taskEnd"));
        LlapNodeId nodeId = this.entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
        if (nodeId != null) {
            LlapDaemonProtocolProtos.TerminateFragmentRequestProto request = LlapDaemonProtocolProtos.TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(this.constructQueryIdentifierProto(taskAttemptId.getDAGID().getId())).setFragmentIdentifierString(taskAttemptId.toString()).build();
            this.communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), (AsyncPbRpcProxy.ExecuteRequestCallback)new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.TerminateFragmentResponseProto>(){

                public void setResponse(LlapDaemonProtocolProtos.TerminateFragmentResponseProto response) {
                }

                public void indicateError(Throwable t) {
                    LOG.warn("Failed to send terminate fragment request for {}", (Object)taskAttemptId.toString());
                    LlapTaskCommunicator.this.processSendError(t);
                }
            });
        } else {
            LOG.info("Not sending terminate request for fragment {} since it's node is not known. Already unregistered", (Object)taskAttemptId.toString());
        }
    }

    public void dagComplete(final int dagIdentifier) {
        LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifierProto = this.constructQueryIdentifierProto(dagIdentifier);
        LlapDaemonProtocolProtos.QueryCompleteRequestProto request = LlapDaemonProtocolProtos.QueryCompleteRequestProto.newBuilder().setQueryIdentifier(queryIdentifierProto).setDeleteDelay(this.deleteDelayOnDagComplete).build();
        for (final LlapNodeId llapNodeId : this.nodesForQuery) {
            LOG.info("Sending dagComplete message for {}, to {}", (Object)dagIdentifier, (Object)llapNodeId);
            this.communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(), (AsyncPbRpcProxy.ExecuteRequestCallback)new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>(){

                public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
                }

                public void indicateError(Throwable t) {
                    LOG.warn("Failed to indicate dag complete dagId={} to node {}", (Object)dagIdentifier, (Object)llapNodeId);
                    LlapTaskCommunicator.this.processSendError(t);
                }
            });
        }
        this.nodesForQuery.clear();
    }

    public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        this.sourceStateTracker.sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState());
    }

    public void sendStateUpdate(final LlapNodeId nodeId, final LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) {
        this.communicator.sendSourceStateUpdate(request, nodeId, (AsyncPbRpcProxy.ExecuteRequestCallback)new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto>(){

            public void setResponse(LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto response) {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void indicateError(Throwable t) {
                LOG.error("Failed to send state update to node: {}, Killing all attempts running on node. Attempted StateUpdate={}", new Object[]{nodeId, request, t});
                LlapTaskCommunicator.this.processSendError(t);
                BiMap<ContainerId, TezTaskAttemptID> biMap = LlapTaskCommunicator.this.entityTracker.getContainerAttemptMapForNode(nodeId);
                if (biMap != null) {
                    BiMap<ContainerId, TezTaskAttemptID> biMap2 = biMap;
                    synchronized (biMap2) {
                        for (Map.Entry entry : biMap.entrySet()) {
                            LOG.info("Sending a kill for attempt {}, due to a communication failure while sending a finishable state update", entry.getValue());
                            LlapTaskCommunicator.this.getContext().taskKilled((TezTaskAttemptID)entry.getValue(), TaskAttemptEndReason.NODE_FAILED, "Failed to send finishable state update to node " + nodeId);
                        }
                    }
                }
            }
        });
    }

    public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
        return this.constructLogUrl(attemptID, containerNodeId, false);
    }

    public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
        return this.constructLogUrl(attemptID, containerNodeId, true);
    }

    private String constructLogUrl(TezTaskAttemptID attemptID, NodeId containerNodeId, boolean isDone) {
        Set instanceSet;
        if (this.timelineServerUri == null || containerNodeId == null) {
            return null;
        }
        try {
            instanceSet = this.serviceRegistry.getInstances().getByHost(containerNodeId.getHost());
        }
        catch (IOException e) {
            LOG.warn("Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", (Object)containerNodeId, (Object)e.getMessage());
            return null;
        }
        if (instanceSet != null) {
            LlapServiceInstance matchedInstance = null;
            for (LlapServiceInstance instance : instanceSet) {
                if (instance.getRpcPort() != containerNodeId.getPort()) continue;
                matchedInstance = instance;
                break;
            }
            if (matchedInstance != null) {
                String containerIdString = (String)matchedInstance.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
                String nmNodeAddress = (String)matchedInstance.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_NM_ADDRESS.varname);
                if (!StringUtils.isBlank((CharSequence)containerIdString) && !StringUtils.isBlank((CharSequence)nmNodeAddress)) {
                    return this.constructLlapLogUrl(attemptID, containerIdString, isDone, nmNodeAddress);
                }
            }
        }
        return null;
    }

    private String constructLlapLogUrl(TezTaskAttemptID attemptID, String containerIdString, boolean isDone, String nmAddress) {
        String dagId = attemptID.getDAGID().toString();
        String filename = JOINER.join((Object)this.currentHiveQueryId, (Object)"-", new Object[]{dagId, ".log", isDone ? ".done" : "", "?nm.id=", nmAddress});
        String url = PATH_JOINER.join((Object)this.timelineServerUri, (Object)"ws", new Object[]{"v1", "applicationhistory", "containers", containerIdString, "logs", filename});
        return url;
    }

    public void registerKnownNode(LlapNodeId nodeId) {
        Long old = this.knownNodeMap.putIfAbsent(nodeId, TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS));
        if (old == null) {
            LOG.info("Added new known node: {}", (Object)nodeId);
        }
    }

    public void registerPingingNode(LlapNodeId nodeId, String uniqueId) {
        long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        PingingNodeInfo ni = new PingingNodeInfo(currentTs);
        PingingNodeInfo old = this.pingedNodeMap.put(nodeId, ni);
        if (old == null) {
            LOG.info("Added new pinging node: [{}] with uniqueId: {}", (Object)nodeId, (Object)uniqueId);
        } else {
            old.pingCount.incrementAndGet();
        }
        if (!this.knownNodeMap.containsKey(nodeId)) {
            if (old == null) {
                LOG.warn("Received ping from unknownNode: [{}], count={}", (Object)nodeId, (Object)ni.pingCount.get());
            } else if (currentTs > old.logTimestamp.get() + 5000L) {
                LOG.warn("Received ping from unknownNode: [{}], count={}", (Object)nodeId, (Object)old.pingCount.get());
                old.logTimestamp.set(currentTs);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void nodePinged(String hostname, String uniqueId, int port, LlapTaskUmbilicalProtocol.TezAttemptArray tasks, LlapTaskUmbilicalProtocol.BooleanArray guaranteed) {
        LlapNodeId nodeId = LlapNodeId.getInstance((String)hostname, (int)port);
        this.registerPingingNode(nodeId, uniqueId);
        BiMap<ContainerId, TezTaskAttemptID> biMap = this.entityTracker.getContainerAttemptMapForNode(nodeId);
        if (biMap != null) {
            HashSet<TezTaskAttemptID> error = new HashSet<TezTaskAttemptID>();
            BiMap<ContainerId, TezTaskAttemptID> biMap2 = biMap;
            synchronized (biMap2) {
                for (int i = 0; i < tasks.get().length; ++i) {
                    TezTaskAttemptID attemptID;
                    boolean isGuaranteed = false;
                    if (guaranteed != null) {
                        isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
                    }
                    if (!biMap.containsValue((Object)(attemptID = (TezTaskAttemptID)tasks.get()[i]))) continue;
                    String taskNodeId = this.entityTracker.getUniqueNodeId(attemptID);
                    if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
                        this.getContext().taskAlive(attemptID);
                        this.scheduler.taskInfoUpdated(attemptID, isGuaranteed);
                        this.getContext().containerAlive((ContainerId)biMap.inverse().get((Object)attemptID));
                        continue;
                    }
                    error.add(attemptID);
                }
            }
            if (!error.isEmpty()) {
                LOG.info("The tasks we expected to be on the node are not there: " + error);
                for (TezTaskAttemptID attempt : error) {
                    LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but registered with different unique ID", (Object)attempt);
                    this.getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED, "Node with same host and port but with new unique ID pinged");
                }
            }
        } else {
            long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
            if (currentTs > this.nodeNotFoundLogTime.get() + 5000L) {
                LOG.warn("Received ping from node without any registered tasks or containers: " + hostname + ":" + port + ". Could be caused by pre-emption by the AM, or a mismatched hostname. Enable debug logging for mismatched host names");
                this.nodeNotFoundLogTime.set(currentTs);
            }
        }
    }

    private void resetCurrentDag(int newDagId, String hiveQueryId) {
        this.currentQueryIdentifierProto = this.constructQueryIdentifierProto(newDagId);
        this.currentHiveQueryId = hiveQueryId;
        this.sourceStateTracker.resetState(this.currentQueryIdentifierProto);
        this.nodesForQuery.clear();
        LOG.info("CurrentDagId set to: " + newDagId + ", name=" + this.getContext().getCurrentDagInfo().getName() + ", queryId=" + hiveQueryId);
    }

    private String extractQueryId(TaskSpec taskSpec) throws IOException {
        UserPayload processorPayload = taskSpec.getProcessorDescriptor().getUserPayload();
        Configuration conf = TezUtils.createConfFromUserPayload((UserPayload)processorPayload);
        return HiveConf.getVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_QUERY_ID);
    }

    private String extractQueryIdFromContext() {
        DagInfo dagInfo = this.getContext().getCurrentDagInfo();
        if (dagInfo instanceof DAG) {
            return ((DAG)dagInfo).getConf().get(HiveConf.ConfVars.HIVE_QUERY_ID.varname);
        }
        return null;
    }

    private LlapDaemonProtocolProtos.SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec, LlapDaemonProtocolProtos.FragmentRuntimeInfo fragmentRuntimeInfo, String hiveQueryId) throws IOException {
        LlapDaemonProtocolProtos.SubmitWorkRequestProto.Builder builder = LlapDaemonProtocolProtos.SubmitWorkRequestProto.newBuilder();
        builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
        builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
        builder.setContainerIdString(containerId.toString());
        builder.setAmHost(this.getAmHostString());
        builder.setAmPort(this.getAddress().getPort());
        Preconditions.checkState((this.currentQueryIdentifierProto.getDagIndex() == taskSpec.getDAGID().getId() ? 1 : 0) != 0);
        builder.setCredentialsBinary(this.getCredentials(this.getContext().getCurrentDagInfo().getCredentials()));
        builder.setWorkSpec(LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex(Converters.constructSignableVertexSpec((TaskSpec)taskSpec, (LlapDaemonProtocolProtos.QueryIdentifierProto)this.currentQueryIdentifierProto, (String)this.getTokenIdentifier(), (String)this.user, (String)hiveQueryId)).build());
        builder.setFragmentRuntimeInfo(fragmentRuntimeInfo);
        builder.setIsGuaranteed(ContainerFactory.isContainerInitializedAsGuaranteed(containerId));
        return builder.build();
    }

    private ByteString getCredentials(Credentials credentials) throws IOException {
        ByteBuffer credentialsBinary = (ByteBuffer)this.credentialMap.get(this.currentQueryIdentifierProto);
        if (credentialsBinary == null) {
            credentialsBinary = LlapTezUtils.serializeCredentials(credentials);
            this.credentialMap.putIfAbsent(this.currentQueryIdentifierProto, credentialsBinary.duplicate());
        } else {
            credentialsBinary = credentialsBinary.duplicate();
        }
        return ByteString.copyFrom((ByteBuffer)credentialsBinary);
    }

    private LlapDaemonProtocolProtos.QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
        return LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder().setApplicationIdString(this.getContext().getCurrentAppIdentifier()).setDagIndex(dagIdentifier).setAppAttemptNumber(this.getContext().getApplicationAttemptId().getAttemptId()).build();
    }

    public String getAmHostString() {
        return this.amHost;
    }

    @VisibleForTesting
    static final class EntityTracker {
        @VisibleForTesting
        final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<TezTaskAttemptID, LlapNodeId>();
        @VisibleForTesting
        final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<ContainerId, LlapNodeId>();
        @VisibleForTesting
        final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>>();
        private final ConcurrentMap<TezTaskAttemptID, String> uniqueNodeMap = new ConcurrentHashMap<TezTaskAttemptID, String>();

        EntityTracker() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
            BiMap<ContainerId, TezTaskAttemptID> usedInstance;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering " + containerId + ", " + taskAttemptId + " for node: " + host + ":" + port);
            }
            LlapNodeId llapNodeId = LlapNodeId.getInstance((String)host, (int)port);
            this.attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
            this.registerContainer(containerId, host, port);
            BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
            BiMap<ContainerId, TezTaskAttemptID> old = this.nodeMap.putIfAbsent(llapNodeId, tmpMap);
            BiMap<ContainerId, TezTaskAttemptID> biMap = usedInstance = old == null ? tmpMap : old;
            synchronized (biMap) {
                usedInstance.put((Object)containerId, (Object)taskAttemptId);
            }
            this.nodeMap.putIfAbsent(llapNodeId, usedInstance);
        }

        public String getUniqueNodeId(TezTaskAttemptID attemptId) {
            return (String)this.uniqueNodeMap.get(attemptId);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void registerTaskSubmittedToNode(TezTaskAttemptID taskAttemptID, String uniqueNodeId) {
            ConcurrentMap<TezTaskAttemptID, LlapNodeId> concurrentMap = this.attemptToNodeMap;
            synchronized (concurrentMap) {
                String prev;
                if (this.attemptToNodeMap.containsKey(taskAttemptID) && (prev = this.uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId)) != null) {
                    LOG.warn("Replaced the unique node mapping for task from " + prev + " to " + uniqueNodeId);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
            LlapNodeId llapNodeId;
            this.uniqueNodeMap.remove(attemptId);
            ConcurrentMap<TezTaskAttemptID, LlapNodeId> concurrentMap = this.attemptToNodeMap;
            synchronized (concurrentMap) {
                llapNodeId = (LlapNodeId)this.attemptToNodeMap.remove(attemptId);
                if (llapNodeId == null) {
                    return;
                }
            }
            BiMap bMap = (BiMap)this.nodeMap.get(llapNodeId);
            ContainerId matched = null;
            if (bMap != null) {
                BiMap biMap = bMap;
                synchronized (biMap) {
                    matched = (ContainerId)bMap.inverse().remove((Object)attemptId);
                }
                if (bMap.isEmpty()) {
                    this.nodeMap.remove(llapNodeId);
                }
            }
            if (matched != null) {
                this.containerToNodeMap.remove(matched);
            }
        }

        void registerContainer(ContainerId containerId, String hostname, int port) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port);
            }
            this.containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance((String)hostname, (int)port));
        }

        LlapNodeId getNodeIdForContainer(ContainerId containerId) {
            return (LlapNodeId)this.containerToNodeMap.get(containerId);
        }

        LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) {
            return (LlapNodeId)this.attemptToNodeMap.get(taskAttemptId);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
            LlapNodeId llapNodeId = this.getNodeIdForTaskAttempt(taskAttemptId);
            if (llapNodeId != null) {
                BiMap bMap = ((BiMap)this.nodeMap.get(llapNodeId)).inverse();
                if (bMap != null) {
                    BiMap biMap = bMap;
                    synchronized (biMap) {
                        return (ContainerId)bMap.get((Object)taskAttemptId);
                    }
                }
                return null;
            }
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
            LlapNodeId llapNodeId = this.getNodeIdForContainer(containerId);
            if (llapNodeId != null) {
                BiMap bMap = (BiMap)this.nodeMap.get(llapNodeId);
                if (bMap != null) {
                    BiMap biMap = bMap;
                    synchronized (biMap) {
                        return (TezTaskAttemptID)bMap.get((Object)containerId);
                    }
                }
                return null;
            }
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void unregisterContainer(ContainerId containerId) {
            LlapNodeId llapNodeId = (LlapNodeId)this.containerToNodeMap.remove(containerId);
            if (llapNodeId == null) {
                return;
            }
            BiMap bMap = (BiMap)this.nodeMap.get(llapNodeId);
            TezTaskAttemptID matched = null;
            if (bMap != null) {
                BiMap biMap = bMap;
                synchronized (biMap) {
                    matched = (TezTaskAttemptID)bMap.remove((Object)containerId);
                }
                if (bMap.isEmpty()) {
                    this.nodeMap.remove(llapNodeId);
                }
            }
            if (matched != null) {
                this.attemptToNodeMap.remove(matched);
                this.uniqueNodeMap.remove(matched);
            }
        }

        BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
            return (BiMap)this.nodeMap.get(llapNodeId);
        }
    }

    protected class LlapTaskUmbilicalProtocolImpl
    implements LlapTaskUmbilicalProtocol {
        private final TezTaskUmbilicalProtocol tezUmbilical;

        public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezUmbilical) {
            this.tezUmbilical = tezUmbilical;
        }

        public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
            return this.tezUmbilical.canCommit(taskid);
        }

        public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException {
            return this.tezUmbilical.heartbeat(request);
        }

        public void nodeHeartbeat(Text hostname, Text uniqueId, int port, LlapTaskUmbilicalProtocol.TezAttemptArray aw, LlapTaskUmbilicalProtocol.BooleanArray guaranteed) throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received heartbeat from [" + hostname + ":" + port + " (" + uniqueId + ")]");
            }
            LlapTaskCommunicator.this.nodePinged(hostname.toString(), uniqueId.toString(), port, aw, guaranteed);
        }

        public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
            LlapTaskCommunicator.this.getContext().taskKilled(taskAttemptId, TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");
            LlapTaskCommunicator.this.entityTracker.unregisterTaskAttempt(taskAttemptId);
        }

        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            return 1L;
        }

        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
            return ProtocolSignature.getProtocolSignature((VersionedProtocol)this, (String)protocol, (long)clientVersion, (int)clientMethodsHash);
        }
    }

    public static interface OperationCallback<ResultType, CtxType> {
        public void setDone(CtxType var1, ResultType var2);

        public void setError(CtxType var1, Throwable var2);
    }

    private static class PingingNodeInfo {
        final AtomicLong logTimestamp;
        final AtomicInteger pingCount;

        PingingNodeInfo(long currentTs) {
            this.logTimestamp = new AtomicLong(currentTs);
            this.pingCount = new AtomicInteger(1);
        }
    }
}

