/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.tools.ConsumerPerformance;
import kafka.utils.ToolsUtils$;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.Tuple2$mcDD$sp;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1$mcVI$sp;

public final class ConsumerPerformance$
implements LazyLogging {
    public static ConsumerPerformance$ MODULE$;
    private Logger logger;
    private volatile boolean bitmap$0;

    static {
        new ConsumerPerformance$();
    }

    private Logger logger$lzycompute() {
        ConsumerPerformance$ consumerPerformance$ = this;
        synchronized (consumerPerformance$) {
            if (!this.bitmap$0) {
                this.logger = LazyLogging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        return !this.bitmap$0 ? this.logger$lzycompute() : this.logger;
    }

    public void main(String[] args) {
        block12: {
            BoxedUnit boxedUnit;
            ConsumerPerformance.ConsumerPerfConfig config = new ConsumerPerformance.ConsumerPerfConfig(args);
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Starting consumer...");
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            AtomicLong totalMessagesRead = new AtomicLong(0L);
            AtomicLong totalBytesRead = new AtomicLong(0L);
            AtomicBoolean consumerTimeout = new AtomicBoolean(false);
            scala.collection.mutable.Map<MetricName, Metric> metrics = null;
            AtomicLong joinGroupTimeInMs = new AtomicLong(0L);
            if (!config.hideHeader()) {
                this.printHeader(config.showDetailedStats(), config.useOldConsumer());
            }
            long startMs = 0L;
            long endMs = 0L;
            if (!config.useOldConsumer()) {
                KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(config.props());
                consumer.subscribe(Collections.singletonList(config.topic()));
                startMs = System.currentTimeMillis();
                this.consume(consumer, (List<String>)List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new String[]{config.topic()})), config.numMessages(), 1000L, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs);
                endMs = System.currentTimeMillis();
                if (config.printMetrics()) {
                    metrics = JavaConverters$.MODULE$.mapAsScalaMapConverter(consumer.metrics()).asScala();
                }
                consumer.close();
            } else {
                BoxedUnit boxedUnit2;
                BoxedUnit boxedUnit3;
                ConsumerConfig consumerConfig = new ConsumerConfig(config.props());
                ConsumerConnector consumerConnector = Consumer$.MODULE$.create(consumerConfig);
                Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams((Map)Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(config.topic()), BoxesRunTime.boxToInteger(config.numThreads()))})));
                ObjectRef<Nil$> threadList = ObjectRef.create(Nil$.MODULE$);
                topicMessageStreams.values().foreach((Function1<List, Object> & Serializable & scala.Serializable)streamList -> {
                    ConsumerPerformance$.$anonfun$main$1(config, totalMessagesRead, totalBytesRead, consumerTimeout, threadList, streamList);
                    return BoxedUnit.UNIT;
                });
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Sleeping for 1 second.");
                    boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    boxedUnit3 = BoxedUnit.UNIT;
                }
                Thread.sleep(1000L);
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("starting threads");
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                startMs = System.currentTimeMillis();
                ((List)threadList.elem).foreach((Function1<ConsumerPerformance.ConsumerPerfThread, Object> & Serializable & scala.Serializable)thread -> {
                    thread.start();
                    return BoxedUnit.UNIT;
                });
                ((List)threadList.elem).foreach((Function1<ConsumerPerformance.ConsumerPerfThread, Object> & Serializable & scala.Serializable)thread -> {
                    thread.join();
                    return BoxedUnit.UNIT;
                });
                endMs = consumerTimeout.get() ? System.currentTimeMillis() - (long)consumerConfig.consumerTimeoutMs() : System.currentTimeMillis();
                consumerConnector.shutdown();
            }
            double elapsedSecs = (double)(endMs - startMs) / 1000.0;
            long fetchTimeInMs = endMs - startMs - joinGroupTimeInMs.get();
            if (!config.showDetailedStats()) {
                double totalMBRead = (double)totalBytesRead.get() * 1.0 / (double)0x100000;
                Predef$.MODULE$.print(new StringOps(Predef$.MODULE$.augmentString("%s, %s, %.4f, %.4f, %d, %.4f")).format(Predef$.MODULE$.genericWrapArray(new Object[]{config.dateFormat().format(BoxesRunTime.boxToLong(startMs)), config.dateFormat().format(BoxesRunTime.boxToLong(endMs)), BoxesRunTime.boxToDouble(totalMBRead), BoxesRunTime.boxToDouble(totalMBRead / elapsedSecs), BoxesRunTime.boxToLong(totalMessagesRead.get()), BoxesRunTime.boxToDouble((double)totalMessagesRead.get() / elapsedSecs)})));
                if (!config.useOldConsumer()) {
                    Predef$.MODULE$.print(new StringOps(Predef$.MODULE$.augmentString(", %d, %d, %.4f, %.4f")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(joinGroupTimeInMs.get()), BoxesRunTime.boxToLong(fetchTimeInMs), BoxesRunTime.boxToDouble(totalMBRead / ((double)fetchTimeInMs / 1000.0)), BoxesRunTime.boxToDouble((double)totalMessagesRead.get() / ((double)fetchTimeInMs / 1000.0))})));
                }
                Predef$.MODULE$.println();
            }
            if (metrics == null) break block12;
            ToolsUtils$.MODULE$.printMetrics(metrics);
        }
    }

    public void printHeader(boolean showDetailedStats, boolean useOldConsumer) {
        String newFieldsInHeader;
        String string = newFieldsInHeader = !useOldConsumer ? ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec" : "";
        if (!showDetailedStats) {
            Predef$.MODULE$.println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader);
        } else {
            Predef$.MODULE$.println("time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader);
        }
    }

    public void consume(KafkaConsumer<byte[], byte[]> consumer, List<String> topics, long count, long timeout, ConsumerPerformance.ConsumerPerfConfig config, AtomicLong totalMessagesRead, AtomicLong totalBytesRead, AtomicLong joinTime, long testStartTime) {
        LongRef bytesRead = LongRef.create(0L);
        LongRef messagesRead = LongRef.create(0L);
        LongRef lastBytesRead = LongRef.create(0L);
        LongRef lastMessagesRead = LongRef.create(0L);
        LongRef joinStart = LongRef.create(0L);
        LongRef joinTimeMsInSingleRound = LongRef.create(0L);
        consumer.subscribe((Collection<String>)JavaConverters$.MODULE$.seqAsJavaListConverter(topics).asJava(), new ConsumerRebalanceListener(joinTime, joinStart, joinTimeMsInSingleRound){
            private final AtomicLong joinTime$1;
            private final LongRef joinStart$1;
            private final LongRef joinTimeMsInSingleRound$1;

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                this.joinTime$1.addAndGet(System.currentTimeMillis() - this.joinStart$1.elem);
                this.joinTimeMsInSingleRound$1.elem += System.currentTimeMillis() - this.joinStart$1.elem;
            }

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                this.joinStart$1.elem = System.currentTimeMillis();
            }
            {
                this.joinTime$1 = joinTime$1;
                this.joinStart$1 = joinStart$1;
                this.joinTimeMsInSingleRound$1 = joinTimeMsInSingleRound$1;
            }
        });
        consumer.poll(0L);
        consumer.seekToBeginning(Collections.emptyList());
        long startMs = System.currentTimeMillis();
        LongRef lastReportTime = LongRef.create(startMs);
        long lastConsumedTime = System.currentTimeMillis();
        LongRef currentTimeMillis = LongRef.create(lastConsumedTime);
        while (messagesRead.elem < count && currentTimeMillis.elem - lastConsumedTime <= timeout) {
            Iterable records = JavaConverters$.MODULE$.iterableAsScalaIterableConverter(consumer.poll(100L)).asScala();
            currentTimeMillis.elem = System.currentTimeMillis();
            if (records.nonEmpty()) {
                lastConsumedTime = currentTimeMillis.elem;
            }
            records.foreach((Function1<ConsumerRecord, Object> & Serializable & scala.Serializable)record -> {
                ConsumerPerformance$.$anonfun$consume$1(config, bytesRead, messagesRead, lastBytesRead, lastMessagesRead, joinTimeMsInSingleRound, lastReportTime, currentTimeMillis, record);
                return BoxedUnit.UNIT;
            });
        }
        totalMessagesRead.set(messagesRead.elem);
        totalBytesRead.set(bytesRead.elem);
    }

    public void printOldConsumerProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat) {
        this.printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
        Predef$.MODULE$.println();
    }

    public void printNewConsumerProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat, long periodicJoinTimeInMs) {
        this.printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
        this.printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, periodicJoinTimeInMs);
        Predef$.MODULE$.println();
    }

    private void printBasicProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat) {
        double elapsedMs = endMs - startMs;
        double totalMbRead = (double)bytesRead * 1.0 / (double)0x100000;
        double intervalMbRead = (double)(bytesRead - lastBytesRead) * 1.0 / (double)0x100000;
        double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
        double intervalMessagesPerSec = (double)(messagesRead - lastMessagesRead) / elapsedMs * 1000.0;
        Predef$.MODULE$.print(new StringOps(Predef$.MODULE$.augmentString("%s, %d, %.4f, %.4f, %d, %.4f")).format(Predef$.MODULE$.genericWrapArray(new Object[]{dateFormat.format(BoxesRunTime.boxToLong(endMs)), BoxesRunTime.boxToInteger(id), BoxesRunTime.boxToDouble(totalMbRead), BoxesRunTime.boxToDouble(intervalMbPerSec), BoxesRunTime.boxToLong(messagesRead), BoxesRunTime.boxToDouble(intervalMessagesPerSec)})));
    }

    private void printExtendedProgress(long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, long periodicJoinTimeInMs) {
        Tuple2$mcDD$sp tuple2$mcDD$sp;
        long fetchTimeMs = endMs - startMs - periodicJoinTimeInMs;
        double intervalMbRead = (double)(bytesRead - lastBytesRead) * 1.0 / (double)0x100000;
        long intervalMessagesRead = messagesRead - lastMessagesRead;
        Tuple2$mcDD$sp tuple2$mcDD$sp2 = tuple2$mcDD$sp = fetchTimeMs <= 0L ? new Tuple2$mcDD$sp(0.0, 0.0) : new Tuple2$mcDD$sp(1000.0 * intervalMbRead / (double)fetchTimeMs, 1000.0 * (double)intervalMessagesRead / (double)fetchTimeMs);
        if (tuple2$mcDD$sp == null) {
            throw new MatchError(tuple2$mcDD$sp);
        }
        double intervalMbPerSec = ((Tuple2)tuple2$mcDD$sp)._1$mcD$sp();
        double intervalMessagesPerSec = ((Tuple2)tuple2$mcDD$sp)._2$mcD$sp();
        Tuple2$mcDD$sp tuple2$mcDD$sp3 = new Tuple2$mcDD$sp(intervalMbPerSec, intervalMessagesPerSec);
        Tuple2$mcDD$sp tuple2$mcDD$sp4 = tuple2$mcDD$sp3;
        double intervalMbPerSec2 = ((Tuple2)tuple2$mcDD$sp4)._1$mcD$sp();
        double intervalMessagesPerSec2 = ((Tuple2)tuple2$mcDD$sp4)._2$mcD$sp();
        Predef$.MODULE$.print(new StringOps(Predef$.MODULE$.augmentString(", %d, %d, %.4f, %.4f")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(periodicJoinTimeInMs), BoxesRunTime.boxToLong(fetchTimeMs), BoxesRunTime.boxToDouble(intervalMbPerSec2), BoxesRunTime.boxToDouble(intervalMessagesPerSec2)})));
    }

    public static final /* synthetic */ void $anonfun$main$1(ConsumerPerformance.ConsumerPerfConfig config$1, AtomicLong totalMessagesRead$1, AtomicLong totalBytesRead$1, AtomicBoolean consumerTimeout$1, ObjectRef threadList$1, List streamList) {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), streamList.length()).foreach$mVc$sp((JFunction1$mcVI$sp & scala.Serializable)i -> {
            threadList$1.elem = ((List)threadList$1.elem).$colon$colon(new ConsumerPerformance.ConsumerPerfThread(i, "kafka-zk-consumer-" + i, (KafkaStream)streamList.apply(i), config$1, totalMessagesRead$1, totalBytesRead$1, consumerTimeout$1));
        });
    }

    public static final /* synthetic */ void $anonfun$consume$1(ConsumerPerformance.ConsumerPerfConfig config$2, LongRef bytesRead$1, LongRef messagesRead$1, LongRef lastBytesRead$1, LongRef lastMessagesRead$1, LongRef joinTimeMsInSingleRound$1, LongRef lastReportTime$1, LongRef currentTimeMillis$1, ConsumerRecord record) {
        block3: {
            ++messagesRead$1.elem;
            if (record.key() != null) {
                bytesRead$1.elem += (long)new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps((byte[])record.key())).size();
            }
            if (record.value() != null) {
                bytesRead$1.elem += (long)new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps((byte[])record.value())).size();
            }
            if (currentTimeMillis$1.elem - lastReportTime$1.elem < (long)config$2.reportingInterval()) break block3;
            if (config$2.showDetailedStats()) {
                MODULE$.printNewConsumerProgress(0, bytesRead$1.elem, lastBytesRead$1.elem, messagesRead$1.elem, lastMessagesRead$1.elem, lastReportTime$1.elem, currentTimeMillis$1.elem, config$2.dateFormat(), joinTimeMsInSingleRound$1.elem);
            }
            joinTimeMsInSingleRound$1.elem = 0L;
            lastReportTime$1.elem = currentTimeMillis$1.elem;
            lastMessagesRead$1.elem = messagesRead$1.elem;
            lastBytesRead$1.elem = bytesRead$1.elem;
        }
    }

    private ConsumerPerformance$() {
        MODULE$ = this;
        LazyLogging.$init$(this);
    }
}

