/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.simulator.instance;

import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.metric.GlobalMetrics;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.spout.ISpout;
import com.twitter.heron.api.spout.SpoutOutputCollector;
import com.twitter.heron.api.utils.Utils;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.basics.SlaveLooper;
import com.twitter.heron.common.basics.TypeUtils;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.utils.metrics.SpoutMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.misc.SerializeDeSerializeHelper;
import com.twitter.heron.common.utils.topology.TopologyContextImpl;
import com.twitter.heron.proto.system.HeronTuples;
import com.twitter.heron.simulator.instance.IInstance;
import com.twitter.heron.simulator.instance.RootTupleInfo;
import com.twitter.heron.simulator.instance.SpoutOutputCollectorImpl;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public class SpoutInstance
implements IInstance {
    private static final Logger LOG = Logger.getLogger(SpoutInstance.class.getName());
    private final ISpout spout;
    private final SpoutOutputCollectorImpl collector;
    private final IPluggableSerializer serializer;
    private final SpoutMetrics spoutMetrics;
    private final Communicator<HeronTuples.HeronTupleSet> streamInQueue;
    private final boolean ackEnabled;
    private final boolean enableMessageTimeouts;
    private final SlaveLooper looper;
    private final SystemConfig systemConfig;
    private final Map<String, Object> config;
    private PhysicalPlanHelper helper;
    private TopologyAPI.TopologyState topologyState;

    public SpoutInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, Communicator<HeronTuples.HeronTupleSet> communicator2, SlaveLooper slaveLooper) {
        this.helper = physicalPlanHelper;
        this.looper = slaveLooper;
        this.streamInQueue = communicator;
        this.spoutMetrics = new SpoutMetrics();
        this.spoutMetrics.initMultiCountMetrics(physicalPlanHelper);
        TopologyContextImpl topologyContextImpl = physicalPlanHelper.getTopologyContext();
        this.config = topologyContextImpl.getTopologyConfig();
        this.systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.ackEnabled = Boolean.parseBoolean((String)this.config.get("topology.acking"));
        this.enableMessageTimeouts = Boolean.parseBoolean((String)this.config.get("topology.enable.message.timeouts"));
        LOG.info("Enable Ack: " + this.ackEnabled);
        LOG.info("EnableMessageTimeouts: " + this.enableMessageTimeouts);
        if (physicalPlanHelper.getMySpout() == null) {
            throw new RuntimeException("HeronSpoutInstance has no spout in physical plan");
        }
        this.serializer = SerializeDeSerializeHelper.getSerializer(this.config);
        if (physicalPlanHelper.getMySpout().getComp().hasJavaObject()) {
            this.spout = (ISpout)Utils.deserialize(physicalPlanHelper.getMySpout().getComp().getJavaObject().toByteArray());
        } else if (physicalPlanHelper.getMySpout().getComp().hasJavaClassName()) {
            String string = physicalPlanHelper.getMySpout().getComp().getJavaClassName();
            try {
                this.spout = (ISpout)Class.forName(string).newInstance();
            }
            catch (ClassNotFoundException classNotFoundException) {
                throw new RuntimeException(classNotFoundException + " Spout class must be in class path.");
            }
            catch (InstantiationException instantiationException) {
                throw new RuntimeException(instantiationException + " Spout class must be concrete.");
            }
            catch (IllegalAccessException illegalAccessException) {
                throw new RuntimeException(illegalAccessException + " Spout class must have a no-arg constructor.");
            }
        } else {
            throw new RuntimeException("Neither java_object nor java_class_name set for spout");
        }
        this.collector = new SpoutOutputCollectorImpl(this.serializer, physicalPlanHelper, communicator2, this.spoutMetrics);
    }

    @Override
    public void start() {
        TopologyContextImpl topologyContextImpl = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContextImpl, this.systemConfig.getHeronMetricsExportIntervalSec());
        this.spoutMetrics.registerMetrics(topologyContextImpl);
        this.spout.open(topologyContextImpl.getTopologyConfig(), topologyContextImpl, new SpoutOutputCollector(this.collector));
        topologyContextImpl.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping(topologyContextImpl);
        this.addSpoutsTasks();
        this.topologyState = TopologyAPI.TopologyState.RUNNING;
    }

    @Override
    public void stop() {
        this.helper.getTopologyContext().invokeHookCleanup();
        this.spout.close();
        this.looper.exitLoop();
        this.streamInQueue.clear();
        this.collector.clear();
    }

    @Override
    public void activate() {
        LOG.info("Spout is activated");
        this.spout.activate();
        this.topologyState = TopologyAPI.TopologyState.RUNNING;
    }

    @Override
    public void deactivate() {
        LOG.info("Spout is deactivated");
        this.spout.deactivate();
        this.topologyState = TopologyAPI.TopologyState.PAUSED;
    }

    private void addSpoutsTasks() {
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                if (SpoutInstance.this.isProduceTuple()) {
                    SpoutInstance.this.produceTuple();
                    SpoutInstance.this.collector.sendOutTuples();
                } else {
                    SpoutInstance.this.spoutMetrics.updateOutQueueFullCount();
                }
                if (SpoutInstance.this.ackEnabled) {
                    SpoutInstance.this.readTuplesAndExecute(SpoutInstance.this.streamInQueue);
                    SpoutInstance.this.spoutMetrics.updatePendingTuplesCount(SpoutInstance.this.collector.numInFlight());
                } else {
                    SpoutInstance.this.doImmediateAcks();
                }
                if (SpoutInstance.this.isContinueWork()) {
                    SpoutInstance.this.looper.wakeUp();
                }
            }
        };
        this.looper.addTasksOnWakeup(runnable);
        if (this.enableMessageTimeouts) {
            this.lookForTimeouts();
        }
    }

    private boolean isContinueWork() {
        long l = TypeUtils.getLong(this.config.get("topology.max.spout.pending"));
        return this.topologyState.equals(TopologyAPI.TopologyState.RUNNING) && (!this.ackEnabled && this.collector.isOutQueuesAvailable() || this.ackEnabled && this.collector.isOutQueuesAvailable() && (long)this.collector.numInFlight() < l || this.ackEnabled && !this.streamInQueue.isEmpty());
    }

    private boolean isProduceTuple() {
        return this.collector.isOutQueuesAvailable() && this.topologyState.equals(TopologyAPI.TopologyState.RUNNING);
    }

    private void produceTuple() {
        int n = TypeUtils.getInteger(this.config.get("topology.max.spout.pending"));
        long l = this.collector.getTotalTuplesEmitted();
        long l2 = this.collector.getTotalDataEmittedInBytes();
        long l3 = this.systemConfig.getInstanceEmitBatchTimeMs() * 1000000L;
        long l4 = this.systemConfig.getInstanceEmitBatchSizeBytes();
        long l5 = System.nanoTime();
        while (this.ackEnabled && n > this.collector.numInFlight() || !this.ackEnabled) {
            long l6 = System.nanoTime();
            this.spout.nextTuple();
            long l7 = System.nanoTime() - l6;
            this.spoutMetrics.nextTuple(l7);
            long l8 = this.collector.getTotalTuplesEmitted();
            if (l8 == l) break;
            l = l8;
            if (System.nanoTime() - l5 - l3 <= 0L && this.collector.getTotalDataEmittedInBytes() - l2 <= l4) continue;
            break;
        }
    }

    private void handleAckTuple(HeronTuples.AckTuple ackTuple, boolean bl) {
        for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
            if (rootId.getTaskid() != this.helper.getMyTaskId()) {
                throw new RuntimeException(String.format("Receiving tuple for task %d in task %d", rootId.getTaskid(), this.helper.getMyTaskId()));
            }
            long l = rootId.getKey();
            RootTupleInfo rootTupleInfo = this.collector.retireInFlight(l);
            if (rootTupleInfo == null) {
                return;
            }
            Object object = rootTupleInfo.getMessageId();
            if (object == null) continue;
            long l2 = System.nanoTime() - rootTupleInfo.getInsertionTime();
            if (bl) {
                this.invokeAck(object, rootTupleInfo.getStreamId(), l2);
                continue;
            }
            this.invokeFail(object, rootTupleInfo.getStreamId(), l2);
        }
    }

    private void lookForTimeouts() {
        long l = TypeUtils.getLong(this.config.get("topology.message.timeout.secs"));
        long l2 = l * 1000000000L;
        int n = this.systemConfig.getInstanceAcknowledgementNbuckets();
        List<RootTupleInfo> list = this.collector.retireExpired(l2);
        for (RootTupleInfo rootTupleInfo : list) {
            this.spoutMetrics.timeoutTuple(rootTupleInfo.getStreamId());
            this.invokeFail(rootTupleInfo.getMessageId(), rootTupleInfo.getStreamId(), l2);
        }
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                SpoutInstance.this.lookForTimeouts();
            }
        };
        this.looper.registerTimerEventInNanoSeconds(l2 / (long)n, runnable);
    }

    @Override
    public void readTuplesAndExecute(Communicator<HeronTuples.HeronTupleSet> communicator) {
        long l = System.nanoTime();
        long l2 = this.systemConfig.getInstanceAckBatchTimeMs() * 1000000L;
        while (!communicator.isEmpty()) {
            HeronTuples.HeronTupleSet heronTupleSet = communicator.poll();
            if (heronTupleSet.hasData()) {
                throw new RuntimeException("Spout cannot get incoming data tuples from other components");
            }
            if (heronTupleSet.hasControl()) {
                for (HeronTuples.AckTuple ackTuple : heronTupleSet.getControl().getAcksList()) {
                    this.handleAckTuple(ackTuple, true);
                }
                for (HeronTuples.AckTuple ackTuple : heronTupleSet.getControl().getFailsList()) {
                    this.handleAckTuple(ackTuple, false);
                }
            }
            if (System.nanoTime() - l - l2 <= 0L) continue;
            break;
        }
    }

    private void doImmediateAcks() {
        int n = this.collector.getImmediateAcks().size();
        for (int i = 0; i < n; ++i) {
            RootTupleInfo rootTupleInfo = this.collector.getImmediateAcks().poll();
            this.invokeAck(rootTupleInfo.getMessageId(), rootTupleInfo.getStreamId(), 0L);
        }
    }

    private void invokeAck(Object object, String string, Long l) {
        this.spout.ack(object);
        this.helper.getTopologyContext().invokeHookSpoutAck(object, l);
        this.spoutMetrics.ackedTuple(string, l);
    }

    private void invokeFail(Object object, String string, Long l) {
        this.spout.fail(object);
        this.helper.getTopologyContext().invokeHookSpoutFail(object, l);
        this.spoutMetrics.failedTuple(string, l);
    }
}

