/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.simulator.instance;

import com.google.protobuf.ByteString;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.spout.ISpoutOutputCollector;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.utils.metrics.SpoutMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.misc.TupleKeyGenerator;
import com.twitter.heron.proto.system.HeronTuples;
import com.twitter.heron.simulator.instance.OutgoingTupleCollection;
import com.twitter.heron.simulator.instance.RootTupleInfo;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SpoutOutputCollectorImpl
implements ISpoutOutputCollector {
    private static final Logger LOG = Logger.getLogger(SpoutOutputCollectorImpl.class.getName());
    private final LinkedHashMap<Long, RootTupleInfo> inFlightTuples;
    private final TupleKeyGenerator keyGenerator;
    private final SpoutMetrics spoutMetrics;
    private final PhysicalPlanHelper helper;
    private final boolean ackingEnabled;
    private final Queue<RootTupleInfo> immediateAcks;
    private final IPluggableSerializer serializer;
    private final OutgoingTupleCollection outputter;
    private long totalTuplesEmitted;

    public SpoutOutputCollectorImpl(IPluggableSerializer iPluggableSerializer, PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, SpoutMetrics spoutMetrics) {
        if (physicalPlanHelper.getMySpout() == null) {
            throw new RuntimeException(physicalPlanHelper.getMyTaskId() + " is not a spout ");
        }
        this.serializer = iPluggableSerializer;
        this.helper = physicalPlanHelper;
        this.spoutMetrics = spoutMetrics;
        this.keyGenerator = new TupleKeyGenerator();
        this.inFlightTuples = new LinkedHashMap();
        Map<String, Object> map = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        this.ackingEnabled = map.containsKey("topology.acking") && map.get("topology.acking") != null ? Boolean.parseBoolean(map.get("topology.acking").toString()) : false;
        this.immediateAcks = !this.ackingEnabled ? new ArrayDeque<RootTupleInfo>() : null;
        this.outputter = new OutgoingTupleCollection(physicalPlanHelper, communicator);
    }

    @Override
    public List<Integer> emit(String string, List<Object> list, Object object) {
        return this.admitSpoutTuple(string, list, object);
    }

    @Override
    public void emitDirect(int n, String string, List<Object> list, Object object) {
        this.admitSpoutTuple(n, string, list, object);
    }

    @Override
    public void reportError(Throwable throwable) {
        Exception exception = new Exception("Reporting an error in topology code", throwable);
        LOG.log(Level.SEVERE, "Error stack trace ", exception);
    }

    public boolean isOutQueuesAvailable() {
        return this.outputter.isOutQueuesAvailable();
    }

    public long getTotalDataEmittedInBytes() {
        return this.outputter.getTotalDataEmittedInBytes();
    }

    public void sendOutTuples() {
        this.outputter.sendOutTuples();
    }

    public long getTotalTuplesEmitted() {
        return this.totalTuplesEmitted;
    }

    public int numInFlight() {
        return this.inFlightTuples.size();
    }

    public Queue<RootTupleInfo> getImmediateAcks() {
        return this.immediateAcks;
    }

    public RootTupleInfo retireInFlight(long l) {
        return (RootTupleInfo)this.inFlightTuples.remove(l);
    }

    public List<RootTupleInfo> retireExpired(long l) {
        RootTupleInfo rootTupleInfo;
        ArrayList<RootTupleInfo> arrayList = new ArrayList<RootTupleInfo>();
        long l2 = System.nanoTime();
        Iterator<RootTupleInfo> iterator = this.inFlightTuples.values().iterator();
        while (iterator.hasNext() && (rootTupleInfo = iterator.next()).isExpired(l2, l)) {
            arrayList.add(rootTupleInfo);
            iterator.remove();
        }
        return arrayList;
    }

    public void clear() {
        this.outputter.clear();
    }

    private List<Integer> admitSpoutTuple(String string, List<Object> list, Object object) {
        this.helper.checkOutputSchema(string, list);
        List<Integer> list2 = this.helper.chooseTasksForCustomStreamGrouping(string, list);
        this.helper.getTopologyContext().invokeHookEmit(list, string, list2);
        HeronTuples.HeronDataTuple.Builder builder = HeronTuples.HeronDataTuple.newBuilder();
        builder.setKey(0L);
        if (list2 != null) {
            for (Integer object2 : list2) {
                builder.addDestTaskIds(object2);
            }
        }
        if (object != null) {
            RootTupleInfo rootTupleInfo = new RootTupleInfo(string, object);
            if (this.ackingEnabled) {
                HeronTuples.RootId.Builder builder2 = this.EstablishRootId(rootTupleInfo);
                builder.addRoots(builder2);
            } else {
                this.immediateAcks.offer(rootTupleInfo);
            }
        }
        long l = 0L;
        long l2 = System.nanoTime();
        for (Object object2 : list) {
            byte[] byArray = this.serializer.serialize(object2);
            ByteString byteString = ByteString.copyFrom(byArray);
            builder.addValues(byteString);
            l += (long)byArray.length;
        }
        long l3 = System.nanoTime() - l2;
        this.spoutMetrics.serializeDataTuple(string, l3);
        this.outputter.addDataTuple(string, builder, l);
        ++this.totalTuplesEmitted;
        this.spoutMetrics.emittedTuple(string);
        return null;
    }

    private void admitSpoutTuple(int n, String string, List<Object> list, Object object) {
        throw new RuntimeException("emitDirect Not implemented");
    }

    private HeronTuples.RootId.Builder EstablishRootId(RootTupleInfo rootTupleInfo) {
        long l = this.keyGenerator.next();
        HeronTuples.RootId.Builder builder = HeronTuples.RootId.newBuilder();
        builder.setTaskid(this.helper.getMyTaskId());
        builder.setKey(l);
        this.inFlightTuples.put(l, rootTupleInfo);
        return builder;
    }
}

