/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.RunnableWithNdc;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.InputFrameworkInterface;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.LogicalOutputFrameworkInterface;
import org.apache.tez.runtime.api.MergedInputContext;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.Output;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputFrameworkInterface;
import org.apache.tez.runtime.api.Processor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.TaskContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezInputContextImpl;
import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class LogicalIOProcessorRuntimeTask
extends RuntimeTask {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(LogicalIOProcessorRuntimeTask.class);
    // Local directories handed to every input/output/processor/merged-input context built by this task.
    @VisibleForTesting
    private final String[] localDirs;
    // Input specs taken from the TaskSpec in the constructor.
    final List<InputSpec> inputSpecs;
    // Inputs looked up by source vertex name; populated outside the code visible here.
    final ConcurrentMap<String, LogicalInput> inputsMap;
    // Input contexts; closed and cleared by closeContexts().
    final ConcurrentMap<String, InputContext> inputContextMap;
    // Output specs taken from the TaskSpec in the constructor.
    final List<OutputSpec> outputSpecs;
    // Outputs looked up by destination vertex name; populated outside the code visible here.
    final ConcurrentMap<String, LogicalOutput> outputsMap;
    // Output contexts; closed and cleared by closeContexts().
    final ConcurrentMap<String, OutputContext> outputContextMap;
    // Group input specs taken from the TaskSpec; checked against null before use.
    final List<GroupInputSpec> groupInputSpecs;
    // Merged inputs by group name; created in initializeGroupInputs() and left null when there are no groups.
    ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
    // Inputs that still need closing: close() removes entries as it closes them, cleanup() drains the rest.
    final ConcurrentHashMap<String, LogicalInput> initializedInputs;
    // Outputs that still need closing: close() removes entries as it closes them, cleanup() drains the rest.
    final ConcurrentHashMap<String, LogicalOutput> initializedOutputs;
    // Set to true by close() immediately before processor.close() is invoked.
    private boolean processorClosed = false;
    final ProcessorDescriptor processorDescriptor;
    // Created reflectively in initialize(); null until then.
    AbstractLogicalIOProcessor processor;
    // Created in initialize() before the processor itself.
    ProcessorContext processorContext;
    // Passed to every context; makeInitialAllocations() is called once all IOs are initialized.
    private final MemoryDistributor initialMemoryDistributor;
    // Insertion-ordered maps handed to processor.run(); filled at the end of initialize().
    final LinkedHashMap<String, LogicalInput> runInputMap;
    final LinkedHashMap<String, LogicalOutput> runOutputMap;
    // Forwarded unchanged to the input/output/processor contexts.
    private final Map<String, ByteBuffer> serviceConsumerMetadata;
    private final Map<String, String> envMap;
    // Fixed pool of daemon "I/O Setup" threads; shut down inside initialize() after the start tasks are submitted.
    final ExecutorService initializerExecutor;
    // Completion service over initializerExecutor, used to wait for initialize/start tasks.
    private final CompletionService<Void> initializerCompletionService;
    // Task vertex name -> source vertex names already auto-started; supplied by the caller.
    private final Multimap<String, String> startedInputsMap;
    // Events queued by handleEvents() and consumed by the event router thread.
    LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
    // Started by startRouterThread(); reset to null after close()/cleanup() join it.
    Thread eventRouterThread = null;
    private final int appAttemptNumber;
    private volatile InputReadyTracker inputReadyTracker;
    private volatile ObjectRegistry objectRegistry;
    // NOTE: the field name shadows its type name (decompiler output); kept as-is.
    private final ExecutionContext ExecutionContext;
    private final long memAvailable;
    private final HadoopShim hadoopShim;
    // Read from "tez.task.max-event-backlog" (default 10000); bounds getMaxEventsToHandle().
    private final int maxEventBacklog;

    /**
     * Builds the runtime task in the {@code NEW} state. No input, output or processor is
     * instantiated here; that happens in {@link #initialize()}.
     *
     * @param taskSpec source of the input, output, group-input specs and the processor descriptor
     * @param appAttemptNumber forwarded to every context created by this task
     * @param tezConf configuration; also read for {@code tez.task.max-event-backlog} (default 10000)
     * @param localDirs local directories forwarded to every context
     * @param tezUmbilical passed to the superclass
     * @param serviceConsumerMetadata forwarded to every context
     * @param envMap forwarded to every context
     * @param startedInputsMap record of inputs already auto-started, keyed by task vertex name
     * @param objectRegistry forwarded to every context
     * @param pid passed to the superclass
     * @param ExecutionContext2 forwarded to every context
     * @param memAvailable forwarded to every context
     * @param updateSysCounters passed to the superclass
     * @param hadoopShim stored on the task
     * @throws IOException declared by the original signature
     */
    public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap, Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry, String pid, ExecutionContext ExecutionContext2, long memAvailable, boolean updateSysCounters, HadoopShim hadoopShim) throws IOException {
        super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters);
        LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " + taskSpec);
        int numInputs = taskSpec.getInputs().size();
        int numOutputs = taskSpec.getOutputs().size();
        this.localDirs = localDirs;
        this.inputSpecs = taskSpec.getInputs();
        this.inputsMap = new ConcurrentHashMap<String, LogicalInput>(numInputs);
        this.inputContextMap = new ConcurrentHashMap<String, InputContext>(numInputs);
        this.outputSpecs = taskSpec.getOutputs();
        this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
        this.outputContextMap = new ConcurrentHashMap<String, OutputContext>(numOutputs);
        // Explicit type arguments instead of raw types, so the field assignments are checked.
        this.runInputMap = new LinkedHashMap<String, LogicalInput>();
        this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
        this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
        this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
        this.processorDescriptor = taskSpec.getProcessorDescriptor();
        this.serviceConsumerMetadata = serviceConsumerMetadata;
        this.envMap = envMap;
        this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
        this.state.set(RuntimeTask.State.NEW);
        this.appAttemptNumber = appAttemptNumber;
        // A fixed thread pool needs at least one thread, even when the task has no IOs.
        int numInitializers = Math.max(1, numInputs + numOutputs);
        this.initializerExecutor = Executors.newFixedThreadPool(numInitializers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("I/O Setup %d").build());
        this.initializerCompletionService = new ExecutorCompletionService<Void>(this.initializerExecutor);
        this.groupInputSpecs = taskSpec.getGroupInputs();
        this.initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
        this.startedInputsMap = startedInputsMap;
        this.inputReadyTracker = new InputReadyTracker();
        this.objectRegistry = objectRegistry;
        this.ExecutionContext = ExecutionContext2;
        this.memAvailable = memAvailable;
        this.hadoopShim = hadoopShim;
        this.maxEventBacklog = tezConf.getInt("tez.task.max-event-backlog", 10000);
    }

    /**
     * Moves the task from {@code NEW} to {@code INITED}: creates the processor and its context,
     * initializes all inputs and outputs on the initializer pool, builds the group inputs,
     * auto-starts inputs that have not been started yet, fills the maps later passed to
     * {@code processor.run(...)}, and finally starts the event router thread.
     *
     * @throws Exception the cause of any failed initializer/start task (when that cause is an
     *         {@code Exception}), a wrapping {@code Exception} otherwise, or whatever the
     *         processor's own {@code initialize()} throws
     */
    public void initialize() throws Exception {
        int completedTasks;
        Preconditions.checkState((this.state.get() == RuntimeTask.State.NEW ? 1 : 0) != 0, (Object)"Already initialized");
        this.state.set(RuntimeTask.State.INITED);
        this.processorContext = this.createProcessorContext();
        this.processor = this.createProcessor(this.processorDescriptor.getClassName(), this.processorContext);
        // Submit one initializer task per input and per output; numTasks counts how many to wait for.
        int numTasks = 0;
        int inputIndex = 0;
        for (InputSpec inputSpec : this.taskSpec.getInputs()) {
            this.initializerCompletionService.submit((Callable<Void>)((Object)new InitializeInputCallable(inputSpec, inputIndex++)));
            ++numTasks;
        }
        int outputIndex = 0;
        for (OutputSpec outputSpec : this.taskSpec.getOutputs()) {
            this.initializerCompletionService.submit((Callable<Void>)((Object)new InitializeOutputCallable(outputSpec, outputIndex++)));
            ++numTasks;
        }
        // The processor is initialized on the calling thread while the IO initializers run on the pool.
        this.initializeLogicalIOProcessor();
        // Wait for every initializer; the first failure is unwrapped and rethrown.
        for (completedTasks = 0; completedTasks < numTasks; ++completedTasks) {
            LOG.info("Waiting for " + (numTasks - completedTasks) + " initializers to finish");
            Future<Void> future = this.initializerCompletionService.take();
            try {
                future.get();
                continue;
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof Exception) {
                    throw (Exception)e.getCause();
                }
                throw new Exception(e);
            }
        }
        LOG.info("All initializers finished");
        this.initializeGroupInputs();
        this.inputReadyTracker.setGroupedInputs(this.groupInputsMap == null ? null : this.groupInputsMap.values());
        // Names of the vertices that belong to some group; these are not started or exposed individually.
        HashSet groupInputs = Sets.newHashSet();
        if (this.groupInputSpecs != null && !this.groupInputSpecs.isEmpty()) {
            for (GroupInputSpec groupInputSpec : this.groupInputSpecs) {
                this.runInputMap.put(groupInputSpec.getGroupName(), (LogicalInput)this.groupInputsMap.get(groupInputSpec.getGroupName()));
                groupInputs.addAll(groupInputSpec.getGroupVertices());
            }
        }
        this.initialMemoryDistributor.makeInitialAllocations();
        LOG.info("Starting Inputs/Outputs");
        int numAutoStarts = 0;
        for (InputSpec inputSpec : this.inputSpecs) {
            if (groupInputs.contains(inputSpec.getSourceVertexName())) {
                LOG.info("Ignoring " + inputSpec.getSourceVertexName() + " for start, since it will be controlled via it's Group");
                continue;
            }
            if (this.inputAlreadyStarted(this.taskSpec.getVertexName(), inputSpec.getSourceVertexName())) continue;
            // Record the start so inputAlreadyStarted() returns true for this vertex/source pair from now on.
            this.startedInputsMap.put((Object)this.taskSpec.getVertexName(), (Object)inputSpec.getSourceVertexName());
            ++numAutoStarts;
            this.initializerCompletionService.submit((Callable<Void>)((Object)new StartInputCallable((LogicalInput)this.inputsMap.get(inputSpec.getSourceVertexName()), inputSpec.getSourceVertexName())));
            LOG.info("Input: " + inputSpec.getSourceVertexName() + " being auto started by the framework. Subsequent instances will not be auto-started");
        }
        if (this.groupInputSpecs != null) {
            for (GroupInputSpec group : this.groupInputSpecs) {
                // NOTE(review): unlike plain inputs, nothing is recorded in startedInputsMap for groups here
                // - confirm whether that is intentional.
                if (this.inputAlreadyStarted(this.taskSpec.getVertexName(), group.getGroupName())) continue;
                ++numAutoStarts;
                this.initializerCompletionService.submit((Callable<Void>)((Object)new StartInputCallable((LogicalInput)this.groupInputsMap.get(group.getGroupName()), group.getGroupName())));
                LOG.info("InputGroup: " + group.getGroupName() + " being auto started by the framework. Subsequent instance will not be auto-started");
            }
        }
        // No further tasks are submitted after this point; already-submitted start tasks still run.
        this.initializerExecutor.shutdown();
        LOG.info("Num IOs determined for AutoStart: " + numAutoStarts);
        // Wait for every auto-start task; the first failure is unwrapped and rethrown.
        for (completedTasks = 0; completedTasks < numAutoStarts; ++completedTasks) {
            LOG.info("Waiting for " + (numAutoStarts - completedTasks) + " IOs to start");
            Future<Void> future = this.initializerCompletionService.take();
            try {
                future.get();
                continue;
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof Exception) {
                    throw (Exception)e.getCause();
                }
                throw new Exception(e);
            }
        }
        LOG.info("AutoStartComplete");
        // Expose the non-grouped inputs and all outputs to the processor under their vertex names.
        for (InputSpec inputSpec : this.inputSpecs) {
            if (groupInputs.contains(inputSpec.getSourceVertexName())) continue;
            LogicalInput input = (LogicalInput)this.inputsMap.get(inputSpec.getSourceVertexName());
            this.runInputMap.put(inputSpec.getSourceVertexName(), input);
        }
        for (OutputSpec outputSpec : this.outputSpecs) {
            LogicalOutput output = (LogicalOutput)this.outputsMap.get(outputSpec.getDestinationVertexName());
            String outputName = outputSpec.getDestinationVertexName();
            this.runOutputMap.put(outputName, output);
        }
        this.startRouterThread();
    }

    /**
     * Runs the processor over the prepared input and output maps. The task must be in the
     * {@code INITED} state and is moved to {@code RUNNING} before the processor is invoked.
     *
     * @throws Exception whatever the processor's {@code run} throws
     */
    public void run() throws Exception {
        final boolean inited = this.state.get() == RuntimeTask.State.INITED;
        final String wrongStateMessage = "Can only run while in INITED state. Current: " + this.state;
        Preconditions.checkState(inited, (Object)wrongStateMessage);
        this.state.set(RuntimeTask.State.RUNNING);
        this.processor.run(this.runInputMap, this.runOutputMap);
    }

    /**
     * Closes all inputs, then all outputs, then the processor, sending the events each input or
     * output returns from {@code close()}. The task must be {@code RUNNING} and is moved to
     * {@code CLOSED}.
     *
     * <p>Whether or not closing succeeds, the task is marked done, the caller's interrupt flag is
     * cleared, the event router thread is interrupted and joined, and a completion line is
     * printed to both stderr and stdout.
     *
     * @throws Exception whatever an input, output or the processor throws while closing
     */
    public void close() throws Exception {
        try {
            final boolean running = this.state.get() == RuntimeTask.State.RUNNING;
            Preconditions.checkState(running, (Object)("Can only run while in RUNNING state. Current: " + this.state));
            this.state.set(RuntimeTask.State.CLOSED);
            for (InputSpec spec : this.inputSpecs) {
                final String sourceVertex = spec.getSourceVertexName();
                // Drop it from the pending set first so cleanup() will not close it a second time.
                this.initializedInputs.remove(sourceVertex);
                final InputFrameworkInterface closingInput = (InputFrameworkInterface)this.inputsMap.get(sourceVertex);
                this.sendTaskGeneratedEvents(closingInput.close(), EventMetaData.EventProducerConsumerType.INPUT, this.taskSpec.getVertexName(), sourceVertex, this.taskSpec.getTaskAttemptID());
            }
            for (OutputSpec spec : this.outputSpecs) {
                final String destinationVertex = spec.getDestinationVertexName();
                this.initializedOutputs.remove(destinationVertex);
                final LogicalOutputFrameworkInterface closingOutput = (LogicalOutputFrameworkInterface)this.outputsMap.get(destinationVertex);
                this.sendTaskGeneratedEvents(closingOutput.close(), EventMetaData.EventProducerConsumerType.OUTPUT, this.taskSpec.getVertexName(), destinationVertex, this.taskSpec.getTaskAttemptID());
            }
            // The flag is raised before the call, so it is true even if processor.close() throws.
            this.processorClosed = true;
            this.processor.close();
        }
        finally {
            this.setTaskDone();
            // Clears the current thread's interrupt flag before joining the router thread.
            Thread.interrupted();
            if (this.eventRouterThread != null) {
                this.eventRouterThread.interrupt();
                LOG.info("Joining on EventRouter");
                try {
                    this.eventRouterThread.join();
                }
                catch (InterruptedException interrupted) {
                    LOG.info("Ignoring interrupt while waiting for the router thread to die");
                    Thread.currentThread().interrupt();
                }
                this.eventRouterThread = null;
            }
            final SimpleDateFormat stampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            final String stamp = stampFormat.format(Calendar.getInstance().getTime());
            final String completionLine = stamp + " Completed running task attempt: " + this.taskSpec.getTaskAttemptID().toString();
            System.err.println(completionLine);
            System.out.println(completionLine);
        }
    }

    /**
     * Reports whether {@code edgeVertexName} is already recorded in the started-inputs map under
     * {@code vertexName}.
     *
     * @param vertexName key into the started-inputs map (the task's vertex name at the call sites)
     * @param edgeVertexName source vertex or group name to look for under that key
     * @return {@code true} when the key exists and its values contain {@code edgeVertexName}
     */
    private boolean inputAlreadyStarted(String vertexName, String edgeVertexName) {
        if (!this.startedInputsMap.containsKey((Object)vertexName)) {
            return false;
        }
        return this.startedInputsMap.get(vertexName).contains(edgeVertexName);
    }

    /**
     * Creates one merged input per group input spec and stores it in {@code groupInputsMap}
     * under the group name. Does nothing, and leaves {@code groupInputsMap} untouched, when the
     * task has no group input specs.
     *
     * @throws TezException if a merged input cannot be instantiated
     */
    private void initializeGroupInputs() throws TezException {
        if (this.groupInputSpecs == null || this.groupInputSpecs.isEmpty()) {
            return;
        }
        this.groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(this.groupInputSpecs.size());
        for (GroupInputSpec spec : this.groupInputSpecs) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Initializing GroupInput using GroupInputSpec: " + spec);
            }
            // The context is built before the member inputs are collected, as in the original order.
            MergedInputContext groupContext = new TezMergedInputContextImpl(spec.getMergedInputDescriptor().getUserPayload(), spec.getGroupName(), this.groupInputsMap, this.inputReadyTracker, this.localDirs, this);
            List<Input> members = Lists.newArrayListWithCapacity((int)spec.getGroupVertices().size());
            for (String memberVertex : spec.getGroupVertices()) {
                members.add(this.inputsMap.get(memberVertex));
            }
            LogicalInput merged = this.createMergedInput(spec.getMergedInputDescriptor(), groupContext, members);
            this.groupInputsMap.put(spec.getGroupName(), (MergedLogicalInput)merged);
        }
    }

    /**
     * Calls {@code initialize()} on the processor created earlier and logs around it.
     *
     * @throws Exception whatever the processor's {@code initialize()} throws
     */
    private void initializeLogicalIOProcessor() throws Exception {
        if (LOG.isDebugEnabled()) {
            final String processorClassName = this.processorDescriptor.getClassName();
            LOG.debug("Initializing processor, processorClassName=" + processorClassName);
        }
        this.processor.initialize();
        LOG.info("Initialized processor");
    }

    /**
     * Builds the context handed to a single input.
     *
     * @param inputMap map of inputs passed through to the context
     * @param inputSpec spec supplying the source vertex name and the input descriptor
     * @param inputIndex index of this input, passed through to the context
     * @return a new {@code TezInputContextImpl}
     */
    private InputContext createInputContext(Map<String, LogicalInput> inputMap, InputSpec inputSpec, int inputIndex) {
        return new TezInputContextImpl(
                this.tezConf,
                this.localDirs,
                this.appAttemptNumber,
                this.tezUmbilical,
                this.taskSpec.getDAGName(),
                this.taskSpec.getVertexName(),
                inputSpec.getSourceVertexName(),
                this.taskSpec.getVertexParallelism(),
                this.taskSpec.getTaskAttemptID(),
                inputIndex,
                inputSpec.getInputDescriptor().getUserPayload(),
                this,
                this.serviceConsumerMetadata,
                this.envMap,
                this.initialMemoryDistributor,
                inputSpec.getInputDescriptor(),
                inputMap,
                this.inputReadyTracker,
                this.objectRegistry,
                this.ExecutionContext,
                this.memAvailable);
    }

    /**
     * Builds the context handed to a single output.
     *
     * @param outputSpec spec supplying the destination vertex name and the output descriptor
     * @param outputIndex index of this output, passed through to the context
     * @return a new {@code TezOutputContextImpl}
     */
    private OutputContext createOutputContext(OutputSpec outputSpec, int outputIndex) {
        return new TezOutputContextImpl(
                this.tezConf,
                this.localDirs,
                this.appAttemptNumber,
                this.tezUmbilical,
                this.taskSpec.getDAGName(),
                this.taskSpec.getVertexName(),
                outputSpec.getDestinationVertexName(),
                this.taskSpec.getVertexParallelism(),
                this.taskSpec.getTaskAttemptID(),
                outputIndex,
                outputSpec.getOutputDescriptor().getUserPayload(),
                this,
                this.serviceConsumerMetadata,
                this.envMap,
                this.initialMemoryDistributor,
                outputSpec.getOutputDescriptor(),
                this.objectRegistry,
                this.ExecutionContext,
                this.memAvailable);
    }

    /**
     * Builds the context handed to the processor.
     *
     * @return a new {@code TezProcessorContextImpl}
     */
    private ProcessorContext createProcessorContext() {
        return new TezProcessorContextImpl(
                this.tezConf,
                this.localDirs,
                this.appAttemptNumber,
                this.tezUmbilical,
                this.taskSpec.getDAGName(),
                this.taskSpec.getVertexName(),
                this.taskSpec.getVertexParallelism(),
                this.taskSpec.getTaskAttemptID(),
                this.processorDescriptor.getUserPayload(),
                this,
                this.serviceConsumerMetadata,
                this.envMap,
                this.initialMemoryDistributor,
                this.processorDescriptor,
                this.inputReadyTracker,
                this.objectRegistry,
                this.ExecutionContext,
                this.memAvailable);
    }

    /**
     * Reflectively instantiates the input class named by the spec's descriptor, using its
     * {@code (InputContext, int)} constructor with the spec's physical edge count.
     *
     * @param inputSpec spec supplying the input descriptor and physical edge count
     * @param inputContext context passed to the input's constructor
     * @return the new input
     * @throws TezException declared for the reflective instantiation
     * @throws TezUncheckedException if the instantiated class is not a {@code LogicalInput}
     */
    private LogicalInput createInput(InputSpec inputSpec, InputContext inputContext) throws TezException {
        InputDescriptor inputDesc = inputSpec.getInputDescriptor();
        Input input = (Input)ReflectionUtils.createClazzInstance((String)inputDesc.getClassName(), (Class[])new Class[]{InputContext.class, Integer.TYPE}, (Object[])new Object[]{inputContext, inputSpec.getPhysicalEdgeCount()});
        if (!(input instanceof LogicalInput)) {
            // Name the configured input class. inputDesc.getClass() would always report the
            // descriptor's own class, which says nothing about the offending input.
            throw new TezUncheckedException(inputDesc.getClassName() + " is not a sub-type of LogicalInput." + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
        }
        return (LogicalInput)input;
    }

    /**
     * Reflectively instantiates the merged input class named by {@code inputDesc}, using its
     * {@code (MergedInputContext, List)} constructor.
     *
     * @param inputDesc descriptor naming the merged input class
     * @param mergedInputContext context passed to the constructor
     * @param constituentInputs member inputs passed to the constructor
     * @return the new merged input
     * @throws TezException declared for the reflective instantiation
     */
    private LogicalInput createMergedInput(InputDescriptor inputDesc, MergedInputContext mergedInputContext, List<Input> constituentInputs) throws TezException {
        final Class[] constructorSignature = new Class[]{MergedInputContext.class, List.class};
        final Object[] constructorArguments = new Object[]{mergedInputContext, constituentInputs};
        return (LogicalInput)ReflectionUtils.createClazzInstance((String)inputDesc.getClassName(), (Class[])constructorSignature, (Object[])constructorArguments);
    }

    /**
     * Reflectively instantiates the output class named by the spec's descriptor, using its
     * {@code (OutputContext, int)} constructor with the spec's physical edge count.
     *
     * @param outputSpec spec supplying the output descriptor and physical edge count
     * @param outputContext context passed to the output's constructor
     * @return the new output
     * @throws TezException declared for the reflective instantiation
     * @throws TezUncheckedException if the instantiated object is not a {@code LogicalOutput}
     */
    private LogicalOutput createOutput(OutputSpec outputSpec, OutputContext outputContext) throws TezException {
        final OutputDescriptor descriptor = outputSpec.getOutputDescriptor();
        final Class[] constructorSignature = new Class[]{OutputContext.class, Integer.TYPE};
        final Object[] constructorArguments = new Object[]{outputContext, outputSpec.getPhysicalEdgeCount()};
        final Output created = (Output)ReflectionUtils.createClazzInstance((String)descriptor.getClassName(), (Class[])constructorSignature, (Object[])constructorArguments);
        if (created instanceof LogicalOutput) {
            return (LogicalOutput)created;
        }
        throw new TezUncheckedException(created.getClass().getName() + " is not a sub-type of LogicalOutput." + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
    }

    /**
     * Reflectively instantiates the named processor class using its
     * {@code (ProcessorContext)} constructor.
     *
     * @param processorClassName fully qualified processor class name
     * @param processorContext context passed to the processor's constructor
     * @return the new processor
     * @throws TezException declared for the reflective instantiation
     * @throws TezUncheckedException if the instantiated object is not an
     *         {@code AbstractLogicalIOProcessor}
     */
    private AbstractLogicalIOProcessor createProcessor(String processorClassName, ProcessorContext processorContext) throws TezException {
        final Class[] constructorSignature = new Class[]{ProcessorContext.class};
        final Object[] constructorArguments = new Object[]{processorContext};
        final Processor created = (Processor)ReflectionUtils.createClazzInstance((String)processorClassName, (Class[])constructorSignature, (Object[])constructorArguments);
        if (created instanceof AbstractLogicalIOProcessor) {
            return (AbstractLogicalIOProcessor)created;
        }
        throw new TezUncheckedException(created.getClass().getName() + " is not a sub-type of AbstractLogicalIOProcessor." + " Only AbstractLogicalIOProcessor sub-types supported by LogicalIOProcessorRuntimeTask.");
    }

    /**
     * Wraps each event in a {@code TezEvent} carrying shared source metadata and hands the batch
     * to the umbilical. A null or empty list is ignored.
     *
     * @param events events to send; may be null or empty
     * @param generator kind of component that produced the events
     * @param taskVertexName vertex name recorded in the metadata
     * @param edgeVertexName edge vertex name recorded in the metadata
     * @param taskAttemptID task attempt recorded in the metadata
     */
    private void sendTaskGeneratedEvents(List<Event> events, EventMetaData.EventProducerConsumerType generator, String taskVertexName, String edgeVertexName, TezTaskAttemptID taskAttemptID) {
        final boolean nothingToSend = events == null || events.isEmpty();
        if (nothingToSend) {
            return;
        }
        final EventMetaData sourceMetaData = new EventMetaData(generator, taskVertexName, edgeVertexName, taskAttemptID);
        final ArrayList<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
        for (Event raw : events) {
            wrapped.add(new TezEvent(raw, sourceMetaData));
        }
        if (LOG.isDebugEnabled()) {
            for (int i = 0; i < wrapped.size(); ++i) {
                LOG.debug("Generated event info, eventMetaData=" + sourceMetaData.toString() + ", eventType=" + wrapped.get(i).getEventType());
            }
        }
        this.tezUmbilical.addEvents(wrapped);
    }

    /**
     * Delivers one event to the input, output or processor named by its destination info.
     * System events are only logged. On any failure the error is registered and a
     * {@code NON_FATAL} failure is signalled through the umbilical.
     *
     * @param e event to route
     * @return {@code true} if the event was handled (or ignored) without error, {@code false}
     *         after a failure has been reported
     */
    private boolean handleEvent(TezEvent e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling TezEvent in task, taskAttemptId=" + this.taskSpec.getTaskAttemptID() + ", eventType=" + e.getEventType() + ", eventSourceInfo=" + e.getSourceInfo() + ", eventDestinationInfo=" + e.getDestinationInfo());
        }
        try {
            switch (e.getDestinationInfo().getEventGenerator()) {
                case INPUT: {
                    final LogicalInput targetInput = (LogicalInput)this.inputsMap.get(e.getDestinationInfo().getEdgeVertexName());
                    if (targetInput == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + e);
                    }
                    ((InputFrameworkInterface)targetInput).handleEvents(Collections.singletonList(e.getEvent()));
                    break;
                }
                case OUTPUT: {
                    final LogicalOutput targetOutput = (LogicalOutput)this.outputsMap.get(e.getDestinationInfo().getEdgeVertexName());
                    if (targetOutput == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + e);
                    }
                    ((OutputFrameworkInterface)targetOutput).handleEvents(Collections.singletonList(e.getEvent()));
                    break;
                }
                case PROCESSOR: {
                    this.processor.handleEvents(Collections.singletonList(e.getEvent()));
                    break;
                }
                case SYSTEM: {
                    LOG.warn("Trying to send a System event in a Task: " + e);
                    break;
                }
            }
            return true;
        }
        catch (Throwable failure) {
            LOG.warn("Failed to handle event", failure);
            this.registerError();
            // The failure is attributed to the component the event was addressed to.
            final EventMetaData failureSource = new EventMetaData(e.getDestinationInfo().getEventGenerator(), this.taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(), this.getTaskAttemptID());
            this.setFrameworkCounters();
            this.tezUmbilical.signalFailure(this.getTaskAttemptID(), TaskFailureType.NON_FATAL, failure, ExceptionUtils.getStackTrace((Throwable)failure), failureSource);
            return false;
        }
    }

    /**
     * Returns how many more events fit in the queue before it reaches the configured backlog.
     *
     * @return {@code maxEventBacklog} minus the current queue size, never below zero
     */
    @Override
    public int getMaxEventsToHandle() {
        final int spare = this.maxEventBacklog - this.eventsToBeProcessed.size();
        return spare > 0 ? spare : 0;
    }

    /**
     * Adds the given events to the queue drained by the event router thread and bumps the
     * event counter by their number. A null or empty collection is ignored.
     *
     * @param events events to enqueue; may be null or empty
     */
    @Override
    public synchronized void handleEvents(Collection<TezEvent> events) {
        if (events != null && !events.isEmpty()) {
            this.eventCounter.addAndGet(events.size());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received events to be processed by task, taskAttemptId=" + this.taskSpec.getTaskAttemptID() + ", eventCount=" + events.size() + ", newEventCounter=" + this.eventCounter.get());
            }
            this.eventsToBeProcessed.addAll(events);
        }
    }

    /**
     * Asks the processor to abort. Does nothing if no processor has been created yet.
     */
    @Override
    public synchronized void abortTask() {
        if (this.processor == null) {
            return;
        }
        this.processor.abort();
    }

    /**
     * Creates, names and starts the event router thread. The thread takes events off the queue
     * one at a time and passes them to {@code handleEvent}; it exits when the task is done, when
     * it is interrupted, or after the first event that fails to be handled.
     */
    private void startRouterThread() {
        final RunnableWithNdc routerLoop = new RunnableWithNdc(){

            public void runInternal() {
                for (;;) {
                    if (LogicalIOProcessorRuntimeTask.this.isTaskDone() || Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    final TezEvent next;
                    try {
                        next = LogicalIOProcessorRuntimeTask.this.eventsToBeProcessed.take();
                    }
                    catch (InterruptedException interrupted) {
                        // An interrupt is expected once the task is done; warn only otherwise.
                        if (!LogicalIOProcessorRuntimeTask.this.isTaskDone()) {
                            LOG.warn("Event Router thread interrupted. Returning.");
                        }
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (next == null) {
                        continue;
                    }
                    if (!LogicalIOProcessorRuntimeTask.this.handleEvent(next)) {
                        LOG.warn("Stopping Event Router thread as failed to handle event: " + next);
                        return;
                    }
                }
            }
        };
        final Thread router = new Thread((Runnable)routerLoop);
        this.eventRouterThread = router;
        router.setName("TezTaskEventRouter{" + this.taskSpec.getTaskAttemptID().toString() + "}");
        router.start();
    }

    /**
     * Sets the current thread's interrupt flag if it is not already set.
     */
    private void maybeResetInterruptStatus() {
        final Thread self = Thread.currentThread();
        if (self.isInterrupted()) {
            return;
        }
        self.interrupt();
    }

    /**
     * Closes the input contexts, then the output contexts, then the processor context.
     *
     * @throws IOException if closing any context fails; later contexts are then left unclosed
     */
    private void closeContexts() throws IOException {
        final Map<String, InputContext> inputContexts = this.inputContextMap;
        this.closeContext(inputContexts);
        final Map<String, OutputContext> outputContexts = this.outputContextMap;
        this.closeContext(outputContexts);
        final TaskContext processorTaskContext = this.processorContext;
        this.closeContext(processorTaskContext);
    }

    /**
     * Closes every context held in the map and then empties the map. A null map is ignored.
     *
     * @param contextMap contexts to close; may be null
     * @throws IOException if closing a context fails; the map is then not cleared
     */
    private void closeContext(Map<String, ? extends TaskContext> contextMap) throws IOException {
        if (contextMap != null) {
            final Iterator<? extends TaskContext> remaining = contextMap.values().iterator();
            while (remaining.hasNext()) {
                this.closeContext(remaining.next());
            }
            contextMap.clear();
        }
    }

    /**
     * Closes the context if it implements {@link Closeable}; anything else, including null,
     * is ignored.
     *
     * @param context context to close; may be null
     * @throws IOException if the context's {@code close()} fails
     */
    private void closeContext(TaskContext context) throws IOException {
        // instanceof is false for null, so no separate null check is needed.
        if (!(context instanceof Closeable)) {
            return;
        }
        ((Closeable)context).close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    public void cleanup() throws InterruptedException {
        LOG.info("Final Counters for " + this.taskSpec.getTaskAttemptID() + ": " + this.getCounters().toShortString());
        this.setTaskDone();
        if (this.eventRouterThread != null) {
            this.eventRouterThread.interrupt();
            LOG.info("Joining on EventRouter");
            try {
                this.eventRouterThread.join();
            }
            catch (InterruptedException e) {
                LOG.info("Ignoring interrupt while waiting for the router thread to die");
                Thread.currentThread().interrupt();
            }
            this.eventRouterThread = null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processor closed={}", (Object)this.processorClosed);
            LOG.debug("Num of inputs to be closed={}", (Object)this.initializedInputs.size());
            LOG.debug("Num of outputs to be closed={}", (Object)this.initializedOutputs.size());
        }
        Iterator<Map.Entry<String, LogicalInput>> inputIterator = this.initializedInputs.entrySet().iterator();
        while (inputIterator.hasNext()) {
            Map.Entry<String, LogicalInput> entry = inputIterator.next();
            String srcVertexName = entry.getKey();
            inputIterator.remove();
            try {
                ((InputFrameworkInterface)entry.getValue()).close();
                this.maybeResetInterruptStatus();
            }
            catch (InterruptedException ie) {
                LOG.info("Resetting interrupt status for input with srcVertexName={}", (Object)srcVertexName);
                Thread.currentThread().interrupt();
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted()});
                continue;
            }
            catch (Throwable e) {
                LOG.warn("Ignoring exception when closing input {}(cleanup). Exception class={}, message={}", new Object[]{srcVertexName, e.getClass().getName(), e.getMessage()});
                {
                    catch (Throwable throwable) {
                        LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted()});
                        throw throwable;
                    }
                }
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted()});
                continue;
            }
            LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted()});
        }
        Iterator<Map.Entry<String, LogicalOutput>> outputIterator = this.initializedOutputs.entrySet().iterator();
        while (outputIterator.hasNext()) {
            Map.Entry<String, LogicalOutput> entry = outputIterator.next();
            String destVertexName = entry.getKey();
            outputIterator.remove();
            try {
                ((OutputFrameworkInterface)entry.getValue()).close();
                this.maybeResetInterruptStatus();
            }
            catch (InterruptedException ie) {
                LOG.info("Resetting interrupt status for output with destVertexName={}", (Object)destVertexName);
                Thread.currentThread().interrupt();
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted()});
                continue;
            }
            catch (Throwable e) {
                LOG.warn("Ignoring exception when closing output {}(cleanup). Exception class={}, message={}", new Object[]{destVertexName, e.getClass().getName(), e.getMessage()});
                {
                    catch (Throwable throwable) {
                        LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted()});
                        throw throwable;
                    }
                }
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted()});
                continue;
            }
            LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted()});
        }
        if (LOG.isDebugEnabled()) {
            this.printThreads();
        }
        if (!this.processorClosed && this.processor != null) {
            try {
                this.processorClosed = true;
                this.processor.close();
                LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), this.processor.getContext().getTaskVertexIndex(), Thread.currentThread().isInterrupted()});
                this.maybeResetInterruptStatus();
            }
            catch (InterruptedException ie) {
                LOG.info("Resetting interrupt for processor");
                Thread.currentThread().interrupt();
            }
            catch (Throwable e) {
                LOG.warn("Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" + e.getClass().getName(), (Object)e.getMessage());
            }
        }
        try {
            this.closeContexts();
            this.cleanupStructures();
        }
        catch (IOException e) {
            LOG.info("Error while cleaning up contexts ", (Throwable)e);
        }
    }

    /**
     * Drops the state this task instance is holding on to.
     * <p>
     * Shuts down the initializer executor if it exists and has not been shut down
     * yet, clears the input/output collections and the pending event collection,
     * and sets the processor, processor context, input-ready tracker and object
     * registry references to {@code null}. The input, output and group-input specs
     * are only cleared when the {@code "tez.local.mode"} flag of the task
     * configuration is not set to {@code true}.
     */
    private void cleanupStructures() {
        boolean executorNeedsShutdown = initializerExecutor != null && !initializerExecutor.isShutdown();
        if (executorNeedsShutdown) {
            initializerExecutor.shutdownNow();
        }

        // Input/output instances and their contexts.
        inputsMap.clear();
        outputsMap.clear();
        initializedInputs.clear();
        initializedOutputs.clear();
        inputContextMap.clear();
        outputContextMap.clear();

        // The specs are left untouched when running in local mode.
        boolean localMode = tezConf.getBoolean("tez.local.mode", false);
        if (!localMode) {
            inputSpecs.clear();
            outputSpecs.clear();
            if (groupInputSpecs != null) {
                groupInputSpecs.clear();
            }
        }

        if (groupInputsMap != null) {
            groupInputsMap.clear();
            groupInputsMap = null;
        }

        processor = null;
        processorContext = null;

        runInputMap.clear();
        runOutputMap.clear();
        eventsToBeProcessed.clear();

        inputReadyTracker = null;
        objectRegistry = null;
    }

    /**
     * Logs the id and name of every live JVM thread at debug level.
     * <p>
     * Does nothing when debug logging is disabled, so no per-thread lookups are
     * made unless their result will actually be logged.
     */
    void printThreads() {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        for (long id : threadMXBean.getAllThreadIds()) {
            ThreadInfo threadInfo = threadMXBean.getThreadInfo(id);
            // getThreadInfo returns null for a thread that is no longer alive,
            // which can happen between listing the ids and querying each one.
            if (threadInfo == null) {
                continue;
            }
            LOG.debug("ThreadId : {}, name={}", id, threadInfo.getThreadName());
        }
    }

    /**
     * Exposes the input contexts currently held by this task (test hook).
     *
     * @return whatever {@code inputContextMap.values()} yields; no copy is made here
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<InputContext> getInputContexts() {
        return inputContextMap.values();
    }

    /**
     * Exposes the output contexts currently held by this task (test hook).
     *
     * @return whatever {@code outputContextMap.values()} yields; no copy is made here
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<OutputContext> getOutputContexts() {
        return outputContextMap.values();
    }

    /**
     * Exposes the processor context of this task (test hook).
     *
     * @return the current {@code processorContext} reference; {@code null} once
     *         {@code cleanupStructures()} has run, since that method resets it
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public ProcessorContext getProcessorContext() {
        return processorContext;
    }

    /**
     * Exposes the processor of this task (test hook).
     *
     * @return the current {@code processor} reference; {@code null} once
     *         {@code cleanupStructures()} has run, since that method resets it
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public LogicalIOProcessor getProcessor() {
        return processor;
    }

    /**
     * Exposes the inputs registered with this task (test hook).
     *
     * @return the internal {@code inputsMap} itself, not a copy
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Map<String, LogicalInput> getInputs() {
        return inputsMap;
    }

    /**
     * Exposes the outputs registered with this task (test hook).
     *
     * @return the internal {@code outputsMap} itself, not a copy
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Map<String, LogicalOutput> getOutputs() {
        return outputsMap;
    }

    /**
     * Exposes the Hadoop shim held by this task.
     *
     * @return the current {@code hadoopShim} reference
     */
    @InterfaceAudience.Private
    public HadoopShim getHadoopShim() {
        return hadoopShim;
    }

    /**
     * Callable that creates and initializes the single output described by an
     * {@link OutputSpec}, recording it in the enclosing task's output collections.
     */
    private class InitializeOutputCallable
    extends CallableWithNdc<Void> {
        private final OutputSpec outputSpec;
        private final int outputIndex;

        /**
         * @param outputSpec  spec of the output to create and initialize
         * @param outputIndex index handed to {@code createOutputContext} for this output
         */
        public InitializeOutputCallable(OutputSpec outputSpec, int outputIndex) {
            this.outputSpec = outputSpec;
            this.outputIndex = outputIndex;
        }

        /**
         * Runs {@link #_callInternal()} with the destination vertex name appended to
         * the current thread's name; the original name is restored afterwards, even
         * when initialization throws.
         *
         * @return always {@code null}
         * @throws Exception whatever {@link #_callInternal()} throws
         */
        protected Void callInternal() throws Exception {
            Thread current = Thread.currentThread();
            String oldThreadName = current.getName();
            try {
                current.setName(oldThreadName + " Initialize: {" + this.outputSpec.getDestinationVertexName() + "}");
                return this._callInternal();
            }
            finally {
                current.setName(oldThreadName);
            }
        }

        /**
         * Creates the output context and the output, initializes the output and
         * forwards the events returned by {@code initialize()}.
         *
         * @return always {@code null}
         * @throws Exception if creating or initializing the output, or sending its
         *                   events, fails
         */
        protected Void _callInternal() throws Exception {
            LOG.debug("Initializing Output using OutputSpec: {}", this.outputSpec);
            String edgeName = this.outputSpec.getDestinationVertexName();
            OutputContext outputContext = LogicalIOProcessorRuntimeTask.this.createOutputContext(this.outputSpec, this.outputIndex);
            LogicalOutput output = LogicalIOProcessorRuntimeTask.this.createOutput(this.outputSpec, outputContext);
            // Registered before initialize() so the output and its context are known
            // to the task even if initialization fails.
            LogicalIOProcessorRuntimeTask.this.outputsMap.put(edgeName, output);
            LogicalIOProcessorRuntimeTask.this.outputContextMap.put(edgeName, outputContext);
            List events = ((OutputFrameworkInterface)output).initialize();
            LogicalIOProcessorRuntimeTask.this.sendTaskGeneratedEvents(events, EventMetaData.EventProducerConsumerType.OUTPUT, outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(), LogicalIOProcessorRuntimeTask.this.taskSpec.getTaskAttemptID());
            // Recorded exactly once, and only after initialize() and event delivery
            // succeeded: cleanup closes the entries it finds in initializedOutputs.
            LogicalIOProcessorRuntimeTask.this.initializedOutputs.put(edgeName, output);
            LOG.debug("Initialized Output with dest edge: {}", edgeName);
            return null;
        }
    }

    /**
     * Callable that starts one input and logs before and after doing so.
     */
    private static class StartInputCallable
    extends CallableWithNdc<Void> {
        private final LogicalInput input;
        private final String srcVertexName;

        /**
         * @param input         the input whose {@code start()} will be invoked
         * @param srcVertexName source vertex name, used for the thread name and log lines
         */
        public StartInputCallable(LogicalInput input, String srcVertexName) {
            this.input = input;
            this.srcVertexName = srcVertexName;
        }

        /**
         * Delegates to {@link #_callInternal()} while the current thread carries a
         * name suffixed with the source vertex; the previous name is put back in all
         * cases.
         *
         * @return always {@code null}
         * @throws Exception whatever {@link #_callInternal()} throws
         */
        protected Void callInternal() throws Exception {
            final Thread runner = Thread.currentThread();
            final String savedName = runner.getName();
            try {
                runner.setName(savedName + " Start: {" + srcVertexName + "}");
                return _callInternal();
            } finally {
                runner.setName(savedName);
            }
        }

        /**
         * Invokes {@code start()} on the wrapped input.
         *
         * @return always {@code null}
         * @throws Exception if starting the input fails
         */
        protected Void _callInternal() throws Exception {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Starting Input with src edge: " + srcVertexName);
            }
            input.start();
            LOG.info("Started Input with src edge: " + srcVertexName);
            return null;
        }
    }

    /**
     * Callable that creates and initializes the single input described by an
     * {@link InputSpec}, recording it in the enclosing task's input collections.
     */
    private class InitializeInputCallable
    extends CallableWithNdc<Void> {
        private final InputSpec inputSpec;
        private final int inputIndex;

        /**
         * @param inputSpec  spec of the input to create and initialize
         * @param inputIndex index handed to {@code createInputContext} for this input
         */
        public InitializeInputCallable(InputSpec inputSpec, int inputIndex) {
            this.inputSpec = inputSpec;
            this.inputIndex = inputIndex;
        }

        /**
         * Runs {@link #_callInternal()} with the source vertex name appended to the
         * current thread's name; the original name is restored afterwards, even when
         * initialization throws.
         *
         * @return always {@code null}
         * @throws Exception whatever {@link #_callInternal()} throws
         */
        protected Void callInternal() throws Exception {
            Thread current = Thread.currentThread();
            String oldThreadName = current.getName();
            try {
                current.setName(oldThreadName + " Initialize: {" + this.inputSpec.getSourceVertexName() + "}");
                return this._callInternal();
            }
            finally {
                current.setName(oldThreadName);
            }
        }

        /**
         * Creates the input context and the input, initializes the input and
         * forwards the events returned by {@code initialize()}.
         *
         * @return always {@code null}
         * @throws Exception if creating or initializing the input, or sending its
         *                   events, fails
         */
        protected Void _callInternal() throws Exception {
            LOG.debug("Initializing Input using InputSpec: {}", this.inputSpec);
            String edgeName = this.inputSpec.getSourceVertexName();
            InputContext inputContext = LogicalIOProcessorRuntimeTask.this.createInputContext(LogicalIOProcessorRuntimeTask.this.inputsMap, this.inputSpec, this.inputIndex);
            LogicalInput input = LogicalIOProcessorRuntimeTask.this.createInput(this.inputSpec, inputContext);
            // Registered before initialize() so the input and its context are known
            // to the task even if initialization fails.
            LogicalIOProcessorRuntimeTask.this.inputsMap.put(edgeName, input);
            LogicalIOProcessorRuntimeTask.this.inputContextMap.put(edgeName, inputContext);
            List events = ((InputFrameworkInterface)input).initialize();
            LogicalIOProcessorRuntimeTask.this.sendTaskGeneratedEvents(events, EventMetaData.EventProducerConsumerType.INPUT, inputContext.getTaskVertexName(), inputContext.getSourceVertexName(), LogicalIOProcessorRuntimeTask.this.taskSpec.getTaskAttemptID());
            // Recorded exactly once, and only after initialize() and event delivery
            // succeeded: cleanup closes the entries it finds in initializedInputs.
            LogicalIOProcessorRuntimeTask.this.initializedInputs.put(edgeName, input);
            LOG.debug("Initialized Input with src edge: {}", edgeName);
            return null;
        }
    }
}

