/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.simulator;

import com.twitter.heron.api.Config;
import com.twitter.heron.api.HeronTopology;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.proto.system.PhysicalPlans;
import com.twitter.heron.simulator.executors.InstanceExecutor;
import com.twitter.heron.simulator.executors.MetricsExecutor;
import com.twitter.heron.simulator.executors.StreamExecutor;
import com.twitter.heron.simulator.utils.PhysicalPlanUtil;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Simulator {
    private static final Logger LOG = Logger.getLogger(Simulator.class.getName());
    private final List<InstanceExecutor> instanceExecutors = new LinkedList<InstanceExecutor>();
    private final ExecutorService threadsPool = Executors.newCachedThreadPool();
    private SystemConfig systemConfig;
    private StreamExecutor streamExecutor;
    private MetricsExecutor metricsExecutor;

    public Simulator() {
        this(true);
    }

    public Simulator(boolean bl) {
        if (bl) {
            this.init();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void init() {
        this.systemConfig = this.getSystemConfig();
        SingletonRegistry singletonRegistry = SingletonRegistry.INSTANCE;
        synchronized (singletonRegistry) {
            if (!this.isSystemConfigExisted()) {
                LOG.info("System config not existed. Registering...");
                this.registerSystemConfig(this.systemConfig);
                LOG.info("System config registered.");
            } else {
                LOG.info("System config already existed.");
            }
        }
    }

    protected boolean isSystemConfigExisted() {
        return SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
    }

    protected void registerSystemConfig(SystemConfig systemConfig) {
        SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
    }

    public void submitTopology(String string, Config config, HeronTopology heronTopology) {
        TopologyAPI.Topology topology = heronTopology.setConfig(config).setName(string).setState(TopologyAPI.TopologyState.RUNNING).getTopology();
        PhysicalPlans.PhysicalPlan physicalPlan = PhysicalPlanUtil.getPhysicalPlan(topology);
        LOG.info("Physical Plan: \n" + physicalPlan);
        this.streamExecutor = new StreamExecutor(physicalPlan);
        this.metricsExecutor = new MetricsExecutor(this.systemConfig);
        for (PhysicalPlans.Instance object : physicalPlan.getInstancesList()) {
            InstanceExecutor instanceExecutor = new InstanceExecutor(physicalPlan, object.getInstanceId());
            this.streamExecutor.addInstanceExecutor(instanceExecutor);
            this.metricsExecutor.addInstanceExecutor(instanceExecutor);
            this.instanceExecutors.add(instanceExecutor);
        }
        Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
        this.threadsPool.execute(this.metricsExecutor);
        this.threadsPool.execute(this.streamExecutor);
        for (InstanceExecutor instanceExecutor : this.instanceExecutors) {
            this.threadsPool.execute(instanceExecutor);
        }
    }

    public void killTopology(String string) {
        LOG.info("To kill topology: " + string);
        this.stop();
        LOG.info("Topology killed successfully");
    }

    public void activate(String string) {
        LOG.info("To activate topology: " + string);
        for (InstanceExecutor instanceExecutor : this.instanceExecutors) {
            instanceExecutor.activate();
        }
        LOG.info("Activated topology: " + string);
    }

    public void deactivate(String string) {
        LOG.info("To deactivate topology: " + string);
        for (InstanceExecutor instanceExecutor : this.instanceExecutors) {
            instanceExecutor.deactivate();
        }
        LOG.info("Deactivated topology:" + string);
    }

    public void shutdown() {
        LOG.info("To shutdown thread pool");
        if (this.threadsPool.isShutdown()) {
            this.threadsPool.shutdownNow();
        }
        LOG.info("Heron simulator exited.");
    }

    public void stop() {
        for (InstanceExecutor instanceExecutor : this.instanceExecutors) {
            instanceExecutor.stop();
        }
        LOG.info("To stop Stream Executor");
        this.streamExecutor.stop();
        LOG.info("To stop Metrics Executor");
        this.metricsExecutor.stop();
        this.threadsPool.shutdown();
    }

    protected SystemConfig getSystemConfig() {
        SystemConfig systemConfig = new SystemConfig();
        systemConfig.put("heron.instance.set.data.tuple.capacity", 256);
        systemConfig.put("heron.instance.set.control.tuple.capacity", 256);
        systemConfig.put("heron.metrics.export.interval.sec", 60);
        systemConfig.put("heron.instance.execute.batch.time.ms", 16);
        systemConfig.put("heron.instance.execute.batch.size.bytes", 32768);
        systemConfig.put("heron.instance.emit.batch.time.ms", 16);
        systemConfig.put("heron.instance.emit.batch.size.bytes", 32768);
        systemConfig.put("heron.instance.ack.batch.time.ms", 128);
        systemConfig.put("heron.instance.acknowledgement.nbuckets", 10);
        return systemConfig;
    }

    public class DefaultExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {
            LOG.severe("Local Mode Process exiting.");
            LOG.log(Level.SEVERE, "Exception caught in thread: " + thread.getName() + " with id: " + thread.getId(), throwable);
            for (Handler handler : Logger.getLogger("").getHandlers()) {
                handler.close();
            }
            Simulator.this.threadsPool.shutdownNow();
            Runtime.getRuntime().halt(1);
        }
    }
}

