/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.simulator.executors;

import com.google.protobuf.GeneratedMessage;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.common.basics.SlaveLooper;
import com.twitter.heron.common.basics.WakeableLooper;
import com.twitter.heron.proto.stmgr.StreamManager;
import com.twitter.heron.proto.system.HeronTuples;
import com.twitter.heron.proto.system.PhysicalPlans;
import com.twitter.heron.simulator.executors.InstanceExecutor;
import com.twitter.heron.simulator.utils.PhysicalPlanUtil;
import com.twitter.heron.simulator.utils.StreamConsumers;
import com.twitter.heron.simulator.utils.TupleCache;
import com.twitter.heron.simulator.utils.XORManager;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

public class StreamExecutor
implements Runnable {
    public static final int NUM_BUCKETS = 3;
    private static final Logger LOG = Logger.getLogger(InstanceExecutor.class.getName());
    private final Map<Integer, InstanceExecutor> taskIdToInstanceExecutor = new HashMap<Integer, InstanceExecutor>();
    private final Map<TopologyAPI.StreamId, StreamConsumers> streamIdStreamConsumersMap;
    private final Set<String> spoutSets;
    private final XORManager xorManager;
    private final TupleCache tupleCache;
    private final WakeableLooper looper = this.createWakeableLooper();

    public StreamExecutor(PhysicalPlans.PhysicalPlan physicalPlan) {
        this.spoutSets = this.createSpoutsSet(physicalPlan);
        Map<String, List<Integer>> map = PhysicalPlanUtil.getComponentToTaskIds(physicalPlan);
        this.streamIdStreamConsumersMap = StreamConsumers.populateStreamConsumers(physicalPlan.getTopology(), map);
        this.xorManager = XORManager.populateXORManager(this.looper, physicalPlan.getTopology(), 3, map);
        this.tupleCache = new TupleCache();
    }

    public void addInstanceExecutor(InstanceExecutor instanceExecutor) {
        instanceExecutor.getStreamOutQueue().setConsumer(this.looper);
        instanceExecutor.getStreamInQueue().setProducer(this.looper);
        this.taskIdToInstanceExecutor.put(instanceExecutor.getTaskId(), instanceExecutor);
    }

    @Override
    public void run() {
        Thread.currentThread().setName("Simulator_Stream_Executor");
        LOG.info("Stream_Executor starts");
        this.addStreamExecutorTasks();
        this.looper.loop();
    }

    public void stop() {
        this.looper.exitLoop();
    }

    protected void addStreamExecutorTasks() {
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                if (StreamExecutor.this.tupleCache.isEmpty()) {
                    StreamExecutor.this.handleInstanceExecutor();
                }
                StreamExecutor.this.drainCache();
            }
        };
        this.looper.addTasksOnWakeup(runnable);
    }

    public void handleInstanceExecutor() {
        for (InstanceExecutor instanceExecutor : this.taskIdToInstanceExecutor.values()) {
            HeronTuples.HeronTupleSet heronTupleSet;
            boolean bl = this.spoutSets.contains(instanceExecutor.getComponentName());
            int n = instanceExecutor.getTaskId();
            int n2 = instanceExecutor.getStreamOutQueue().size();
            for (int i = 0; i < n2 && (heronTupleSet = instanceExecutor.getStreamOutQueue().poll()) != null; ++i) {
                GeneratedMessage generatedMessage;
                if (heronTupleSet.hasData()) {
                    generatedMessage = heronTupleSet.getData();
                    TopologyAPI.StreamId streamId = ((HeronTuples.HeronDataTupleSet)generatedMessage).getStream();
                    StreamConsumers streamConsumers = this.streamIdStreamConsumersMap.get(streamId);
                    if (streamConsumers != null) {
                        for (HeronTuples.HeronDataTuple heronDataTuple : ((HeronTuples.HeronDataTupleSet)generatedMessage).getTuplesList()) {
                            List<Integer> list = streamConsumers.getListToSend(heronDataTuple);
                            list.addAll(heronDataTuple.getDestTaskIdsList());
                            if (list.isEmpty()) {
                                LOG.severe("Nobody to sent the tuple to");
                            }
                            this.copyDataOutBound(n, bl, streamId, heronDataTuple, list);
                        }
                    } else {
                        LOG.severe("Nobody consumes stream: " + streamId);
                    }
                }
                if (!heronTupleSet.hasControl()) continue;
                generatedMessage = heronTupleSet.getControl();
                for (HeronTuples.AckTuple ackTuple : ((HeronTuples.HeronControlTupleSet)generatedMessage).getAcksList()) {
                    this.copyControlOutBound(ackTuple, true);
                }
                for (HeronTuples.AckTuple ackTuple : ((HeronTuples.HeronControlTupleSet)generatedMessage).getFailsList()) {
                    this.copyControlOutBound(ackTuple, false);
                }
            }
        }
    }

    protected boolean isSendTuplesToInstance(List<Integer> list) {
        for (Integer n : list) {
            if (this.taskIdToInstanceExecutor.get(n).getStreamInQueue().remainingCapacity() > 0) continue;
            return false;
        }
        return true;
    }

    protected void copyDataOutBound(int n, boolean bl, TopologyAPI.StreamId streamId, HeronTuples.HeronDataTuple heronDataTuple, List<Integer> list) {
        boolean bl2 = true;
        boolean bl3 = heronDataTuple.getRootsCount() > 0;
        for (Integer n2 : list) {
            long l = this.tupleCache.addDataTuple(n2, streamId, heronDataTuple, bl3);
            if (bl3) {
                if (bl) {
                    if (bl2) {
                        this.xorManager.create(n, heronDataTuple.getRoots(0).getKey(), l);
                    } else {
                        this.xorManager.anchor(n, heronDataTuple.getRoots(0).getKey(), l);
                    }
                } else {
                    for (HeronTuples.RootId rootId : heronDataTuple.getRootsList()) {
                        HeronTuples.AckTuple ackTuple = HeronTuples.AckTuple.newBuilder().addRoots(rootId).setAckedtuple(l).build();
                        this.tupleCache.addEmitTuple(rootId.getTaskid(), ackTuple);
                    }
                }
            }
            bl2 = false;
        }
    }

    protected void copyControlOutBound(HeronTuples.AckTuple ackTuple, boolean bl) {
        for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
            HeronTuples.AckTuple ackTuple2 = HeronTuples.AckTuple.newBuilder().addRoots(rootId).setAckedtuple(ackTuple.getAckedtuple()).build();
            if (bl) {
                this.tupleCache.addAckTuple(rootId.getTaskid(), ackTuple2);
                continue;
            }
            this.tupleCache.addFailTuple(rootId.getTaskid(), ackTuple2);
        }
    }

    protected void processAcksAndFails(int n, HeronTuples.HeronControlTupleSet heronControlTupleSet) {
        HeronTuples.RootId.Builder builder;
        HeronTuples.AckTuple.Builder builder2;
        StreamManager.TupleMessage.Builder builder3 = StreamManager.TupleMessage.newBuilder();
        for (HeronTuples.AckTuple ackTuple : heronControlTupleSet.getEmitsList()) {
            for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
                this.xorManager.anchor(n, rootId.getKey(), ackTuple.getAckedtuple());
            }
        }
        for (HeronTuples.AckTuple ackTuple : heronControlTupleSet.getAcksList()) {
            for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
                if (!this.xorManager.anchor(n, rootId.getKey(), ackTuple.getAckedtuple())) continue;
                builder2 = builder3.getSetBuilder().getControlBuilder().addAcksBuilder();
                builder = builder2.addRootsBuilder();
                builder.setKey(rootId.getKey());
                builder.setTaskid(n);
                builder2.setAckedtuple(0L);
                this.xorManager.remove(n, rootId.getKey());
            }
        }
        for (HeronTuples.AckTuple ackTuple : heronControlTupleSet.getFailsList()) {
            for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
                if (!this.xorManager.remove(n, rootId.getKey())) continue;
                builder2 = builder3.getSetBuilder().getControlBuilder().addFailsBuilder();
                builder = builder2.addRootsBuilder();
                builder.setKey(rootId.getKey());
                builder.setTaskid(n);
                builder2.setAckedtuple(0L);
            }
        }
        if (builder3.hasSet()) {
            this.sendMessageToInstance(n, builder3.build());
        }
    }

    protected void drainCache() {
        Map<Integer, List<HeronTuples.HeronTupleSet>> map = this.tupleCache.getCache();
        if (!this.isSendTuplesToInstance(new LinkedList<Integer>(map.keySet()))) {
            return;
        }
        for (Map.Entry<Integer, List<HeronTuples.HeronTupleSet>> entry : map.entrySet()) {
            int n = entry.getKey();
            for (HeronTuples.HeronTupleSet heronTupleSet : entry.getValue()) {
                this.sendInBound(n, heronTupleSet);
            }
        }
        this.tupleCache.clear();
    }

    protected void sendInBound(int n, HeronTuples.HeronTupleSet heronTupleSet) {
        if (heronTupleSet.hasData()) {
            StreamManager.TupleMessage.Builder builder = StreamManager.TupleMessage.newBuilder();
            builder.getSetBuilder().setData(heronTupleSet.getData());
            this.sendMessageToInstance(n, builder.build());
        }
        if (heronTupleSet.hasControl()) {
            this.processAcksAndFails(n, heronTupleSet.getControl());
        }
    }

    protected void sendMessageToInstance(int n, StreamManager.TupleMessage tupleMessage) {
        this.taskIdToInstanceExecutor.get(n).getStreamInQueue().offer(tupleMessage.getSet());
    }

    protected WakeableLooper createWakeableLooper() {
        return new SlaveLooper();
    }

    protected Set<String> createSpoutsSet(PhysicalPlans.PhysicalPlan physicalPlan) {
        HashSet<String> hashSet = new HashSet<String>();
        for (TopologyAPI.Spout spout : physicalPlan.getTopology().getSpoutsList()) {
            hashSet.add(spout.getComp().getName());
        }
        return hashSet;
    }
}

