/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.simulator.instance;

import com.google.protobuf.ByteString;
import com.twitter.heron.api.bolt.IOutputCollector;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.utils.metrics.BoltMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.tuple.TupleImpl;
import com.twitter.heron.proto.system.HeronTuples;
import com.twitter.heron.simulator.instance.OutgoingTupleCollection;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BoltOutputCollectorImpl
implements IOutputCollector {
    private static final Logger LOG = Logger.getLogger(BoltOutputCollectorImpl.class.getName());
    private final IPluggableSerializer serializer;
    private final OutgoingTupleCollection outputter;
    private final BoltMetrics boltMetrics;
    private final PhysicalPlanHelper helper;
    private final boolean ackEnabled;

    public BoltOutputCollectorImpl(IPluggableSerializer iPluggableSerializer, PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, BoltMetrics boltMetrics) {
        if (physicalPlanHelper.getMyBolt() == null) {
            throw new RuntimeException(physicalPlanHelper.getMyTaskId() + " is not a bolt ");
        }
        this.serializer = iPluggableSerializer;
        this.helper = physicalPlanHelper;
        this.boltMetrics = boltMetrics;
        Map<String, Object> map = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        this.ackEnabled = map.containsKey("topology.acking") && map.get("topology.acking") != null ? Boolean.parseBoolean(map.get("topology.acking").toString()) : false;
        this.outputter = new OutgoingTupleCollection(physicalPlanHelper, communicator);
    }

    @Override
    public List<Integer> emit(String string, Collection<Tuple> collection, List<Object> list) {
        return this.admitBoltTuple(string, collection, list);
    }

    @Override
    public void emitDirect(int n, String string, Collection<Tuple> collection, List<Object> list) {
        this.admitBoltTuple(n, string, collection, list);
    }

    @Override
    public void reportError(Throwable throwable) {
        Exception exception = new Exception("Reporting an error in topology code", throwable);
        LOG.log(Level.SEVERE, "Error stack trace ", exception);
    }

    @Override
    public void ack(Tuple tuple) {
        this.admitAckTuple(tuple);
    }

    @Override
    public void fail(Tuple tuple) {
        this.admitFailTuple(tuple);
    }

    public boolean isOutQueuesAvailable() {
        return this.outputter.isOutQueuesAvailable();
    }

    public long getTotalDataEmittedInBytes() {
        return this.outputter.getTotalDataEmittedInBytes();
    }

    public void sendOutTuples() {
        this.outputter.sendOutTuples();
    }

    public void clear() {
        this.outputter.clear();
    }

    private List<Integer> admitBoltTuple(String string, Collection<Tuple> collection, List<Object> list) {
        this.helper.checkOutputSchema(string, list);
        List<Integer> list2 = this.helper.chooseTasksForCustomStreamGrouping(string, list);
        this.helper.getTopologyContext().invokeHookEmit(list, string, list2);
        HeronTuples.HeronDataTuple.Builder builder = HeronTuples.HeronDataTuple.newBuilder();
        builder.setKey(0L);
        if (list2 != null) {
            for (Integer iterator : list2) {
                builder.addDestTaskIds(iterator);
            }
        }
        if (collection != null) {
            HashSet hashSet = new HashSet();
            for (Tuple tuple : collection) {
                if (!(tuple instanceof TupleImpl)) continue;
                TupleImpl tupleImpl = (TupleImpl)tuple;
                hashSet.addAll(tupleImpl.getRoots());
            }
            Iterator iterator = hashSet.iterator();
            while (iterator.hasNext()) {
                HeronTuples.RootId rootId = (HeronTuples.RootId)iterator.next();
                builder.addRoots(rootId);
            }
        }
        long l = 0L;
        long l2 = System.nanoTime();
        for (Object object : list) {
            byte[] byArray = this.serializer.serialize(object);
            ByteString byteString = ByteString.copyFrom(byArray);
            builder.addValues(byteString);
            l += (long)byArray.length;
        }
        long l3 = System.nanoTime() - l2;
        this.boltMetrics.serializeDataTuple(string, l3);
        this.outputter.addDataTuple(string, builder, l);
        this.boltMetrics.emittedTuple(string);
        return null;
    }

    private void admitBoltTuple(int n, String string, Collection<Tuple> collection, List<Object> list) {
        throw new RuntimeException("emitDirect not supported");
    }

    private void admitAckTuple(Tuple tuple) {
        if (tuple instanceof TupleImpl) {
            TupleImpl tupleImpl = (TupleImpl)tuple;
            if (this.ackEnabled) {
                HeronTuples.AckTuple.Builder builder = HeronTuples.AckTuple.newBuilder();
                builder.setAckedtuple(tupleImpl.getTupleKey());
                long l = 0L;
                for (HeronTuples.RootId rootId : tupleImpl.getRoots()) {
                    builder.addRoots(rootId);
                    l += (long)rootId.getSerializedSize();
                }
                this.outputter.addAckTuple(builder, l);
            }
            long l = System.nanoTime() - tupleImpl.getCreationTime();
            this.helper.getTopologyContext().invokeHookBoltAck(tuple, l);
            this.boltMetrics.ackedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), l);
        }
    }

    private void admitFailTuple(Tuple tuple) {
        if (tuple instanceof TupleImpl) {
            TupleImpl tupleImpl = (TupleImpl)tuple;
            if (this.ackEnabled) {
                HeronTuples.AckTuple.Builder builder = HeronTuples.AckTuple.newBuilder();
                builder.setAckedtuple(tupleImpl.getTupleKey());
                long l = 0L;
                for (HeronTuples.RootId rootId : tupleImpl.getRoots()) {
                    builder.addRoots(rootId);
                    l += (long)rootId.getSerializedSize();
                }
                this.outputter.addFailTuple(builder, l);
            }
            long l = System.nanoTime() - tupleImpl.getCreationTime();
            this.helper.getTopologyContext().invokeHookBoltFail(tuple, l);
            this.boltMetrics.failedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), l);
        }
    }
}

