/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test.dag;

import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.apache.tez.test.TestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MultiAttemptDAG {
    /** Logger shared by the DAG factory methods and the nested test components. */
    private static final Logger LOG = LoggerFactory.getLogger(MultiAttemptDAG.class);

    /** Resource assigned to every vertex built by {@code createDAG}. */
    static Resource defaultResource = Resource.newInstance(100, 0);

    /** Configuration key from which {@code createDAG} reads the number of tasks per vertex. */
    public static String MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS = "tez.multi-attempt-dag.vertex.num-tasks";

    /** Number of tasks per vertex when no configuration, or no value for the key, is supplied. */
    public static int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;

    /** Configuration key for opting into the failing committer; not read anywhere in this class. */
    public static String MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER = "tez.multi-attempt-dag.use-failing-committer";

    /** Default paired with {@link #MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER}. */
    public static boolean MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER_DEFAULT = false;

    /** Not instantiable: this class only exposes static factories and nested types. */
    private MultiAttemptDAG() {
    }

    /**
     * Builds a three-vertex chain {@code v1 -> v2 -> v3} whose vertex managers halt the JVM
     * until the DAG attempt number reaches 1, 2 and 3 respectively.
     *
     * <p>Each vertex uses {@link FailOnAttemptVertexManagerPlugin}; the plugin payload is the
     * decimal attempt number from which that vertex schedules its tasks instead of halting.
     * Consecutive vertices are connected by scatter-gather, persisted, sequential edges.
     *
     * @param name name given to the created DAG
     * @param conf optional configuration; when non-null it supplies the task count
     *     ({@link #MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS}) and is serialized into the payload of
     *     every processor, input and output descriptor. When {@code null}, the default task
     *     count and a payload created from {@code null} are used.
     * @return the assembled DAG
     * @throws Exception if the configuration cannot be turned into a user payload
     */
    public static DAG createDAG(String name, Configuration conf) throws Exception {
        UserPayload payload = UserPayload.create(null);
        int taskCount = MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT;
        if (conf != null) {
            taskCount = conf.getInt(MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS, MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT);
            payload = TezUtils.createUserPayloadFromConf(conf);
        }

        DAG dag = DAG.create(name);
        Vertex v1 = Vertex.create("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
        Vertex v2 = Vertex.create("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
        Vertex v3 = Vertex.create("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);

        // Make each vertex manager halt the JVM until its own attempt number is reached.
        v1.setVertexManagerPlugin(failOnAttemptManager("1"));
        v2.setVertexManagerPlugin(failOnAttemptManager("2"));
        v3.setVertexManagerPlugin(failOnAttemptManager("3"));

        dag.addVertex(v1).addVertex(v2).addVertex(v3);
        dag.addEdge(scatterGatherEdge(v1, v2, payload));
        dag.addEdge(scatterGatherEdge(v2, v3, payload));
        return dag;
    }

    /**
     * Creates a {@link FailOnAttemptVertexManagerPlugin} descriptor carrying the given attempt number.
     *
     * <p>The number is encoded as UTF-8 text rather than with the platform default charset, so the
     * bytes do not depend on the JVM that builds the DAG.
     */
    private static VertexManagerPluginDescriptor failOnAttemptManager(String successAttempt) {
        ByteBuffer encoded = ByteBuffer.wrap(successAttempt.getBytes(StandardCharsets.UTF_8));
        return VertexManagerPluginDescriptor.create(FailOnAttemptVertexManagerPlugin.class.getName())
                .setUserPayload(UserPayload.create(encoded));
    }

    /** Creates a scatter-gather, persisted, sequential edge using the test input/output descriptors. */
    private static Edge scatterGatherEdge(Vertex source, Vertex target, UserPayload payload) {
        EdgeProperty property = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                TestOutput.getOutputDesc(payload),
                TestInput.getInputDesc(payload));
        return Edge.create(source, target, property);
    }

    /**
     * Convenience overload that builds the DAG under the fixed name {@code "SimpleVTestDAG"}.
     *
     * @param conf optional configuration, forwarded unchanged to the two-argument overload
     * @return the assembled DAG
     * @throws Exception if the two-argument overload fails
     */
    public static DAG createDAG(Configuration conf) throws Exception {
        final String dagName = "SimpleVTestDAG";
        return createDAG(dagName, conf);
    }

    /**
     * Logical output that does nothing: it exposes no writer, ignores incoming events and
     * produces no events of its own.
     */
    public static class NoOpOutput
    extends AbstractLogicalOutput {
        public NoOpOutput(OutputContext outputContext, int numPhysicalOutputs) {
            super(outputContext, numPhysicalOutputs);
        }

        /**
         * Requests an initial memory size of 1 with a callback that ignores the assignment.
         *
         * @return always {@code null}
         */
        public List<Event> initialize() throws Exception {
            OutputContext context = getContext();
            MemoryUpdateCallback ignoreAssignment = new MemoryUpdateCallback(){

                public void memoryAssigned(long grantedSize) {
                    // The assigned size is irrelevant for a no-op output.
                }
            };
            context.requestInitialMemory(1L, ignoreAssignment);
            return null;
        }

        /** Nothing to start. */
        public void start() throws Exception {
        }

        /** @return always {@code null}; this output has no writer */
        public Writer getWriter() {
            return null;
        }

        /** Ignores all events. */
        public void handleEvents(List<Event> outputEvents) {
        }

        /** @return always {@code null}; no events are generated on close */
        public List<Event> close() throws Exception {
            return null;
        }
    }

    /**
     * Logical input that does nothing: it exposes no reader, ignores incoming events and
     * produces no events of its own.
     */
    public static class NoOpInput
    extends AbstractLogicalInput {
        public NoOpInput(InputContext inputContext, int numPhysicalInputs) {
            super(inputContext, numPhysicalInputs);
        }

        /**
         * Requests an initial memory size of 1 with a callback that ignores the assignment.
         *
         * @return always {@code null}
         */
        public List<Event> initialize() throws Exception {
            InputContext context = getContext();
            MemoryUpdateCallback ignoreAssignment = new MemoryUpdateCallback(){

                public void memoryAssigned(long grantedSize) {
                    // The assigned size is irrelevant for a no-op input.
                }
            };
            context.requestInitialMemory(1L, ignoreAssignment);
            return null;
        }

        /** Nothing to start. */
        public void start() throws Exception {
        }

        /** @return always {@code null}; this input has no reader */
        public Reader getReader() {
            return null;
        }

        /** Ignores all events. */
        public void handleEvents(List<Event> inputEvents) {
        }

        /** @return always {@code null}; no events are generated on close */
        public List<Event> close() throws Exception {
            return null;
        }
    }

    /**
     * Input initializer that halts the JVM during the first DAG attempt and is a no-op
     * on every later attempt.
     */
    public static class FailingInputInitializer
    extends InputInitializer {
        public FailingInputInitializer(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        /**
         * Sleeps for two seconds, then halts the JVM with status -1 if the DAG attempt number is 1.
         *
         * <p>If the sleep is interrupted the method carries on as before, but the thread's
         * interrupt flag is restored so the caller can still observe the interruption.
         *
         * @return always {@code null} when the JVM is not halted
         */
        public List<Event> initialize() throws Exception {
            try {
                Thread.sleep(2000L);
            }
            catch (InterruptedException interrupted) {
                // The delay is best-effort; do not lose the interrupt request.
                Thread.currentThread().interrupt();
            }
            if (this.getContext().getDAGAttemptNumber() == 1) {
                LOG.info("Shutting down the AM in 1st attempt");
                // halt() rather than exit(): terminates immediately without running shutdown hooks.
                Runtime.getRuntime().halt(-1);
            }
            return null;
        }

        /**
         * Initializer events are not supported by this test initializer.
         *
         * @throws UnsupportedOperationException always
         */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) {
            throw new UnsupportedOperationException("Not supported");
        }
    }

    /**
     * Input initializer that reports exactly one {@link InputDataInformationEvent}
     * with index 0 and an empty serialized payload.
     */
    public static class TestRootInputInitializer
    extends InputInitializer {
        public TestRootInputInitializer(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        /**
         * @return a new mutable list holding the single data-information event
         */
        public List<Event> initialize() throws Exception {
            ArrayList<Event> result = new ArrayList<Event>();
            ByteBuffer emptyPayload = ByteBuffer.allocate(0);
            Event dataInfo = InputDataInformationEvent.createWithSerializedPayload(0, emptyPayload);
            result.add(dataInfo);
            return result;
        }

        /**
         * Initializer events are not supported by this test initializer.
         *
         * @throws UnsupportedOperationException always
         */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) {
            throw new UnsupportedOperationException("Not supported");
        }
    }

    /**
     * Output committer that can be configured, through its user payload, to halt the JVM
     * when {@link #commitOutput()} is invoked.
     */
    public static class FailingOutputCommitter
    extends OutputCommitter {
        /** Whether {@link #commitOutput()} halts the JVM; loaded from the payload in {@link #initialize()}. */
        boolean failOnCommit = false;

        public FailingOutputCommitter(OutputCommitterContext committerContext) {
            super(committerContext);
        }

        /** Decodes the output user payload and adopts its {@code failOnCommit} flag. */
        public void initialize() throws Exception {
            FailingOutputCommitterConfig decoded = new FailingOutputCommitterConfig();
            byte[] rawPayload = getContext().getOutputUserPayload().deepCopyAsArray();
            decoded.fromUserPayload(rawPayload);
            failOnCommit = decoded.failOnCommit;
        }

        /** Nothing to set up. */
        public void setupOutput() {
        }

        /** Halts the JVM with status -1 when configured to fail; otherwise does nothing. */
        public void commitOutput() {
            if (!failOnCommit) {
                return;
            }
            LOG.info("Committer causing AM to shutdown");
            Runtime.getRuntime().halt(-1);
        }

        /** Nothing to abort. */
        public void abortOutput(VertexStatus.State finalState) {
        }

        /**
         * Payload codec for {@link FailingOutputCommitter}: the flag is stored as the
         * byte-array form of the int 1 (fail) or 0 (do not fail).
         */
        public static class FailingOutputCommitterConfig {
            boolean failOnCommit;

            /** Creates a config that does not fail on commit. */
            public FailingOutputCommitterConfig() {
                this(false);
            }

            public FailingOutputCommitterConfig(boolean failOnCommit) {
                this.failOnCommit = failOnCommit;
            }

            /** @return the flag encoded via {@code Ints.toByteArray} as 1 or 0 */
            public byte[] toUserPayload() {
                int encodedFlag;
                if (failOnCommit) {
                    encodedFlag = 1;
                } else {
                    encodedFlag = 0;
                }
                return Ints.toByteArray(encodedFlag);
            }

            /**
             * Replaces the flag with the value decoded from {@code userPayload};
             * any non-zero int means "fail on commit".
             */
            public void fromUserPayload(byte[] userPayload) {
                failOnCommit = Ints.fromByteArray(userPayload) != 0;
            }
        }
    }

    /**
     * Vertex manager that waits for all source tasks to complete and then either halts the JVM
     * or schedules every task of its vertex, depending on the current DAG attempt number.
     *
     * <p>The user payload is the decimal text of the first attempt number at which scheduling
     * is allowed; on any earlier attempt the JVM is halted with status -1.
     */
    public static class FailOnAttemptVertexManagerPlugin
    extends VertexManagerPlugin {
        /** Total task count over all input vertices, computed in {@link #initialize()}. */
        private int numSourceTasks = 0;
        /** Source-task completions observed so far. */
        private final AtomicInteger numCompletions = new AtomicInteger();
        /** Set once the halt-or-schedule decision has been made, so it happens only once. */
        private boolean tasksScheduled = false;

        public FailOnAttemptVertexManagerPlugin(VertexManagerPluginContext context) {
            super(context);
        }

        /** Sums the task counts of all input vertices into {@code numSourceTasks}. */
        public void initialize() {
            for (String input : this.getContext().getInputVertexEdgeProperties().keySet()) {
                LOG.info("Adding sourceTasks for Vertex " + input);
                this.numSourceTasks += this.getContext().getVertexNumTasks(input);
                LOG.info("Current numSourceTasks=" + this.numSourceTasks);
            }
        }

        /**
         * Counts the completions reported at vertex start and re-evaluates the schedule decision.
         *
         * @param completions already-completed source task attempts; may be {@code null}
         */
        public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
            if (completions != null) {
                LOG.info("Received completion events on vertexStarted, completions=" + completions.size());
                this.numCompletions.addAndGet(completions.size());
            }
            this.maybeScheduleTasks();
        }

        /**
         * Once enough completions have been seen, decides exactly once whether to halt the JVM
         * (payload attempt number greater than the current DAG attempt) or schedule all tasks.
         */
        private synchronized void maybeScheduleTasks() {
            if (this.tasksScheduled || this.numCompletions.get() < this.numSourceTasks) {
                return;
            }
            this.tasksScheduled = true;
            // Decode explicitly as UTF-8 so the result does not depend on the platform default charset.
            String payload = new String(this.getContext().getUserPayload().deepCopyAsArray(), StandardCharsets.UTF_8);
            int successAttemptId = Integer.parseInt(payload);
            LOG.info("Checking whether to crash AM or schedule tasks, vertex: " + this.getContext().getVertexName() + ", successfulAttemptID=" + successAttemptId + ", currentAttempt=" + this.getContext().getDAGAttemptNumber());
            if (successAttemptId > this.getContext().getDAGAttemptNumber()) {
                Runtime.getRuntime().halt(-1);
            } else {
                LOG.info("Scheduling tasks for vertex=" + this.getContext().getVertexName());
                int numTasks = this.getContext().getVertexNumTasks(this.getContext().getVertexName());
                List<VertexManagerPluginContext.ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
                for (int i = 0; i < numTasks; ++i) {
                    // No location hint: null is passed as the second argument for every task.
                    scheduledTasks.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, null));
                }
                this.getContext().scheduleTasks(scheduledTasks);
            }
        }

        /** Records one more source-task completion and re-evaluates the schedule decision. */
        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
            LOG.info("Received completion events for source task, vertex=" + attempt.getTaskIdentifier().getVertexIdentifier().getName() + ", taskIdx=" + attempt.getTaskIdentifier().getIdentifier());
            this.numCompletions.incrementAndGet();
            this.maybeScheduleTasks();
        }

        /** Vertex manager events are ignored. */
        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
        }

        /**
         * Forwards the {@link InputDataInformationEvent}s among {@code events} to the context as
         * root input events for {@code inputName}; all other event types are dropped.
         */
        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
            List<InputDataInformationEvent> inputInfoEvents = new ArrayList<InputDataInformationEvent>();
            for (Event event : events) {
                if (event instanceof InputDataInformationEvent) {
                    inputInfoEvents.add((InputDataInformationEvent)event);
                }
            }
            this.getContext().addRootInputEvents(inputName, inputInfoEvents);
        }
    }
}

