/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
import com.datatorrent.bufferserver.storage.Storage;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StramLocalCluster
implements Runnable,
LocalMode.Controller {
    private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class);
    private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName());
    private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname";
    private static final String LOCALHOST = System.getProperty("org.apache.apex.stram.StramLocalCluster.hostname", "localhost");
    protected final StreamingContainerManager dnmgr;
    private final UmbilicalProtocolLocalImpl umbilical;
    private InetSocketAddress bufferServerAddress;
    private boolean perContainerBufferServer;
    private Server bufferServer = null;
    private final Map<String, LocalStreamingContainer> childContainers = new ConcurrentHashMap<String, LocalStreamingContainer>();
    private int containerSeq = 0;
    private boolean appDone = false;
    private final Map<String, StreamingContainer> injectShutdown = new ConcurrentHashMap<String, StreamingContainer>();
    private boolean heartbeatMonitoringEnabled = true;
    private Callable<Boolean> exitCondition;
    private MockComponentFactory mockComponentFactory;

    public StramLocalCluster(LogicalPlan dag) throws IOException, ClassNotFoundException {
        dag.validate();
        StramLocalCluster.cloneLogicalPlan(dag);
        String pathUri = CLUSTER_WORK_DIR.toURI().toString();
        try {
            FileContext.getLocalFSFileContext().delete(new Path(pathUri), true);
        }
        catch (IllegalArgumentException e) {
            throw e;
        }
        catch (IOException e) {
            throw new RuntimeException("could not cleanup test dir", e);
        }
        dag.getAttributes().put(LogicalPlan.APPLICATION_ID, (Object)("app_local_" + System.currentTimeMillis()));
        if (dag.getAttributes().get(LogicalPlan.APPLICATION_PATH) == null) {
            dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, (Object)pathUri);
        }
        if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) {
            dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
        }
        this.dnmgr = new StreamingContainerManager(dag);
        this.umbilical = new UmbilicalProtocolLocalImpl();
        if (!this.perContainerBufferServer) {
            StreamingContainer.eventloop.start();
            this.bufferServer = new Server(0, 0x100000, 8);
            this.bufferServer.setSpoolStorage((Storage)new DiskStorage());
            this.bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, this.bufferServer.run((EventLoop)StreamingContainer.eventloop).getPort());
            LOG.info("Buffer server started: {}", (Object)this.bufferServerAddress);
        }
    }

    public static LogicalPlan cloneLogicalPlan(LogicalPlan lp) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        LogicalPlan.write(lp, bos);
        LOG.debug("serialized size: {}", (Object)bos.toByteArray().length);
        bos.flush();
        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
        return LogicalPlan.read(bis);
    }

    LocalStreamingContainer getContainer(String id) {
        return this.childContainers.get(id);
    }

    public StreamingContainerManager getStreamingContainerManager() {
        return this.dnmgr;
    }

    public DAG getDAG() {
        return this.dnmgr.getPhysicalPlan().getLogicalPlan();
    }

    public StramLocalCluster(LogicalPlan dag, MockComponentFactory mcf) throws Exception {
        this(dag);
        this.mockComponentFactory = mcf;
    }

    void failContainer(StreamingContainer c) {
        this.injectShutdown.put(c.getContainerId(), c);
        c.triggerHeartbeat();
        LOG.info("Container {} failed, launching new container.", (Object)c.getContainerId());
        this.dnmgr.scheduleContainerRestart(c.getContainerId());
        this.childContainers.remove(c.getContainerId());
    }

    public PTOperator findByLogicalNode(LogicalPlan.OperatorMeta logicalNode) {
        List<PTOperator> nodes = this.dnmgr.getPhysicalPlan().getOperators(logicalNode);
        if (nodes.isEmpty()) {
            return null;
        }
        return nodes.get(0);
    }

    List<PTOperator> getPlanOperators(LogicalPlan.OperatorMeta logicalNode) {
        return this.dnmgr.getPhysicalPlan().getOperators(logicalNode);
    }

    public LocalStreamingContainer getContainer(PTOperator planOperator) {
        LocalStreamingContainer container;
        String cid = planOperator.getContainer().getExternalId();
        if (cid != null && (container = this.getContainer(cid)) != null && container.getNodeContext(planOperator.getId()) != null) {
            return container;
        }
        return null;
    }

    StreamingContainerAgent getContainerAgent(StreamingContainer c) {
        return this.dnmgr.getContainerAgent(c.getContainerId());
    }

    public void runAsync() {
        new Thread((Runnable)this, "master").start();
    }

    public void shutdown() {
        this.appDone = true;
    }

    public boolean isFinished() {
        return this.appDone;
    }

    public void setHeartbeatMonitoringEnabled(boolean enabled) {
        this.heartbeatMonitoringEnabled = enabled;
    }

    public void setPerContainerBufferServer(boolean perContainerBufferServer) {
        this.perContainerBufferServer = perContainerBufferServer;
    }

    public void setExitCondition(Callable<Boolean> exitCondition) {
        this.exitCondition = exitCondition;
    }

    @Override
    public void run() {
        this.run(0L);
    }

    public void run(long runMillis) {
        long endMillis = System.currentTimeMillis() + runMillis;
        LinkedList containerThreads = new LinkedList();
        while (!this.appDone) {
            block16: {
                for (String containerIdStr : this.dnmgr.containerStopRequests.values()) {
                    StreamingContainer c = this.childContainers.get(containerIdStr);
                    if (c != null) {
                        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse r = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
                        r.shutdown = true;
                        c.processHeartbeatResponse(r);
                    }
                    this.dnmgr.containerStopRequests.remove(containerIdStr);
                    LOG.info("Container {} restart.", (Object)containerIdStr);
                    this.dnmgr.scheduleContainerRestart(containerIdStr);
                }
                while (!this.dnmgr.containerStartRequests.isEmpty()) {
                    StreamingContainerAgent.ContainerStartRequest cdr = this.dnmgr.containerStartRequests.poll();
                    if (cdr == null) continue;
                    new LocalStreamingContainerLauncher(cdr, containerThreads);
                }
                if (this.heartbeatMonitoringEnabled) {
                    this.dnmgr.monitorHeartbeat();
                }
                if (this.childContainers.isEmpty() && this.dnmgr.containerStartRequests.isEmpty()) {
                    this.appDone = true;
                }
                if (runMillis > 0L && System.currentTimeMillis() > endMillis) {
                    this.appDone = true;
                }
                try {
                    if (this.exitCondition == null || !this.exitCondition.call().booleanValue()) break block16;
                    this.appDone = true;
                }
                catch (Exception ex) {
                    break;
                }
            }
            if (Thread.interrupted()) break;
            if (this.appDone) continue;
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                LOG.info("Sleep interrupted " + e.getMessage());
                break;
            }
        }
        for (LocalStreamingContainer lsc : this.childContainers.values()) {
            this.injectShutdown.put(lsc.getContainerId(), lsc);
            lsc.triggerHeartbeat();
        }
        for (Thread thread : containerThreads) {
            try {
                thread.join(1000L);
            }
            catch (InterruptedException e) {
                LOG.warn("Container thread didn't finish {}", (Object)thread.getName());
            }
        }
        this.dnmgr.teardown();
        LOG.info("Application finished.");
        if (!this.perContainerBufferServer) {
            StreamingContainer.eventloop.stop((Listener.ServerListener)this.bufferServer);
            StreamingContainer.eventloop.stop();
        }
    }

    private class LocalStreamingContainerLauncher
    implements Runnable {
        final String containerId;
        final LocalStreamingContainer child;

        private LocalStreamingContainerLauncher(StreamingContainerAgent.ContainerStartRequest cdr, List<Thread> containerThreads) {
            this.containerId = "container-" + StramLocalCluster.this.containerSeq++;
            WindowGenerator wingen = null;
            if (StramLocalCluster.this.mockComponentFactory != null) {
                wingen = StramLocalCluster.this.mockComponentFactory.setupWindowGenerator();
            }
            this.child = new LocalStreamingContainer(this.containerId, StramLocalCluster.this.umbilical, wingen);
            StreamingContainerManager.ContainerResource cr = new StreamingContainerManager.ContainerResource(cdr.container.getResourceRequestPriority(), this.containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null);
            StreamingContainerAgent sca = StramLocalCluster.this.dnmgr.assignContainer(cr, StramLocalCluster.this.perContainerBufferServer ? null : StramLocalCluster.this.bufferServerAddress);
            if (sca != null) {
                StramLocalCluster.this.childContainers.put(this.containerId, this.child);
                Thread launchThread = new Thread((Runnable)this, this.containerId);
                containerThreads.add(launchThread);
                launchThread.start();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                StreamingContainerUmbilicalProtocol.StreamingContainerContext ctx = StramLocalCluster.this.umbilical.getInitContext(this.containerId);
                LOG.info("Started container {}", (Object)this.containerId);
                this.child.run(ctx);
            }
            catch (Error | Exception e) {
                LOG.error("Fatal {} in container {}", new Object[]{e instanceof Error ? "error" : "exception", this.containerId, e});
            }
            finally {
                StramLocalCluster.this.childContainers.remove(this.containerId);
                LOG.info("Container {} terminating.", (Object)this.containerId);
            }
        }
    }

    public static class LocalStreamingContainer
    extends StreamingContainer {
        private final AtomicInteger heartbeatCount = new AtomicInteger();
        private final WindowGenerator windowGenerator;

        public LocalStreamingContainer(String containerId, StreamingContainerUmbilicalProtocol umbilical, WindowGenerator winGen) {
            super(containerId, umbilical);
            this.windowGenerator = winGen;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(StreamingContainerUmbilicalProtocol.StreamingContainerContext ctx) throws Exception {
            LOG.debug("container {} context {}", (Object)this.getContainerId(), (Object)ctx);
            this.setup(ctx);
            if (this.bufferServerAddress != null && !this.bufferServerAddress.getAddress().isLoopbackAddress()) {
                this.bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, this.bufferServerAddress.getPort());
            }
            boolean hasError = true;
            try {
                this.heartbeatLoop();
                hasError = false;
            }
            finally {
                block9: {
                    try {
                        this.teardown();
                    }
                    catch (Exception e) {
                        if (hasError) break block9;
                        throw e;
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void waitForHeartbeat(int waitMillis) throws InterruptedException {
            AtomicInteger atomicInteger = this.heartbeatCount;
            synchronized (atomicInteger) {
                this.heartbeatCount.wait(waitMillis);
            }
        }

        @Override
        public void teardown() {
            super.teardown();
        }

        @Override
        protected WindowGenerator setupWindowGenerator(long smallestWindowId) {
            if (this.windowGenerator != null) {
                return this.windowGenerator;
            }
            return super.setupWindowGenerator(smallestWindowId);
        }

        OperatorContext getNodeContext(int id) {
            return ((Node)this.nodes.get((Object)Integer.valueOf((int)id))).context;
        }

        Operator getOperator(int id) {
            return ((Node)this.nodes.get(id)).getOperator();
        }

        Map<Integer, Node<?>> getNodes() {
            return Collections.unmodifiableMap(this.nodes);
        }
    }

    private class UmbilicalProtocolLocalImpl
    implements StreamingContainerUmbilicalProtocol {
        private UmbilicalProtocolLocalImpl() {
        }

        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            throw new UnsupportedOperationException("not implemented in local mode");
        }

        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
            throw new UnsupportedOperationException("not implemented in local mode");
        }

        @Override
        public void reportError(String containerId, int[] operators, String msg) {
            try {
                this.log(containerId, msg);
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }

        @Override
        public void log(String containerId, String msg) throws IOException {
            LOG.info("{} msg: {}", (Object)containerId, (Object)msg);
        }

        @Override
        public StreamingContainerUmbilicalProtocol.StreamingContainerContext getInitContext(String containerId) throws IOException {
            StreamingContainerAgent sca = StramLocalCluster.this.dnmgr.getContainerAgent(containerId);
            StreamingContainerUmbilicalProtocol.StreamingContainerContext scc = sca.getInitContext();
            scc.deployBufferServer = StramLocalCluster.this.perContainerBufferServer;
            return scc;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat msg) {
            if (StramLocalCluster.this.injectShutdown.containsKey(msg.getContainerId())) {
                StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse r = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
                r.shutdown = true;
                return r;
            }
            try {
                StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse rsp = StramLocalCluster.this.dnmgr.processHeartbeat(msg);
                if (rsp != null) {
                    rsp = (StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse)SerializationUtils.clone((Serializable)rsp);
                }
                StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse = rsp;
                return containerHeartbeatResponse;
            }
            finally {
                LocalStreamingContainer c = (LocalStreamingContainer)StramLocalCluster.this.childContainers.get(msg.getContainerId());
                AtomicInteger atomicInteger = c.heartbeatCount;
                synchronized (atomicInteger) {
                    c.heartbeatCount.incrementAndGet();
                    c.heartbeatCount.notifyAll();
                }
            }
        }
    }

    public static interface MockComponentFactory {
        public WindowGenerator setupWindowGenerator();
    }
}

