/*
 * Decompiled with CFR 0.152.
 */
package io.github.amerousful.kafka.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ActorSystem$;
import akka.actor.ClassicActorSystemProvider;
import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerSettings;
import akka.kafka.ConsumerSettings$;
import akka.kafka.Subscription;
import akka.kafka.Subscriptions$;
import akka.kafka.scaladsl.Consumer;
import akka.kafka.scaladsl.Consumer$;
import akka.kafka.scaladsl.PartitionAssignmentHandler;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.Materializer$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import io.gatling.commons.util.Clock;
import io.gatling.core.stats.StatsEngine;
import io.gatling.core.util.NameGen;
import io.github.amerousful.kafka.action.KafkaLogging;
import io.github.amerousful.kafka.client.KafkaTracker;
import io.github.amerousful.kafka.client.MessageReceived;
import io.github.amerousful.kafka.client.Tracker$;
import io.github.amerousful.kafka.client.TrackerAndController;
import io.github.amerousful.kafka.client.WaitRebalancing;
import io.github.amerousful.kafka.protocol.KafkaMatcher;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.MapOps;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005ee\u0001\u0002\n\u0014\u0001yA\u0001\"\u000e\u0001\u0003\u0002\u0003\u0006IA\u000e\u0005\t\t\u0002\u0011\t\u0011)A\u0005\u000b\"AQ\n\u0001B\u0001B\u0003%a\n\u0003\u0005U\u0001\t\u0005\t\u0015!\u0003V\u0011\u0015a\u0006\u0001\"\u0001^\u0011\u001d!\u0007A1A\u0005\n\u0015DaA\u001d\u0001!\u0002\u00131\u0007bB:\u0001\u0005\u0004%I\u0001\u001e\u0005\u0007q\u0002\u0001\u000b\u0011B;\t\u000fe\u0004!\u0019!C\u0005u\"9\u00111\u0002\u0001!\u0002\u0013Y\bBCA\u0007\u0001!\u0015\r\u0011b\u0001\u0002\u0010!Q\u0011\u0011\u0003\u0001\t\u0006\u0004%\u0019!a\u0005\t\u000f\u0005\u0005\u0002\u0001\"\u0001\u0002$!9\u00111\u0006\u0001\u0005\n\u00055\u0002bBA\u0019\u0001\u0011%\u00111\u0007\u0005\b\u0003{\u0002A\u0011AA@\u0005AY\u0015MZ6b)J\f7m[3s!>dGN\u0003\u0002\u0015+\u000511\r\\5f]RT!AF\f\u0002\u000b-\fgm[1\u000b\u0005aI\u0012AC1nKJ|Wo\u001d4vY*\u0011!dG\u0001\u0007O&$\b.\u001e2\u000b\u0003q\t!![8\u0004\u0001M!\u0001aH\u0013,!\t\u00013%D\u0001\"\u0015\u0005\u0011\u0013!B:dC2\f\u0017B\u0001\u0013\"\u0005\u0019\te.\u001f*fMB\u0011a%K\u0007\u0002O)\u0011\u0001&F\u0001\u0007C\u000e$\u0018n\u001c8\n\u0005):#\u0001D&bM.\fGj\\4hS:<\u0007C\u0001\u00174\u001b\u0005i#B\u0001\u00180\u0003\u0011)H/\u001b7\u000b\u0005A\n\u0014\u0001B2pe\u0016T!AM\u000e\u0002\u000f\u001d\fG\u000f\\5oO&\u0011A'\f\u0002\b\u001d\u0006lWmR3o\u0003I\u0019wN\\:v[\u0016\u0014\bK]8qKJ$\u0018.Z:\u0011\t]r\u0014i\b\b\u0003qq\u0002\"!O\u0011\u000e\u0003iR!aO\u000f\u0002\rq\u0012xn\u001c;?\u0013\ti\u0014%\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u007f\u0001\u00131!T1q\u0015\ti\u0014\u0005\u0005\u00028\u0005&\u00111\t\u0011\u0002\u0007'R\u0014\u0018N\\4\u0002\rML8\u000f^3n!\t15*D\u0001H\u0015\tA\u0015*A\u0003bGR|'OC\u0001K\u0003\u0011\t7n[1\n\u00051;%aC!di>\u00148+_:uK6\f1b\u001d;biN,enZ5oKB\u0011qJU\u0007\u0002!*\u0011\u0011kL\u0001\u0006gR\fGo]\u0005\u0003'B\u00131b\u0015;biN,enZ5oK\u0006)1\r\\8dWB\u0011aKW\u0007\u0002/*\u0011a\u0006\u0017\u0006\u00033F\nqaY8n[>t7/\u0003\u0002\\/\n)1\t\\8dW\
u00061A(\u001b8jiz\"RA\u00181bE\u000e\u0004\"a\u0018\u0001\u000e\u0003MAQ!N\u0003A\u0002YBQ\u0001R\u0003A\u0002\u0015CQ!T\u0003A\u00029CQ\u0001V\u0003A\u0002U\u000b\u0001\u0002\u001e:bG.,'o]\u000b\u0002MB!q-\\!p\u001b\u0005A'BA5k\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003]-T\u0011\u0001\\\u0001\u0005U\u00064\u0018-\u0003\u0002oQ\n\t2i\u001c8dkJ\u0014XM\u001c;ICNDW*\u00199\u0011\u0005}\u0003\u0018BA9\u0014\u0005Q!&/Y2lKJ\fe\u000eZ\"p]R\u0014x\u000e\u001c7fe\u0006IAO]1dW\u0016\u00148\u000fI\u0001\u0011e\u0016\u0014\u0017\r\\1oG&tw\rT1uG\",\u0012!\u001e\t\u0003OZL!a\u001e5\u0003\u001d\r{WO\u001c;E_^tG*\u0019;dQ\u0006\t\"/\u001a2bY\u0006t7-\u001b8h\u0019\u0006$8\r\u001b\u0011\u0002#\u0011L7/\u00192mK2{wm]\"p]\u001aLw-F\u0001|!\ra\u0018qA\u0007\u0002{*\u0011ap`\u0001\u0007G>tg-[4\u000b\t\u0005\u0005\u00111A\u0001\tif\u0004Xm]1gK*\u0011\u0011QA\u0001\u0004G>l\u0017bAA\u0005{\n11i\u001c8gS\u001e\f!\u0003Z5tC\ndW\rT8hg\u000e{gNZ5hA\u0005\u00112/_:uK6\f5n[1D_:\u001cX/\\3s+\u0005)\u0015\u0001D7bi\u0016\u0014\u0018.\u00197ju\u0016\u0014XCAA\u000b!\u0011\t9\"!\b\u000e\u0005\u0005e!bAA\u000e\u0013\u000611\u000f\u001e:fC6LA!a\b\u0002\u001a\taQ*\u0019;fe&\fG.\u001b>fe\u0006)1\r\\8tKR\u0011\u0011Q\u0005\t\u0004A\u0005\u001d\u0012bAA\u0015C\t!QK\\5u\u0003i!W-\u00197XSRD7i\u001c8tk6,'\u000f\u0015:pa\u0016\u0014H/[3t)\t\ty\u0003\u0005\u00038}\u0005\u000b\u0015AD2sK\u0006$XmQ8ogVlWM\u001d\u000b\u0005\u0003k\tI\b\u0005\u0005\u00028\u0005u\u0012\u0011IA3\u001b\t\tID\u0003\u0003\u0002<\u0005e\u0011\u0001C:dC2\fGm\u001d7\n\t\u0005}\u0012\u0011\b\u0002\u0007'>,(oY3\u0011\u0011\u0005\r\u0013qKA.\u00037j!!!\u0012\u000b\t\u0005\u001d\u0013\u0011J\u0001\tG>t7/^7fe*!\u00111JA'\u0003\u001d\u0019G.[3oiNT1AFA(\u0015\u0011\t\t&a\u0015\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\t)&A\u0002pe\u001eLA!!\u0017\u0002F\tq1i\u001c8tk6,'OU3d_J$\u0007\u0003BA/\u0003Gj!!a\u0018\u000b\u0007\u0005\u00054.\u0001\u0003mC:<\u0017bA\"\u0002`A!\u0011qMA:\u001d\u0011\tI'a\u001c\u000e\u0005\u0005-$\u0002BA\u001e\u0
003[R!AF%\n\t\u0005E\u00141N\u0001\t\u0007>t7/^7fe&!\u0011QOA<\u0005\u001d\u0019uN\u001c;s_2TA!!\u001d\u0002l!1\u00111\u0010\tA\u0002\u0005\u000b\u0011B]3bIR{\u0007/[2\u0002\u000fQ\u0014\u0018mY6feR1\u0011\u0011QAD\u0003\u0013\u00032aXAB\u0013\r\t)i\u0005\u0002\r\u0017\u000647.\u0019+sC\u000e\\WM\u001d\u0005\u0007\u0003w\n\u0002\u0019A!\t\u000f\u0005-\u0015\u00031\u0001\u0002\u000e\u0006qQ.Z:tC\u001e,W*\u0019;dQ\u0016\u0014\b\u0003BAH\u0003+k!!!%\u000b\u0007\u0005MU#\u0001\u0005qe>$xnY8m\u0013\u0011\t9*!%\u0003\u0019-\u000bgm[1NCR\u001c\u0007.\u001a:")
/**
 * Creates one Akka-streams Kafka consumer per topic and forwards every consumed record to a
 * tracker actor dedicated to that topic.
 *
 * <p>This is the Java view of a Scala class: the {@code $lzycompute} methods, the
 * {@code bitmap$0} field and the {@code $anonfun$...} static method are the compiler's encoding of
 * {@code lazy val}s and lambdas and are kept so that the binary interface stays unchanged.
 *
 * <p>NOTE(review): a single {@link CountDownLatch} with count 1 is shared by all topics, so after
 * its first count-down {@link #tracker(String, KafkaMatcher)} no longer blocks for later topics —
 * confirm this is intended.
 */
public class KafkaTrackerPoll implements KafkaLogging, NameGen {
    /** Lazily created actor system that hosts the consumer streams; guarded by bit 1 of {@code bitmap$0}. */
    private ActorSystem systemAkkaConsumer;
    /** Lazily created materializer bound to {@link #systemAkkaConsumer}; guarded by bit 2 of {@code bitmap$0}. */
    private Materializer materializer;
    /** User-supplied consumer properties, before the forced settings and the default group id are applied. */
    private final Map<String, Object> consumerProperties;
    /** Actor system in which the tracker actors are created. */
    private final ActorSystem system;
    private final StatsEngine statsEngine;
    private final Clock clock;
    /** One tracker/consumer-control pair per topic name. */
    private final ConcurrentHashMap<String, TrackerAndController> trackers;
    /** Counted down by the callback handed to {@link WaitRebalancing}. */
    private final CountDownLatch rebalancingLatch;
    /** Configuration for {@link #systemAkkaConsumer}; set to {@code null} once that system exists. */
    private Config disableLogsConfig;
    private Logger logger;
    /** Initialisation flags of the lazy fields: bit 1 = actor system, bit 2 = materializer. */
    private volatile byte bitmap$0;

    /** Delegates to the {@link NameGen} trait implementation. */
    public String genName(String base) {
        return NameGen.genName$((NameGen)this, (String)base);
    }

    /** Delegates to the {@link KafkaLogging} trait implementation. */
    @Override
    public void logMessage(Function0<String> text, ProducerRecord<String, String> msg) {
        KafkaLogging.logMessage$(this, text, msg);
    }

    /** @return the logger installed by the {@code StrictLogging} trait initialiser */
    public Logger logger() {
        return this.logger;
    }

    /** Trait setter invoked by {@code StrictLogging.$init$}; not meant to be called directly. */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    private ConcurrentHashMap<String, TrackerAndController> trackers() {
        return this.trackers;
    }

    private CountDownLatch rebalancingLatch() {
        return this.rebalancingLatch;
    }

    private Config disableLogsConfig() {
        return this.disableLogsConfig;
    }

    /**
     * Slow path of {@link #systemAkkaConsumer()}: creates the "KafkaAkkaConsumer" actor system under
     * the instance monitor unless another thread already did.
     */
    private ActorSystem systemAkkaConsumer$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.systemAkkaConsumer = ActorSystem$.MODULE$.apply("KafkaAkkaConsumer", this.disableLogsConfig());
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        // The config is only needed to build the actor system, so the reference is dropped afterwards.
        this.disableLogsConfig = null;
        return this.systemAkkaConsumer;
    }

    /** @return the actor system used for the consumer streams, created on first access */
    public ActorSystem systemAkkaConsumer() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.systemAkkaConsumer$lzycompute();
        }
        return this.systemAkkaConsumer;
    }

    /**
     * Slow path of {@link #materializer()}: derives the materializer from
     * {@link #systemAkkaConsumer()} under the instance monitor unless another thread already did.
     */
    private Materializer materializer$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.materializer = Materializer$.MODULE$.matFromSystem((ClassicActorSystemProvider)this.systemAkkaConsumer());
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.materializer;
    }

    /** @return the materializer used to run the consumer streams, created on first access */
    public Materializer materializer() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.materializer$lzycompute();
        }
        return this.materializer;
    }

    /**
     * Requests shutdown of the consumer belonging to every registered tracker. The result of each
     * {@code shutdown()} call is not awaited.
     */
    public void close() {
        // ConcurrentHashMap never stores null values, so no null guard is needed here.
        this.trackers().values().forEach(registered -> registered.consumerControl().shutdown());
    }

    /**
     * Builds the effective consumer properties.
     *
     * <p>{@code enable.auto.commit=true} and {@code auto.offset.reset=latest} are appended to the
     * user-supplied properties, a {@code group.id} of the form {@code gatling-test-<uuid>} is added
     * when none is present, and every value is converted with {@code toString()}.
     *
     * @return the property map with string values
     */
    private Map<String, String> dealWithConsumerProperties() {
        IterableOnce forcedSettings = (IterableOnce)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
            new Tuple2((Object)"enable.auto.commit", (Object)"true"),
            new Tuple2((Object)"auto.offset.reset", (Object)"latest")}));
        MapOps merged = (MapOps)this.consumerProperties.$plus$plus(forcedSettings);
        Map properties = (Map)merged.updatedWith((Object)"group.id", (Function1 & Serializable)x0$1 -> {
            Option existing = (Option)x0$1;
            if (None$.MODULE$.equals(existing)) {
                return new Some((Object)("gatling-test-" + UUID.randomUUID()));
            }
            if (existing instanceof Some) {
                // A group id supplied by the caller is kept as is.
                return new Some(((Some)existing).value());
            }
            throw new MatchError((Object)existing);
        });
        Map updateProperties = (Map)properties.map((Function1 & Serializable)x0$2 -> {
            Tuple2 entry = (Tuple2)x0$2;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            String key = (String)entry._1();
            // toString() returns a String value unchanged, so one branch covers every value type.
            return new Tuple2((Object)key, (Object)entry._2().toString());
        });
        if (this.logger().underlying().isDebugEnabled()) {
            // The listing is only assembled when it is actually going to be logged.
            String consProps = ((IterableOnceOps)updateProperties.map((Function1 & Serializable)i -> ((Tuple2)i)._1() + ": " + ((Tuple2)i)._2())).mkString("\n");
            this.logger().underlying().debug("Consumer properties:\n{}\n", (Object)consProps);
        }
        return updateProperties;
    }

    /**
     * Creates a plain consumer source subscribed to {@code readTopic}.
     *
     * <p>Settings are read from the {@code akka.kafka.consumer} section of the consumer actor
     * system's configuration, keys and values are deserialised as strings, and the subscription
     * carries a {@link WaitRebalancing} handler whose callback counts down the rebalancing latch.
     *
     * @param readTopic topic to subscribe to
     * @return the not yet materialised consumer source
     */
    private Source<ConsumerRecord<String, String>, Consumer.Control> createConsumer(String readTopic) {
        Map<String, String> properties = this.dealWithConsumerProperties();
        // The consumer is identified in logs and in the handler by its group id.
        String consumerName = (String)properties.apply((Object)"group.id");
        if (this.logger().underlying().isDebugEnabled()) {
            this.logger().underlying().debug("Create consumer - {}", (Object)consumerName);
        }
        Config kafkaConfig = this.systemAkkaConsumer().settings().config().getConfig("akka.kafka.consumer");
        ConsumerSettings consumerSettings = ConsumerSettings$.MODULE$.apply(kafkaConfig, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer()).withProperties(properties);
        AutoSubscription subscription = Subscriptions$.MODULE$.topics((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{readTopic})).withPartitionAssignmentHandler((PartitionAssignmentHandler)new WaitRebalancing(consumerName, (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.rebalancingLatch().countDown()));
        return Consumer$.MODULE$.plainSource(consumerSettings, (Subscription)subscription);
    }

    /**
     * Returns the tracker for {@code readTopic}, creating it on first use.
     *
     * <p>Creation spawns a tracker actor, starts a consumer stream that hands every record to
     * {@link #$anonfun$tracker$2}, arranges for the consumer actor system to be terminated when the
     * stream completes, and then blocks on the rebalancing latch before the pair is stored.
     *
     * @param readTopic      topic to consume from; also the cache key
     * @param messageMatcher extracts the match id from each consumed record
     * @return the tracker bound to the topic
     */
    public KafkaTracker tracker(String readTopic, KafkaMatcher messageMatcher) {
        return this.trackers().computeIfAbsent(readTopic, ignoredKey -> {
            ActorRef actor = this.system.actorOf(Tracker$.MODULE$.props(this.statsEngine, this.clock), this.genName("kafkaTrackerActor"));
            Source<ConsumerRecord<String, String>, Consumer.Control> consumer = this.createConsumer(readTopic);
            Tuple2 materialized = (Tuple2)consumer.toMat((Graph)Sink$.MODULE$.foreach((Function1 & Serializable)record -> {
                KafkaTrackerPoll.$anonfun$tracker$2(this, messageMatcher, actor, (ConsumerRecord)record);
                return BoxedUnit.UNIT;
            }), Keep$.MODULE$.both()).run(this.materializer());
            if (materialized == null) {
                throw new MatchError((Object)materialized);
            }
            Consumer.Control consumerControl = (Consumer.Control)materialized._1();
            Future streamComplete = (Future)materialized._2();
            streamComplete.onComplete((Function1 & Serializable)ignoredResult -> {
                if (this.logger().underlying().isDebugEnabled()) {
                    this.logger().underlying().debug("Kafka akka consumer stream has been completed.");
                }
                return this.systemAkkaConsumer().terminate();
            }, ExecutionContext.Implicits$.MODULE$.global());
            // NOTE(review): await() can throw InterruptedException; the Scala original lets it
            // propagate unchecked, which plain Java source cannot express inside this lambda.
            this.rebalancingLatch().await();
            return new TrackerAndController(new KafkaTracker(actor), consumerControl);
        }).kafkaTracker();
    }

    /**
     * Per-record callback of the consumer stream: resolves the record's match id, logs the record at
     * debug level and sends a {@link MessageReceived} stamped with the current clock time to the
     * tracker actor.
     */
    public static final /* synthetic */ void $anonfun$tracker$2(KafkaTrackerPoll $this, KafkaMatcher messageMatcher$1, ActorRef actor$1, ConsumerRecord record) {
        String matchId = messageMatcher$1.responseMatchId((ConsumerRecord<String, String>)record);
        if ($this.logger().underlying().isDebugEnabled()) {
            $this.logger().underlying().debug("Received Kafka message. Key: {} Payload: {}. With matchId - {}", new Object[]{record.key(), record.value(), matchId});
        }
        MessageReceived received = new MessageReceived(matchId, $this.clock.nowMillis(), (ConsumerRecord<String, String>)record);
        // Scala's `actor ! message`: the sender argument is the operator's default value.
        ActorRef sender = actor$1.$bang$default$2((Object)received);
        actor$1.$bang((Object)received, sender);
    }

    /**
     * @param consumerProperties user-supplied Kafka consumer properties
     * @param system             actor system in which tracker actors are created
     * @param statsEngine        passed to every tracker actor
     * @param clock              passed to every tracker actor and used to timestamp received records
     */
    public KafkaTrackerPoll(Map<String, Object> consumerProperties, ActorSystem system, StatsEngine statsEngine, Clock clock) {
        this.consumerProperties = consumerProperties;
        this.system = system;
        this.statsEngine = statsEngine;
        this.clock = clock;
        StrictLogging.$init$((StrictLogging)this);
        KafkaLogging.$init$(this);
        NameGen.$init$((NameGen)this);
        this.trackers = new ConcurrentHashMap();
        this.rebalancingLatch = new CountDownLatch(1);
        // Akka's own logging is switched off for the consumer actor system.
        String configString = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n        | akka {\n        |   loglevel = \"OFF\"\n        | }\n        "));
        Config config = ConfigFactory.parseString((String)configString);
        this.disableLogsConfig = ConfigFactory.load((Config)config);
        Statics.releaseFence();
    }
}

