/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.Properties;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.api.OffsetRequest$;
import kafka.api.PartitionMetadata;
import kafka.api.Request$;
import kafka.api.TopicMetadata;
import kafka.client.ClientUtils$;
import kafka.cluster.BrokerEndPoint;
import kafka.common.MessageFormatter;
import kafka.common.TopicAndPartition;
import kafka.consumer.ConsumerConfig$;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.tools.DefaultMessageFormatter;
import kafka.tools.SimpleConsumerShell$;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Exit$;
import kafka.utils.Logging;
import kafka.utils.ToolsUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Utils;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LambdaDeserialize;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/**
 * Singleton module class backing the {@code SimpleConsumerShell} command-line tool: it fetches
 * messages for one topic partition directly from a chosen broker (the leader or a specific
 * replica) and prints them through a pluggable {@link MessageFormatter}.
 *
 * <p>This source was produced by a decompiler from Scala bytecode. A few constructs the
 * decompiler emitted are not expressible in Java (the {@code anon.1} type name, constructor
 * arguments on the anonymous {@code Runnable}, and the body of {@code $deserializeLambda$});
 * they are kept verbatim and flagged where they occur.
 */
public final class SimpleConsumerShell$
implements Logging {
    /** The single instance; assigned by the private constructor, which the static initializer runs. */
    public static SimpleConsumerShell$ MODULE$;
    /** Lazily created logger; valid only once {@link #bitmap$0} is {@code true}. */
    private Logger logger;
    private String logIdent;
    /** Set to {@code true} after {@link #logger} has been initialized. */
    private volatile boolean bitmap$0;

    static {
        new SimpleConsumerShell$();
    }

    // ---------------------------------------------------------------------------------------
    // Logging trait forwarders: each method delegates to the matching static helper on Logging.
    // ---------------------------------------------------------------------------------------

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /**
     * Slow path of {@link #logger()}: creates the logger under the instance lock unless the
     * initialized flag is already set, then returns it.
     */
    private Logger logger$lzycompute() {
        SimpleConsumerShell$ simpleConsumerShell$ = this;
        synchronized (simpleConsumerShell$) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    /** Returns the logger, creating it on first use. */
    @Override
    public Logger logger() {
        return !this.bitmap$0 ? this.logger$lzycompute() : this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /**
     * Sentinel value of the {@code --replica} option meaning "fetch from the partition leader".
     *
     * @return always {@code -1}
     */
    public int UseLeaderReplica() {
        return -1;
    }

    /**
     * Entry point of the tool.
     *
     * <p>Steps: parse the command line, fetch metadata for the topic, pick the broker to read
     * from (leader or the requested replica), resolve a negative starting offset (earliest or
     * latest) to a concrete offset, then run the fetch loop on a dedicated non-daemon thread and
     * wait for it to finish. Validation failures print a message to standard error and exit with
     * status 1.
     *
     * @param args command-line arguments; {@code --broker-list} and {@code --topic} are required
     */
    public void main(String[] args) {
        this.warn((Function0<String> & Serializable & scala.Serializable)() -> "WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0.");

        // ---- Command-line definition ----
        OptionParser parser = new OptionParser(false);
        ArgumentAcceptingOptionSpec<String> brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.").withRequiredArg().describedAs("hostname:port,...,hostname:port").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.").withRequiredArg().describedAs("topic").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> partitionIdOpt = parser.accepts("partition", "The partition to consume from.").withRequiredArg().describedAs("partition").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(0), (Integer[])((Object[])new Integer[0]));
        ArgumentAcceptingOptionSpec<Integer> replicaIdOpt = parser.accepts("replica", "The replica id to consume from, default -1 means leader broker.").withRequiredArg().describedAs("replica id").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(this.UseLeaderReplica()), (Integer[])((Object[])new Integer[0]));
        ArgumentAcceptingOptionSpec<Long> offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end").withRequiredArg().describedAs("consume offset").ofType(Long.class).defaultsTo(Predef$.MODULE$.long2Long(OffsetRequest$.MODULE$.EarliestTime()), (Long[])((Object[])new Long[0]));
        ArgumentAcceptingOptionSpec<String> clientIdOpt = parser.accepts("clientId", "The ID of this client.").withRequiredArg().describedAs("clientId").ofType(String.class).defaultsTo("SimpleConsumerShell", (String[])((Object[])new String[0]));
        ArgumentAcceptingOptionSpec<Integer> fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.").withRequiredArg().describedAs("fetchsize").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(0x100000), (Integer[])((Object[])new Integer[0]));
        ArgumentAcceptingOptionSpec<String> messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.").withRequiredArg().describedAs("class").ofType(String.class).defaultsTo(DefaultMessageFormatter.class.getName(), (String[])((Object[])new String[0]));
        ArgumentAcceptingOptionSpec<String> messageFormatterArgOpt = parser.accepts("property").withRequiredArg().describedAs("prop").ofType(String.class);
        OptionSpecBuilder printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator");
        ArgumentAcceptingOptionSpec<Integer> maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(1000), (Integer[])((Object[])new Integer[0]));
        ArgumentAcceptingOptionSpec<Integer> maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume").withRequiredArg().describedAs("max-messages").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(Integer.MAX_VALUE), (Integer[])((Object[])new Integer[0]));
        OptionSpecBuilder skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, skip it instead of halt.");
        OptionSpecBuilder noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages");
        if (args.length == 0) {
            throw CommandLineUtils$.MODULE$.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.");
        }

        // ---- Command-line parsing ----
        OptionSet options = parser.parse(args);
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{brokerListOpt, topicOpt}));
        String topic = options.valueOf(topicOpt);
        int partitionId = options.valueOf(partitionIdOpt);
        int replicaId = options.valueOf(replicaIdOpt);
        // Held in a LongRef because a negative sentinel is replaced by a concrete offset below.
        LongRef startingOffset = LongRef.create(options.valueOf(offsetOpt));
        int fetchSize = options.valueOf(fetchSizeOpt);
        String clientId = options.valueOf(clientIdOpt).toString();
        int maxWaitMs = options.valueOf(maxWaitMsOpt);
        int maxMessages = options.valueOf(maxMessagesOpt);
        boolean skipMessageOnError = options.has(skipMessageOnErrorOpt);
        boolean printOffsets = options.has(printOffsetOpt);
        boolean noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt);
        Class<?> messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt));
        Properties formatterArgs = CommandLineUtils$.MODULE$.parseKeyValueArgs((Iterable<String>)JavaConverters$.MODULE$.asScalaBufferConverter(options.valuesOf(messageFormatterArgOpt)).asScala(), CommandLineUtils$.MODULE$.parseKeyValueArgs$default$2());
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder().clientId(clientId).replicaId(Request$.MODULE$.DebuggingConsumerId()).maxWait(maxWaitMs).minBytes(ConsumerConfig$.MODULE$.MinFetchBytes());

        // ---- Topic metadata lookup ----
        this.info((Function0<String> & Serializable & scala.Serializable)() -> "Getting topic metadata...");
        String brokerList = options.valueOf(brokerListOpt);
        ToolsUtils$.MODULE$.validatePortOrDie(parser, brokerList);
        Seq<BrokerEndPoint> metadataTargetBrokers = ClientUtils$.MODULE$.parseBrokerList(brokerList);
        Seq<TopicMetadata> topicsMetadata = ClientUtils$.MODULE$.fetchTopicMetadata((Set)Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})), metadataTargetBrokers, clientId, maxWaitMs, ClientUtils$.MODULE$.fetchTopicMetadata$default$5()).topicsMetadata();
        if (topicsMetadata.size() != 1 || !((TopicMetadata)topicsMetadata.head()).topic().equals(topic)) {
            System.err.println(new StringOps(Predef$.MODULE$.augmentString("Error: no valid topic metadata for topic: %s, what we get from server is only: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topic, topicsMetadata})));
            throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
        }

        // ---- Partition lookup ----
        Seq<PartitionMetadata> partitionsMetadata = ((TopicMetadata)topicsMetadata.head()).partitionsMetadata();
        Option partitionMetadataOpt = partitionsMetadata.find((Function1<PartitionMetadata, Object> & Serializable & scala.Serializable)p -> BoxesRunTime.boxToBoolean(SimpleConsumerShell$.$anonfun$main$3(partitionId, p)));
        if (partitionMetadataOpt.isEmpty()) {
            System.err.println(new StringOps(Predef$.MODULE$.augmentString("Error: partition %d does not exist for topic %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(partitionId), topic})));
            throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
        }

        // ---- Choose the broker to fetch from: the leader, or the explicitly requested replica ----
        ObjectRef<Object> fetchTargetBroker = ObjectRef.create(null);
        Option<BrokerEndPoint> replicaOpt = null;
        if (replicaId == this.UseLeaderReplica()) {
            replicaOpt = ((PartitionMetadata)partitionMetadataOpt.get()).leader();
            if (replicaOpt.isEmpty()) {
                System.err.println(new StringOps(Predef$.MODULE$.augmentString("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topic, BoxesRunTime.boxToInteger(partitionId)})));
                throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
            }
        } else {
            Seq<BrokerEndPoint> replicasForPartition = ((PartitionMetadata)partitionMetadataOpt.get()).replicas();
            replicaOpt = replicasForPartition.find((Function1<BrokerEndPoint, Object> & Serializable & scala.Serializable)r -> BoxesRunTime.boxToBoolean(SimpleConsumerShell$.$anonfun$main$4(replicaId, r)));
            if (replicaOpt.isEmpty()) {
                System.err.println(new StringOps(Predef$.MODULE$.augmentString("Error: replica %d does not exist for partition (%s, %d)")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(replicaId), topic, BoxesRunTime.boxToInteger(partitionId)})));
                throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
            }
        }
        fetchTargetBroker.elem = replicaOpt.get();

        // ---- Starting offset: reject values below the "earliest" sentinel, resolve sentinels ----
        if (startingOffset.elem < OffsetRequest$.MODULE$.EarliestTime()) {
            System.err.println(new StringOps(Predef$.MODULE$.augmentString("Invalid starting offset: %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(startingOffset.elem)})));
            throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
        }
        if (startingOffset.elem < 0L) {
            // A short-lived consumer is used only to translate the sentinel into a real offset.
            try (SimpleConsumer simpleConsumer = new SimpleConsumer(((BrokerEndPoint)fetchTargetBroker.elem).host(), ((BrokerEndPoint)fetchTargetBroker.elem).port(), ConsumerConfig$.MODULE$.SocketTimeout(), ConsumerConfig$.MODULE$.SocketBufferSize(), clientId);){
                try {
                    startingOffset.elem = simpleConsumer.earliestOrLatestOffset(new TopicAndPartition(topic, partitionId), startingOffset.elem, Request$.MODULE$.DebuggingConsumerId());
                }
                catch (Throwable t) {
                    System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t));
                    throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
                }
            }
        }

        // ---- Formatter and consumer setup ----
        MessageFormatter formatter = (MessageFormatter)messageFormatterClass.newInstance();
        formatter.init(formatterArgs);
        // The leader is selected by the UseLeaderReplica sentinel (-1), not by a positive id.
        String replicaString = replicaId == this.UseLeaderReplica() ? "leader" : "replica";
        this.info((Function0<String> & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topic, BoxesRunTime.boxToInteger(partitionId), replicaString, BoxesRunTime.boxToInteger(replicaId), ((BrokerEndPoint)fetchTargetBroker.elem).host(), BoxesRunTime.boxToInteger(((BrokerEndPoint)fetchTargetBroker.elem).port()), BoxesRunTime.boxToLong(startingOffset.elem)})));
        SimpleConsumer simpleConsumer = new SimpleConsumer(((BrokerEndPoint)fetchTargetBroker.elem).host(), ((BrokerEndPoint)fetchTargetBroker.elem).port(), 10000, 65536, clientId);

        // ---- Fetch loop, run on its own non-daemon thread ----
        // NOTE: the constructor-style argument list on an anonymous Runnable is decompiler output
        // for the captured variables; it is kept verbatim.
        KafkaThread thread = KafkaThread.nonDaemon("kafka-simpleconsumer-shell", new Runnable(topic, partitionId, replicaId, startingOffset, fetchSize, maxMessages, skipMessageOnError, printOffsets, noWaitAtEndOfLog, fetchRequestBuilder, formatter, simpleConsumer){
            private final String topic$1;
            private final int partitionId$1;
            private final int replicaId$1;
            private final LongRef startingOffset$1;
            private final int fetchSize$1;
            private final int maxMessages$1;
            private final boolean skipMessageOnError$1;
            private final boolean printOffsets$1;
            private final boolean noWaitAtEndOfLog$1;
            private final FetchRequestBuilder fetchRequestBuilder$1;
            private final MessageFormatter formatter$1;
            private final SimpleConsumer simpleConsumer$1;

            /**
             * Fetches and prints messages until {@code maxMessages} have been consumed, or until an
             * empty fetch is seen while {@code noWaitAtEndOfLog} is set. Any error is logged, and
             * the number of consumed messages is always logged on the way out.
             */
            public void run() {
                LongRef offset = LongRef.create(this.startingOffset$1.elem);
                IntRef numMessagesConsumed = IntRef.create(0);
                try {
                    while (numMessagesConsumed.elem < this.maxMessages$1) {
                        FetchRequest fetchRequest = this.fetchRequestBuilder$1.addFetch(this.topic$1, this.partitionId$1, offset.elem, this.fetchSize$1).build();
                        FetchResponse fetchResponse = this.simpleConsumer$1.fetch(fetchRequest);
                        ByteBufferMessageSet messageSet = fetchResponse.messageSet(this.topic$1, this.partitionId$1);
                        if (messageSet.validBytes() <= 0 && this.noWaitAtEndOfLog$1) {
                            Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Terminating. Reached the end of partition (%s, %d) at offset %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.topic$1, BoxesRunTime.boxToInteger(this.partitionId$1), BoxesRunTime.boxToLong(offset.elem)})));
                            return;
                        }
                        SimpleConsumerShell$.MODULE$.debug((Function0<String> & Serializable & scala.Serializable)() -> "multi fetched " + messageSet.sizeInBytes() + " bytes from offset " + offset.elem);
                        // `anon.1` is the decompiler's placeholder for this anonymous class.
                        // The filter stops processing once the message budget is used up mid-batch.
                        messageSet.withFilter((Function1<MessageAndOffset, Object> & Serializable & scala.Serializable)messageAndOffset -> BoxesRunTime.boxToBoolean(anon.1.$anonfun$run$2(this, numMessagesConsumed, messageAndOffset))).foreach((Function1<MessageAndOffset, Object> & Serializable & scala.Serializable)messageAndOffset -> {
                            anon.1.$anonfun$run$3(this, offset, numMessagesConsumed, messageAndOffset);
                            return BoxedUnit.UNIT;
                        });
                    }
                }
                catch (Throwable e) {
                    SimpleConsumerShell$.MODULE$.error((Function0<String> & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.topic$1, BoxesRunTime.boxToInteger(this.partitionId$1), BoxesRunTime.boxToInteger(this.replicaId$1), BoxesRunTime.boxToLong(offset.elem)})), (Function0<Throwable> & Serializable & scala.Serializable)() -> e);
                }
                finally {
                    SimpleConsumerShell$.MODULE$.info((Function0<String> & Serializable & scala.Serializable)() -> new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Consumed ", " messages"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(numMessagesConsumed.elem)})));
                }
            }

            /** Filter predicate: {@code true} while fewer than {@code maxMessages} have been consumed. */
            public static final /* synthetic */ boolean $anonfun$run$2(anon.1 $this, IntRef numMessagesConsumed$1, MessageAndOffset messageAndOffset) {
                return numMessagesConsumed$1.elem < $this.maxMessages$1;
            }

            /**
             * Handles one fetched message: advances the fetch offset, optionally prints it, hands the
             * message to the formatter and counts it. A processing error is logged and swallowed when
             * {@code skipMessageOnError} is set, and rethrown otherwise. If standard out has become
             * unwritable, the formatter and consumer are closed and the process exits with status 1.
             */
            public static final /* synthetic */ void $anonfun$run$3(anon.1 $this, LongRef offset$1, IntRef numMessagesConsumed$1, MessageAndOffset messageAndOffset) {
                try {
                    Message message;
                    offset$1.elem = messageAndOffset.nextOffset();
                    if ($this.printOffsets$1) {
                        System.out.println("next offset = " + offset$1.elem);
                    }
                    byte[] key = (message = messageAndOffset.message()).hasKey() ? Utils.readBytes(message.key()) : null;
                    byte[] value = message.isNull() ? null : Utils.readBytes(message.payload());
                    int serializedKeySize = message.hasKey() ? key.length : -1;
                    int serializedValueSize = message.isNull() ? -1 : value.length;
                    // NOTE(review): the record is built with the already-advanced (next) offset rather
                    // than the message's own offset - confirm whether that is intended.
                    $this.formatter$1.writeTo(new ConsumerRecord<byte[], byte[]>($this.topic$1, $this.partitionId$1, offset$1.elem, message.timestamp(), message.timestampType(), message.checksum(), serializedKeySize, serializedValueSize, key, value), System.out);
                    ++numMessagesConsumed$1.elem;
                }
                catch (Throwable e) {
                    if ($this.skipMessageOnError$1) {
                        SimpleConsumerShell$.MODULE$.error((Function0<String> & Serializable & scala.Serializable)() -> "Error processing message, skipping this message: ", (Function0<Throwable> & Serializable & scala.Serializable)() -> e);
                    } else {
                        // Without --skip-message-on-error a bad message aborts the fetch loop.
                        throw e;
                    }
                }
                if (System.out.checkError()) {
                    System.err.println("Unable to write to standard out, closing consumer.");
                    $this.formatter$1.close();
                    $this.simpleConsumer$1.close();
                    throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
                }
            }
            // Decompiler rendering of the synthetic constructor that stores the captured values.
            {
                this.topic$1 = topic$1;
                this.partitionId$1 = partitionId$1;
                this.replicaId$1 = replicaId$1;
                this.startingOffset$1 = startingOffset$1;
                this.fetchSize$1 = fetchSize$1;
                this.maxMessages$1 = maxMessages$1;
                this.skipMessageOnError$1 = skipMessageOnError$1;
                this.printOffsets$1 = printOffsets$1;
                this.noWaitAtEndOfLog$1 = noWaitAtEndOfLog$1;
                this.fetchRequestBuilder$1 = fetchRequestBuilder$1;
                this.formatter$1 = formatter$1;
                this.simpleConsumer$1 = simpleConsumer$1;
            }

            // Synthetic lambda-deserialization hook; the method-handle list is raw decompiler output
            // and is kept verbatim.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(scala.runtime.LongRef kafka.message.ByteBufferMessageSet ), $anonfun$run$2$adapted(kafka.tools.SimpleConsumerShell$$anon$1 scala.runtime.IntRef kafka.message.MessageAndOffset ), $anonfun$run$3$adapted(kafka.tools.SimpleConsumerShell$$anon$1 scala.runtime.LongRef scala.runtime.IntRef kafka.message.MessageAndOffset ), $anonfun$run$6(kafka.tools.SimpleConsumerShell$$anon$1 scala.runtime.LongRef ), $anonfun$run$7(java.lang.Throwable ), $anonfun$run$8(scala.runtime.IntRef ), $anonfun$run$4(), $anonfun$run$5(java.lang.Throwable )}, serializedLambda);
            }
        });
        thread.start();
        thread.join();
        System.out.flush();
        formatter.close();
        simpleConsumer.close();
    }

    /** Predicate used by {@code main}: does partition metadata {@code p} describe {@code partitionId$1}? */
    public static final /* synthetic */ boolean $anonfun$main$3(int partitionId$1, PartitionMetadata p) {
        return p.partitionId() == partitionId$1;
    }

    /** Predicate used by {@code main}: does broker endpoint {@code r} have id {@code replicaId$1}? */
    public static final /* synthetic */ boolean $anonfun$main$4(int replicaId$1, BrokerEndPoint r) {
        return r.id() == replicaId$1;
    }

    /** Publishes the singleton through {@link #MODULE$} and runs the {@code Logging} trait initializer. */
    private SimpleConsumerShell$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}

