/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.simulator.instance;

import com.google.protobuf.ByteString;
import com.twitter.heron.api.bolt.IBolt;
import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.metric.GlobalMetrics;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.utils.Utils;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.basics.SlaveLooper;
import com.twitter.heron.common.basics.TypeUtils;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.utils.metrics.BoltMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.misc.SerializeDeSerializeHelper;
import com.twitter.heron.common.utils.topology.TopologyContextImpl;
import com.twitter.heron.common.utils.tuple.TickTuple;
import com.twitter.heron.common.utils.tuple.TupleImpl;
import com.twitter.heron.proto.system.HeronTuples;
import com.twitter.heron.simulator.instance.BoltOutputCollectorImpl;
import com.twitter.heron.simulator.instance.IInstance;
import java.util.ArrayList;
import java.util.logging.Logger;

/**
 * Simulator runtime wrapper around a single user-defined {@link IBolt}.
 *
 * <p>The instance obtains its bolt from the physical plan (either a serialized Java object or a
 * class name), and once started it registers a looper task that drains the inbound tuple queue,
 * hands every data tuple to the bolt, and flushes emitted tuples through a
 * {@link BoltOutputCollectorImpl}. If the topology config defines a tick tuple frequency, a
 * self-rescheduling timer delivers {@link TickTuple}s to the bolt as well.
 */
