/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.tools.TestRecord;
import kafka.tools.TestRecord$;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Exit$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Utils;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SetLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;

public final class LogCompactionTester$ {
    public static final LogCompactionTester$ MODULE$;
    private final int ReadAheadLimit;

    static {
        new LogCompactionTester$();
    }

    private int ReadAheadLimit() {
        return this.ReadAheadLimit;
    }

    public void main(String[] args) {
        OptionParser parser = new OptionParser(false);
        ArgumentAcceptingOptionSpec numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.").withRequiredArg().describedAs("count").ofType(Long.class).defaultsTo((Object)Predef$.MODULE$.long2Long(Long.MAX_VALUE), (Object[])new Long[0]);
        ArgumentAcceptingOptionSpec messageCompressionOpt = parser.accepts("compression-type", "message compression type").withOptionalArg().describedAs("compressionType").ofType(String.class).defaultsTo((Object)"none", (Object[])new String[0]);
        ArgumentAcceptingOptionSpec numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(5), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec topicsOpt = parser.accepts("topics", "The number of topics to test.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.").withRequiredArg().describedAs("percent").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec sleepSecsOpt = parser.accepts("sleep", "Time in milliseconds to sleep between production and consumption.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        OptionSet options = parser.parse(args);
        if (args.length == 0) {
            throw CommandLineUtils$.MODULE$.printUsageAndDie(parser, "A tool to test log compaction. Valid options are: ");
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{brokerOpt, numMessagesOpt}));
        long messages = (Long)options.valueOf((OptionSpec)numMessagesOpt);
        String compressionType = (String)options.valueOf((OptionSpec)messageCompressionOpt);
        int percentDeletes = (Integer)options.valueOf((OptionSpec)percentDeletesOpt);
        int dups = (Integer)options.valueOf((OptionSpec)numDupsOpt);
        String brokerUrl = (String)options.valueOf((OptionSpec)brokerOpt);
        int topicCount = (Integer)options.valueOf((OptionSpec)topicsOpt);
        int sleepSecs = (Integer)options.valueOf((OptionSpec)sleepSecsOpt);
        long testId = new Random().nextLong();
        String[] topics = (String[])((TraversableOnce)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), topicCount).map((Function1)new Serializable(testId){
            public static final long serialVersionUID = 0L;
            private final long testId$1;

            public final String apply(int x$1) {
                return new StringBuilder().append((Object)"log-cleaner-test-").append((Object)BoxesRunTime.boxToLong((long)this.testId$1)).append((Object)"-").append((Object)BoxesRunTime.boxToInteger((int)x$1)).toString();
            }
            {
                this.testId$1 = testId$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class));
        this.createTopics(brokerUrl, (Seq<String>)Predef$.MODULE$.refArrayOps((Object[])topics).toSeq());
        Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Producing ", " messages..to topics ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)messages), Predef$.MODULE$.refArrayOps((Object[])topics).mkString(",")})));
        Path producedDataFilePath = this.produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes);
        Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Sleeping for ", " seconds..."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)sleepSecs)})));
        Thread.sleep(sleepSecs * 1000);
        Predef$.MODULE$.println((Object)"Consuming messages...");
        Path consumedDataFilePath = this.consumeMessages(brokerUrl, topics);
        int producedLines = this.lineCount(producedDataFilePath);
        int consumedLines = this.lineCount(consumedDataFilePath);
        double reduction = (double)100 * (1.0 - (double)consumedLines / (double)producedLines);
        int arg$macro$1 = producedLines;
        int arg$macro$2 = consumedLines;
        double arg$macro$3 = reduction;
        Predef$.MODULE$.println((Object)new StringOps("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)arg$macro$1), BoxesRunTime.boxToInteger((int)arg$macro$2), BoxesRunTime.boxToDouble((double)arg$macro$3)})));
        Predef$.MODULE$.println((Object)"De-duplicating and validating output files...");
        this.validateOutput(producedDataFilePath.toFile(), consumedDataFilePath.toFile());
        Utils.delete((File)producedDataFilePath.toFile());
        Utils.delete((File)consumedDataFilePath.toFile());
        Predef$.MODULE$.println((Object)"Data verification is completed");
    }

    /*
     * WARNING - void declaration
     */
    public void createTopics(String brokerUrl, Seq<String> topics) {
        Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", brokerUrl);
        AdminClient adminClient = AdminClient.create((Properties)adminConfig);
        try {
            scala.collection.immutable.Map topicConfigs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)"compact")}));
            List newTopics = (List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)topics.map((Function1)new Serializable(topicConfigs){
                public static final long serialVersionUID = 0L;
                private final scala.collection.immutable.Map topicConfigs$1;

                public final NewTopic apply(String name) {
                    return new NewTopic(name, 1, 1).configs((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)this.topicConfigs$1).asJava());
                }
                {
                    this.topicConfigs$1 = topicConfigs$1;
                }
            }, Seq$.MODULE$.canBuildFrom())).asJava();
            adminClient.createTopics((Collection)newTopics).all().get();
            ObjectRef pendingTopics = ObjectRef.create((Object)((Seq)Seq$.MODULE$.apply((Seq)Nil$.MODULE$)));
            TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(topics, adminClient, pendingTopics){
                public static final long serialVersionUID = 0L;
                private final Seq topics$1;
                private final AdminClient adminClient$1;
                private final ObjectRef pendingTopics$1;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    Seq allTopics = ((SetLike)JavaConverters$.MODULE$.asScalaSetConverter((Set)this.adminClient$1.listTopics().names().get()).asScala()).toSeq();
                    this.pendingTopics$1.elem = (Seq)this.topics$1.filter((Function1)new Serializable(this, allTopics){
                        public static final long serialVersionUID = 0L;
                        private final Seq allTopics$1;

                        public final boolean apply(String topicName) {
                            return !this.allTopics$1.contains((Object)topicName);
                        }
                        {
                            this.allTopics$1 = allTopics$1;
                        }
                    });
                    return ((Seq)this.pendingTopics$1.elem).isEmpty();
                }
                {
                    this.topics$1 = topics$1;
                    this.adminClient$1 = adminClient$1;
                    this.pendingTopics$1 = pendingTopics$1;
                }
            }, (Function0<String>)new Serializable(pendingTopics){
                public static final long serialVersionUID = 0L;
                private final ObjectRef pendingTopics$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"timed out waiting for topics : ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{(Seq)this.pendingTopics$1.elem}));
                }
                {
                    this.pendingTopics$1 = pendingTopics$1;
                }
            }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
        }
        catch (Throwable throwable) {
            void var4_4;
            var4_4.close();
            throw throwable;
        }
        adminClient.close();
    }

    public int lineCount(Path filPath) {
        return Files.readAllLines(filPath).size();
    }

    public void validateOutput(File producedDataFile, File consumedDataFile) {
        BufferedReader producedReader = this.externalSort(producedDataFile);
        BufferedReader consumedReader = this.externalSort(consumedDataFile);
        Iterator<TestRecord> produced = this.valuesIterator(producedReader);
        Iterator<TestRecord> consumed = this.valuesIterator(consumedReader);
        File producedDedupedFile = new File(new StringBuilder().append((Object)producedDataFile.getAbsolutePath()).append((Object)".deduped").toString());
        BufferedWriter producedDeduped = Files.newBufferedWriter(producedDedupedFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
        File consumedDedupedFile = new File(new StringBuilder().append((Object)consumedDataFile.getAbsolutePath()).append((Object)".deduped").toString());
        BufferedWriter consumedDeduped = Files.newBufferedWriter(consumedDedupedFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
        int total = 0;
        int mismatched = 0;
        while (produced.hasNext() && consumed.hasNext()) {
            TestRecord p = (TestRecord)produced.next();
            producedDeduped.write(p.toString());
            producedDeduped.newLine();
            TestRecord c = (TestRecord)consumed.next();
            consumedDeduped.write(c.toString());
            consumedDeduped.newLine();
            TestRecord testRecord = p;
            TestRecord testRecord2 = c;
            if (testRecord == null ? testRecord2 != null : !((Object)testRecord).equals(testRecord2)) {
                ++mismatched;
            }
            ++total;
        }
        producedDeduped.close();
        consumedDeduped.close();
        Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Validated ", " values, ", " mismatches."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)total), BoxesRunTime.boxToInteger((int)mismatched)})));
        this.require(!produced.hasNext(), (Function0<Object>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Additional values produced not found in consumer log.";
            }
        });
        this.require(!consumed.hasNext(), (Function0<Object>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Additional values consumed not found in producer log.";
            }
        });
        this.require(mismatched == 0, (Function0<Object>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Non-zero number of row mismatches.";
            }
        });
        Utils.delete((File)producedDedupedFile);
        Utils.delete((File)consumedDedupedFile);
    }

    public void require(boolean requirement, Function0<Object> message) {
        if (requirement) {
            return;
        }
        System.err.println(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Data validation failed : ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{message.apply()})));
        throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
    }

    public Iterator<TestRecord> valuesIterator(BufferedReader reader) {
        return (Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter((java.util.Iterator)new AbstractIterator<TestRecord>(reader){
            private final BufferedReader reader$1;

            public TestRecord makeNext() {
                TestRecord next = LogCompactionTester$.MODULE$.readNext(this.reader$1);
                while (next != null && next.delete()) {
                    next = LogCompactionTester$.MODULE$.readNext(this.reader$1);
                }
                return next == null ? (TestRecord)this.allDone() : next;
            }
            {
                this.reader$1 = reader$1;
            }
        }).asScala();
    }

    public TestRecord readNext(BufferedReader reader) {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        TestRecord curr = TestRecord$.MODULE$.parse(line);
        while (true) {
            if ((line = this.peekLine(reader)) == null) {
                return curr;
            }
            TestRecord next = TestRecord$.MODULE$.parse(line);
            if (next == null) break;
            String string = next.topicAndKey();
            String string2 = curr.topicAndKey();
            if (string != null ? !string.equals(string2) : string2 != null) break;
            curr = next;
            reader.readLine();
        }
        return curr;
    }

    /*
     * WARNING - void declaration
     */
    public String peekLine(BufferedReader reader) {
        void var2_2;
        reader.mark(this.ReadAheadLimit());
        String line = reader.readLine();
        reader.reset();
        return var2_2;
    }

    public BufferedReader externalSort(File file) {
        ProcessBuilder builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", new StringBuilder().append((Object)"--temporary-directory=").append((Object)Files.createTempDirectory("log_compaction_test", new FileAttribute[0])).toString(), file.getAbsolutePath());
        Process process = builder.start();
        new Thread(process){
            private final Process process$1;

            public void run() {
                int exitCode = this.process$1.waitFor();
                if (exitCode != 0) {
                    System.err.println("Process exited abnormally.");
                    while (this.process$1.getErrorStream().available() > 0) {
                        System.err.write(this.process$1.getErrorStream().read());
                    }
                }
            }
            {
                this.process$1 = process$1;
            }
        }.start();
        return new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8), 0xA00000);
    }

    /*
     * WARNING - void declaration
     */
    public Path produceMessages(String brokerUrl, String[] topics, long messages, String compressionType, int dups, int percentDeletes) {
        Path path;
        Properties producerProps = new Properties();
        producerProps.setProperty("max.block.ms", ((Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE)).toString());
        producerProps.setProperty("bootstrap.servers", brokerUrl);
        producerProps.setProperty("compression.type", compressionType);
        KafkaProducer producer = new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        try {
            Random rand = new Random(1L);
            int keyCount = (int)(messages / (long)dups);
            Path producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt", new FileAttribute[0]);
            Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Logging produce requests to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{producedFilePath})));
            BufferedWriter producedWriter = Files.newBufferedWriter(producedFilePath, StandardCharsets.UTF_8, new OpenOption[0]);
            new RichLong(Predef$.MODULE$.longWrapper(0L)).until((Object)BoxesRunTime.boxToLong((long)(messages * (long)topics.length))).foreach((Function1)new Serializable(topics, percentDeletes, producer, rand, keyCount, producedWriter){
                public static final long serialVersionUID = 0L;
                private final String[] topics$2;
                private final int percentDeletes$1;
                private final KafkaProducer producer$1;
                private final Random rand$1;
                private final int keyCount$1;
                private final BufferedWriter producedWriter$1;

                public final void apply(long i) {
                    this.apply$mcVJ$sp(i);
                }

                public void apply$mcVJ$sp(long i) {
                    String topic = this.topics$2[(int)(i % (long)this.topics$2.length)];
                    int key = this.rand$1.nextInt(this.keyCount$1);
                    boolean delete = i % 100L < (long)this.percentDeletes$1;
                    ProducerRecord msg = delete ? new ProducerRecord(topic, (Object)((Object)BoxesRunTime.boxToInteger((int)key)).toString().getBytes(StandardCharsets.UTF_8), null) : new ProducerRecord(topic, (Object)((Object)BoxesRunTime.boxToInteger((int)key)).toString().getBytes(StandardCharsets.UTF_8), (Object)((Object)BoxesRunTime.boxToLong((long)i)).toString().getBytes(StandardCharsets.UTF_8));
                    this.producer$1.send(msg);
                    this.producedWriter$1.write(new TestRecord(topic, key, i, delete).toString());
                    this.producedWriter$1.newLine();
                }
                {
                    this.topics$2 = topics$2;
                    this.percentDeletes$1 = percentDeletes$1;
                    this.producer$1 = producer$1;
                    this.rand$1 = rand$1;
                    this.keyCount$1 = keyCount$1;
                    this.producedWriter$1 = producedWriter$1;
                }
            });
            producedWriter.close();
            path = producedFilePath;
        }
        catch (Throwable throwable) {
            void var9_8;
            var9_8.close();
            throw throwable;
        }
        producer.close();
        return path;
    }

    public KafkaConsumer<String, String> createConsumer(String brokerUrl) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", new StringBuilder().append((Object)"log-cleaner-test-").append((Object)BoxesRunTime.boxToInteger((int)new Random().nextInt(Integer.MAX_VALUE))).toString());
        consumerProps.setProperty("bootstrap.servers", brokerUrl);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        return new KafkaConsumer(consumerProps, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
    }

    /*
     * WARNING - void declaration
     */
    public Path consumeMessages(String brokerUrl, String[] topics) {
        void var3_3;
        void var5_5;
        Path path;
        KafkaConsumer<String, String> consumer = this.createConsumer(brokerUrl);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.mutableSeqAsJavaListConverter((scala.collection.mutable.Seq)Predef$.MODULE$.refArrayOps((Object[])topics).seq()).asJava());
        Path consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt", new FileAttribute[0]);
        Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Logging consumed messages to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{consumedFilePath})));
        BufferedWriter consumedWriter = Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8, new OpenOption[0]);
        try {
            boolean done = false;
            while (true) {
                if (!done) break block5;
                path = consumedFilePath;
                break;
            }
        }
        catch (Throwable throwable) {
            var5_5.close();
            var3_3.close();
            throw throwable;
        }
        {
            block5: {
                consumedWriter.close();
                consumer.close();
                return path;
            }
            ConsumerRecords consumerRecords = var3_3.poll(Duration.ofSeconds(20L));
            if (consumerRecords.isEmpty()) {
                boolean bl = true;
                continue;
            }
            ((IterableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter((Iterable)consumerRecords).asScala()).foreach((Function1)new Serializable((BufferedWriter)var5_5){
                public static final long serialVersionUID = 0L;
                private final BufferedWriter consumedWriter$1;

                public final void apply(ConsumerRecord<String, String> record) {
                    boolean delete = record.value() == null;
                    long value = delete ? -1L : new StringOps(Predef$.MODULE$.augmentString((String)record.value())).toLong();
                    this.consumedWriter$1.write(new TestRecord(record.topic(), new StringOps(Predef$.MODULE$.augmentString((String)record.key())).toInt(), value, delete).toString());
                    this.consumedWriter$1.newLine();
                }
                {
                    this.consumedWriter$1 = consumedWriter$1;
                }
            });
            continue;
        }
    }

    public String readString(ByteBuffer buffer) {
        return Utils.utf8((ByteBuffer)buffer);
    }

    private LogCompactionTester$() {
        MODULE$ = this;
        this.ReadAheadLimit = 4906;
    }
}