public class BoltInstance
implements IInstance {
    private static final Logger LOG = Logger.getLogger(BoltInstance.class.getName());

    /** Topology config key holding the tick tuple interval, in seconds. */
    private static final String TICK_TUPLE_FREQ_SECS_KEY = "topology.tick.tuple.freq.secs";

    /** Conversion factor from milliseconds to the nanoseconds reported by {@link System#nanoTime()}. */
    private static final long NANOS_PER_MILLI = 1000000L;

    private final PhysicalPlanHelper helper;
    private final IBolt bolt;
    private final BoltOutputCollectorImpl collector;
    private final IPluggableSerializer serializer;
    private final BoltMetrics boltMetrics;
    private final Communicator<HeronTuples.HeronTupleSet> streamInQueue;
    private final SlaveLooper looper;
    private final SystemConfig systemConfig;

    /**
     * Builds the instance and loads the bolt described by the physical plan.
     *
     * @param physicalPlanHelper helper giving access to this instance's bolt and topology context
     * @param communicator inbound queue of tuple sets to be executed by the bolt
     * @param communicator2 queue handed to the output collector created for the bolt
     * @param slaveLooper looper on which the execution task and the tick timer are registered
     * @throws RuntimeException if the physical plan has no bolt for this instance, if the bolt
     *     carries neither a serialized object nor a class name, or if the named class cannot be
     *     loaded or instantiated
     */
    public BoltInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, Communicator<HeronTuples.HeronTupleSet> communicator2, SlaveLooper slaveLooper) {
        this.helper = physicalPlanHelper;
        this.looper = slaveLooper;
        this.streamInQueue = communicator;
        this.boltMetrics = new BoltMetrics();
        this.boltMetrics.initMultiCountMetrics(physicalPlanHelper);
        if (physicalPlanHelper.getMyBolt() == null) {
            throw new RuntimeException("HeronBoltInstance has no bolt in physical plan.");
        }
        TopologyContextImpl topologyContext = physicalPlanHelper.getTopologyContext();
        this.serializer = SerializeDeSerializeHelper.getSerializer(topologyContext.getTopologyConfig());
        this.systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);

        // A serialized bolt object takes precedence over a bare class name.
        if (physicalPlanHelper.getMyBolt().getComp().hasJavaObject()) {
            this.bolt = (IBolt)Utils.deserialize(physicalPlanHelper.getMyBolt().getComp().getJavaObject().toByteArray());
        } else if (physicalPlanHelper.getMyBolt().getComp().hasJavaClassName()) {
            this.bolt = BoltInstance.instantiateBolt(physicalPlanHelper.getMyBolt().getComp().getJavaClassName());
        } else {
            throw new RuntimeException("Neither java_object nor java_class_name set for bolt");
        }
        this.collector = new BoltOutputCollectorImpl(this.serializer, physicalPlanHelper, communicator2, this.boltMetrics);
    }

    /**
     * Reflectively creates a bolt from its class name using the class's no-arg constructor.
     *
     * @param className fully qualified name of the bolt class
     * @return the newly created bolt
     * @throws RuntimeException wrapping the reflective failure (the original exception is kept as
     *     the cause so its stack trace is not lost)
     */
    private static IBolt instantiateBolt(String className) {
        try {
            return (IBolt)Class.forName(className).newInstance();
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException(e + " Bolt class must be in class path.", e);
        }
        catch (InstantiationException e) {
            throw new RuntimeException(e + " Bolt class must be concrete.", e);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException(e + " Bolt class must have a no-arg constructor.", e);
        }
    }

    /**
     * Initializes metrics, prepares the bolt with the topology config, context and an
     * {@link OutputCollector}, invokes the prepare hooks, prepares custom stream grouping, and
     * finally registers the execution task and tick timer on the looper.
     */
    @Override
    public void start() {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContext, this.systemConfig.getHeronMetricsExportIntervalSec());
        this.boltMetrics.registerMetrics(topologyContext);
        this.bolt.prepare(topologyContext.getTopologyConfig(), topologyContext, new OutputCollector(this.collector));
        topologyContext.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping(topologyContext);
        this.addBoltTasks();
    }

    /**
     * Invokes the cleanup hooks and the bolt's own cleanup, then exits the looper and clears the
     * inbound queue and the collector.
     *
     * <p>The looper and queues are released in a {@code finally} block so that an exception thrown
     * by a hook or by the user's {@code cleanup()} still propagates but does not leave them behind.
     */
    @Override
    public void stop() {
        try {
            // Hooks run before the bolt's own cleanup.
            this.helper.getTopologyContext().invokeHookCleanup();
            this.bolt.cleanup();
        } finally {
            this.looper.exitLoop();
            this.streamInQueue.clear();
            this.collector.clear();
        }
    }

    /**
     * Registers the tuple-execution task to run on every looper wake-up and arms the tick tuple
     * timer.
     */
    private void addBoltTasks() {
        Runnable boltTask = new Runnable(){

            @Override
            public void run() {
                if (BoltInstance.this.collector.isOutQueuesAvailable()) {
                    BoltInstance.this.readTuplesAndExecute(BoltInstance.this.streamInQueue);
                    BoltInstance.this.collector.sendOutTuples();
                } else {
                    // No room downstream: skip execution this round and record it.
                    BoltInstance.this.boltMetrics.updateOutQueueFullCount();
                }
                // A batch limit may have stopped the read early; ask for another round if there
                // is still input and room for output.
                if (BoltInstance.this.collector.isOutQueuesAvailable() && !BoltInstance.this.streamInQueue.isEmpty()) {
                    BoltInstance.this.looper.wakeUp();
                }
            }
        };
        this.looper.addTasksOnWakeup(boltTask);
        this.prepareTickTupleTimer();
    }

    /**
     * Deserializes one data tuple, executes it on the bolt, invokes the bolt-execute hooks and
     * records deserialization and execution latency metrics for the tuple's stream.
     *
     * @param heronDataTuple serialized tuple to execute
     * @param topologyContextImpl context used to build the tuple and to invoke hooks
     * @param streamId stream the tuple arrived on
     */
    private void handleDataTuple(HeronTuples.HeronDataTuple heronDataTuple, TopologyContextImpl topologyContextImpl, TopologyAPI.StreamId streamId) {
        long deserializeStart = System.nanoTime();
        ArrayList<Object> values = new ArrayList<Object>(heronDataTuple.getValuesList().size());
        for (ByteString serializedValue : heronDataTuple.getValuesList()) {
            values.add(this.serializer.deserialize(serializedValue.toByteArray()));
        }
        TupleImpl tuple = new TupleImpl(topologyContextImpl, streamId, heronDataTuple.getKey(), heronDataTuple.getRootsList(), values);

        long executeStart = System.nanoTime();
        this.bolt.execute(tuple);
        long executeLatency = System.nanoTime() - executeStart;

        topologyContextImpl.invokeHookBoltExecute(tuple, executeLatency);
        // Deserialization latency covers everything up to the moment execute() is called.
        this.boltMetrics.deserializeDataTuple(streamId.getId(), streamId.getComponentName(), executeStart - deserializeStart);
        this.boltMetrics.executeTuple(streamId.getId(), streamId.getComponentName(), executeLatency);
    }

    /**
     * Polls tuple sets from the given queue and executes every data tuple they contain, stopping
     * when the queue is empty or when the configured batch time or emitted-bytes limit is exceeded.
     * The limits are checked after each tuple set, so at least one set is processed whenever the
     * queue is non-empty.
     *
     * @param communicator queue of tuple sets to drain
     * @throws RuntimeException if a polled tuple set carries control (ack/fail) data
     */
    @Override
    public void readTuplesAndExecute(Communicator<HeronTuples.HeronTupleSet> communicator) {
        long cycleStart = System.nanoTime();
        long emittedBytesBeforeCycle = this.collector.getTotalDataEmittedInBytes();
        long batchTimeNanos = this.systemConfig.getInstanceExecuteBatchTimeMs() * NANOS_PER_MILLI;
        long batchSizeBytes = this.systemConfig.getInstanceExecuteBatchSizeBytes();
        while (!communicator.isEmpty()) {
            HeronTuples.HeronTupleSet tupleSet = communicator.poll();
            TopologyContextImpl topologyContext = this.helper.getTopologyContext();
            if (tupleSet.hasControl()) {
                throw new RuntimeException("Bolt cannot get acks/fails from other components");
            }
            TopologyAPI.StreamId streamId = tupleSet.getData().getStream();
            for (HeronTuples.HeronDataTuple dataTuple : tupleSet.getData().getTuplesList()) {
                this.handleDataTuple(dataTuple, topologyContext, streamId);
            }
            // Stop once this cycle has run longer than the configured batch time ...
            if (System.nanoTime() - cycleStart - batchTimeNanos > 0L) {
                break;
            }
            // ... or has emitted more bytes than the configured batch size.
            if (this.collector.getTotalDataEmittedInBytes() - emittedBytesBeforeCycle > batchSizeBytes) {
                break;
            }
        }
    }

    /** No-op for bolts. */
    @Override
    public void activate() {
    }

    /** No-op for bolts. */
    @Override
    public void deactivate() {
    }

    /**
     * Registers a one-shot timer that sends a tick tuple, if the topology config defines a tick
     * tuple frequency; does nothing otherwise.
     */
    private void prepareTickTupleTimer() {
        Object tickFreqSecs = this.helper.getTopologyContext().getTopologyConfig().get(TICK_TUPLE_FREQ_SECS_KEY);
        if (tickFreqSecs == null) {
            return;
        }
        int freqSecs = TypeUtils.getInteger(tickFreqSecs);
        Runnable tickTask = new Runnable(){

            @Override
            public void run() {
                BoltInstance.this.sendTickTuple();
            }
        };
        this.looper.registerTimerEventInSeconds(freqSecs, tickTask);
    }

    /**
     * Executes a fresh {@link TickTuple} on the bolt, records its execution latency, flushes the
     * collector and re-arms the tick timer.
     */
    private void sendTickTuple() {
        TickTuple tickTuple = new TickTuple();
        long executeStart = System.nanoTime();
        this.bolt.execute(tickTuple);
        long executeLatency = System.nanoTime() - executeStart;
        this.boltMetrics.executeTuple(tickTuple.getSourceStreamId(), tickTuple.getSourceComponent(), executeLatency);
        this.collector.sendOutTuples();
        // The timer is registered per tick, so schedule the next one.
        this.prepareTickTupleTimer();
    }
}

