/*
 * Decompiled with CFR 0.152.
 */
package io.github.amerousful.kafka.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ActorSystem$;
import akka.actor.ClassicActorSystemProvider;
import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerSettings;
import akka.kafka.ConsumerSettings$;
import akka.kafka.Subscription;
import akka.kafka.Subscriptions$;
import akka.kafka.scaladsl.Consumer;
import akka.kafka.scaladsl.Consumer$;
import akka.kafka.scaladsl.PartitionAssignmentHandler;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.Materializer$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import io.gatling.commons.util.Clock;
import io.gatling.core.stats.StatsEngine;
import io.gatling.core.util.NameGen;
import io.github.amerousful.kafka.action.KafkaLogging;
import io.github.amerousful.kafka.client.KafkaTracker;
import io.github.amerousful.kafka.client.MessageReceived;
import io.github.amerousful.kafka.client.Tracker$;
import io.github.amerousful.kafka.client.TrackerAndController;
import io.github.amerousful.kafka.client.WaitRebalancing;
import io.github.amerousful.kafka.protocol.KafkaMatcher;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.MapOps;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005\u0005f\u0001\u0002\u000b\u0016\u0001\u0001B\u0001b\u000e\u0001\u0003\u0002\u0003\u0006I\u0001\u000f\u0005\t\r\u0002\u0011\t\u0011)A\u0005\u000f\"Aq\n\u0001B\u0001B\u0003%\u0001\u000b\u0003\u0005W\u0001\t\u0005\t\u0015!\u0003X\u0011\u0015q\u0006\u0001\"\u0001`\u0011\u001d1\u0007A1A\u0005\n\u001dDa\u0001\u001e\u0001!\u0002\u0013A\u0007bB;\u0001\u0005\u0004%IA\u001e\u0005\u0007u\u0002\u0001\u000b\u0011B<\t\u000fm\u0004!\u0019!C\u0001y\"9\u0011q\u0002\u0001!\u0002\u0013i\b\"CA\t\u0001\t\u0007I1AA\n\u0011\u001d\t)\u0002\u0001Q\u0001\n\u001dC\u0011\"a\u0006\u0001\u0005\u0004%\u0019!!\u0007\t\u0011\u0005\u001d\u0002\u0001)A\u0005\u00037Aq!!\u000b\u0001\t\u0003\tY\u0003C\u0004\u00024\u0001!I!!\u000e\t\u000f\u0005e\u0002\u0001\"\u0003\u0002<!9\u0011Q\u0011\u0001\u0005\u0002\u0005\u001d%\u0001E&bM.\fGK]1dW\u0016\u0014\bk\u001c7m\u0015\t1r#\u0001\u0004dY&,g\u000e\u001e\u0006\u00031e\tQa[1gW\u0006T!AG\u000e\u0002\u0015\u0005lWM]8vg\u001a,HN\u0003\u0002\u001d;\u00051q-\u001b;ik\nT\u0011AH\u0001\u0003S>\u001c\u0001a\u0005\u0003\u0001C\u001dj\u0003C\u0001\u0012&\u001b\u0005\u0019#\"\u0001\u0013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019\u001a#AB!osJ+g\r\u0005\u0002)W5\t\u0011F\u0003\u0002+/\u00051\u0011m\u0019;j_:L!\u0001L\u0015\u0003\u0019-\u000bgm[1M_\u001e<\u0017N\\4\u0011\u00059*T\"A\u0018\u000b\u0005A\n\u0014\u0001B;uS2T!AM\u001a\u0002\t\r|'/\u001a\u0006\u0003iu\tqaZ1uY&tw-\u0003\u00027_\t9a*Y7f\u000f\u0016t\u0017AE2p]N,X.\u001a:Qe>\u0004XM\u001d;jKN\u0004B!\u000f!DC9\u0011!H\u0010\t\u0003w\rj\u0011\u0001\u0010\u0006\u0003{}\ta\u0001\u0010:p_Rt\u0014BA $\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011I\u0011\u0002\u0004\u001b\u0006\u0004(BA $!\tID)\u0003\u0002F\u0005\n11\u000b\u001e:j]\u001e\faa]=ti\u0016l\u0007C\u0001%N\u001b\u0005I%B\u0001&L\u0003\u0015\t7\r^8s\u0015\u0005a\u0015\u0001B1lW\u0006L!AT%\u0003\u0017\u0005\u001bGo\u001c:TsN$X-\\\u0001\fgR\fGo]#oO&tW\r\u0005\u0002R)6\t!K\u0003\u0002Tc\u0005)1\u000f^1ug&\u0011QK\u0015\u0002\f'R\fGo]#oO&tW-A\u0003dY>\u001c7\u000e\u0005\u0002Y96\t\u0011L\u0003\u000215*\u00111lM\u0001\bG>lWn\u001c8t\u0013\ti\u0016LA\u0003DY>\u001c7.\u0001\u0004=S:LGO\u0010\u000b\u0006A\n\u001cG-\u001a\t\u0003C\u0002i\u0011!\u0006\u0005\u0006o\u0015\u0001\r\u0001\u000f\u0005\u0006\r\u0016\u0001\ra\u0012\u0005\u0006\u001f\u0016\u0001\r\u0001\u0015\u0005\u0006-\u0016\u0001\raV\u0001\tiJ\f7m[3sgV\t\u0001\u000e\u0005\u0003j_\u000e\u000bX\"\u00016\u000b\u0005-d\u0017AC2p]\u000e,(O]3oi*\u0011\u0001'\u001c\u0006\u0002]\u0006!!.\u0019<b\u0013\t\u0001(NA\tD_:\u001cWO\u001d:f]RD\u0015m\u001d5NCB\u0004\"!\u0019:\n\u0005M,\"\u0001\u0006+sC\u000e\\WM]!oI\u000e{g\u000e\u001e:pY2,'/A\u0005ue\u0006\u001c7.\u001a:tA\u0005\u0001\"/\u001a2bY\u0006t7-\u001b8h\u0019\u0006$8\r[\u000b\u0002oB\u0011\u0011\u000e_\u0005\u0003s*\u0014abQ8v]R$un\u001e8MCR\u001c\u0007.A\tsK\n\fG.\u00198dS:<G*\u0019;dQ\u0002\n\u0011\u0003Z5tC\ndW\rT8hg\u000e{gNZ5h+\u0005i\bc\u0001@\u0002\f5\tqP\u0003\u0003\u0002\u0002\u0005\r\u0011AB2p]\u001aLwM\u0003\u0003\u0002\u0006\u0005\u001d\u0011\u0001\u0003;za\u0016\u001c\u0018MZ3\u000b\u0005\u0005%\u0011aA2p[&\u0019\u0011QB@\u0003\r\r{gNZ5h\u0003I!\u0017n]1cY\u0016dunZ:D_:4\u0017n\u001a\u0011\u0002%ML8\u000f^3n\u0003.\\\u0017mQ8ogVlWM]\u000b\u0002\u000f\u0006\u00192/_:uK6\f5n[1D_:\u001cX/\\3sA\u0005aQ.\u0019;fe&\fG.\u001b>feV\u0011\u00111\u0004\t\u0005\u0003;\t\u0019#\u0004\u0002\u0002 )\u0019\u0011\u0011E&\u0002\rM$(/Z1n\u0013\u0011\t)#a\b\u0003\u00195\u000bG/\u001a:jC2L'0\u001a:\u0002\u001b5\fG/\u001a:jC2L'0\u001a:!\u0003\u0015\u0019Gn\\:f)\t\ti\u0003E\u0002#\u0003_I1!!\r$\u0005\u0011)f.\u001b;\u00025\u0011,\u0017\r\\,ji\"\u001cuN\\:v[\u0016\u0014\bK]8qKJ$\u0018.Z:\u0015\u0005\u0005]\u0002\u0003B\u001dA\u0007\u000e\u000bab\u0019:fCR,7i\u001c8tk6,'\u000f\u0006\u0003\u0002>\u0005\u0005\u0005\u0003CA \u0003\u000b\nI%!\u001c\u000e\u0005\u0005\u0005#\u0002BA\"\u0003?\t\u0001b]2bY\u0006$7\u000f\\\u0005\u0005\u0003\u000f\n\tE\u0001\u0004T_V\u00148-\u001a\t\t\u0003\u0017\ny&a\u0019\u0002d5\u0011\u0011Q\n\u0006\u0005\u0003\u001f\n\t&\u0001\u0005d_:\u001cX/\\3s\u0015\u0011\t\u0019&!\u0016\u0002\u000f\rd\u0017.\u001a8ug*\u0019\u0001$a\u0016\u000b\t\u0005e\u00131L\u0001\u0007CB\f7\r[3\u000b\u0005\u0005u\u0013aA8sO&!\u0011\u0011MA'\u00059\u0019uN\\:v[\u0016\u0014(+Z2pe\u0012\u0004B!!\u001a\u0002l5\u0011\u0011q\r\u0006\u0004\u0003Sj\u0017\u0001\u00027b]\u001eL1!RA4!\u0011\ty'a\u001f\u000f\t\u0005E\u0014qO\u0007\u0003\u0003gRA!a\u0011\u0002v)\u0011\u0001dS\u0005\u0005\u0003s\n\u0019(\u0001\u0005D_:\u001cX/\\3s\u0013\u0011\ti(a \u0003\u000f\r{g\u000e\u001e:pY*!\u0011\u0011PA:\u0011\u0019\t\u0019I\u0005a\u0001\u0007\u0006I!/Z1e)>\u0004\u0018nY\u0001\biJ\f7m[3s)\u0019\tI)a$\u0002\u0012B\u0019\u0011-a#\n\u0007\u00055UC\u0001\u0007LC\u001a\\\u0017\r\u0016:bG.,'\u000f\u0003\u0004\u0002\u0004N\u0001\ra\u0011\u0005\b\u0003'\u001b\u0002\u0019AAK\u00039iWm]:bO\u0016l\u0015\r^2iKJ\u0004B!a&\u0002\u001e6\u0011\u0011\u0011\u0014\u0006\u0004\u00037;\u0012\u0001\u00039s_R|7m\u001c7\n\t\u0005}\u0015\u0011\u0014\u0002\r\u0017\u000647.Y'bi\u000eDWM\u001d")
public class KafkaTrackerPoll
implements KafkaLogging,
NameGen {
    private final Map<String, Object> consumerProperties;
    private final ActorSystem system;
    private final StatsEngine statsEngine;
    private final Clock clock;
    private final ConcurrentHashMap<String, TrackerAndController> trackers;
    private final CountDownLatch rebalancingLatch;
    private final Config disableLogsConfig;
    private final ActorSystem systemAkkaConsumer;
    private final Materializer materializer;
    private Logger logger;

    public String genName(String base) {
        return NameGen.genName$((NameGen)this, (String)base);
    }

    @Override
    public void logMessage(Function0<String> text, ProducerRecord<String, String> msg) {
        KafkaLogging.logMessage$(this, text, msg);
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    private ConcurrentHashMap<String, TrackerAndController> trackers() {
        return this.trackers;
    }

    private CountDownLatch rebalancingLatch() {
        return this.rebalancingLatch;
    }

    public Config disableLogsConfig() {
        return this.disableLogsConfig;
    }

    public ActorSystem systemAkkaConsumer() {
        return this.systemAkkaConsumer;
    }

    public Materializer materializer() {
        return this.materializer;
    }

    public void close() {
        this.trackers().values().forEach(x0$1 -> {
            TrackerAndController trackerAndController = x0$1;
            if (trackerAndController == null) {
                throw new MatchError((Object)trackerAndController);
            }
            Consumer.Control consumerControl = trackerAndController.consumerControl();
            consumerControl.shutdown();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        });
    }

    /*
     * WARNING - void declaration
     */
    private Map<String, String> dealWithConsumerProperties() {
        void var2_2;
        BoxedUnit boxedUnit;
        Map properties = (Map)((MapOps)this.consumerProperties.$plus$plus((IterableOnce)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"enable.auto.commit"), (Object)"true"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"latest")})))).updatedWith((Object)"group.id", (Function1 & Serializable)x0$1 -> {
            Some some;
            Option option = x0$1;
            if (None$.MODULE$.equals(option)) {
                some = new Some((Object)new StringBuilder(13).append("gatling-test-").append(UUID.randomUUID()).toString());
            } else if (option instanceof Some) {
                Some some2 = (Some)option;
                Object value = some2.value();
                some = new Some(value);
            } else {
                throw new MatchError((Object)option);
            }
            return some;
        });
        Map updateProperties = (Map)properties.map((Function1 & Serializable)x0$2 -> {
            Tuple2 tuple2 = x0$2;
            if (tuple2 != null) {
                String key = (String)tuple2._1();
                Object value = tuple2._2();
                if (value instanceof String) {
                    String string = (String)value;
                    return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)key), (Object)string);
                }
            }
            if (tuple2 == null) throw new MatchError((Object)tuple2);
            String key = (String)tuple2._1();
            Object value = tuple2._2();
            return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)key), (Object)value.toString());
        });
        String consProps = ((IterableOnceOps)updateProperties.map((Function1 & Serializable)i -> new StringBuilder(2).append(i._1()).append(": ").append(i._2()).toString())).mkString("\n");
        if (this.logger().underlying().isDebugEnabled()) {
            this.logger().underlying().debug("Consumer properties:\n{}", (Object)consProps);
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return var2_2;
    }

    private Source<ConsumerRecord<String, String>, Consumer.Control> createConsumer(String readTopic) {
        BoxedUnit boxedUnit;
        Map<String, String> properties = this.dealWithConsumerProperties();
        String consumerName = (String)properties.apply((Object)"group.id");
        if (this.logger().underlying().isDebugEnabled()) {
            this.logger().underlying().debug("Create consumer - {}", (Object)consumerName);
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        Config kafkaConfig = this.systemAkkaConsumer().settings().config().getConfig("akka.kafka.consumer");
        ConsumerSettings consumerSettings = ConsumerSettings$.MODULE$.apply(kafkaConfig, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer()).withProperties(properties);
        AutoSubscription subscription = Subscriptions$.MODULE$.topics((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{readTopic})).withPartitionAssignmentHandler((PartitionAssignmentHandler)new WaitRebalancing(consumerName, (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.rebalancingLatch().countDown()));
        return Consumer$.MODULE$.plainSource(consumerSettings, (Subscription)subscription);
    }

    public KafkaTracker tracker(String readTopic, KafkaMatcher messageMatcher) {
        return this.trackers().computeIfAbsent(readTopic, x$1 -> {
            ActorRef actor = $this.system.actorOf(Tracker$.MODULE$.props($this.statsEngine, $this.clock), this.genName("kafkaTrackerActor"));
            Source<ConsumerRecord<String, String>, Consumer.Control> consumer = this.createConsumer(readTopic);
            Tuple2 tuple2 = (Tuple2)consumer.toMat((Graph)Sink$.MODULE$.foreach((Function1 & Serializable)record -> {
                KafkaTrackerPoll.$anonfun$tracker$2(this, messageMatcher, actor, record);
                return BoxedUnit.UNIT;
            }), Keep$.MODULE$.both()).run(this.materializer());
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            Consumer.Control consumerControl = (Consumer.Control)tuple2._1();
            Future streamComplete = (Future)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)consumerControl, (Object)streamComplete);
            Tuple2 tuple23 = tuple22;
            Consumer.Control consumerControl2 = (Consumer.Control)tuple23._1();
            Future streamComplete2 = (Future)tuple23._2();
            streamComplete2.onComplete((Function1 & Serializable)x$3 -> {
                BoxedUnit boxedUnit;
                if (this.logger().underlying().isDebugEnabled()) {
                    this.logger().underlying().debug("Kafka akka consumer stream has been completed.");
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                return this.systemAkkaConsumer().terminate();
            }, ExecutionContext.Implicits$.MODULE$.global());
            this.rebalancingLatch().await();
            return new TrackerAndController(new KafkaTracker(actor), consumerControl2);
        }).kafkaTracker();
    }

    public static final /* synthetic */ void $anonfun$tracker$2(KafkaTrackerPoll $this, KafkaMatcher messageMatcher$1, ActorRef actor$1, ConsumerRecord record) {
        BoxedUnit boxedUnit;
        String matchId = messageMatcher$1.responseMatchId((ConsumerRecord<String, String>)record);
        if ($this.logger().underlying().isDebugEnabled()) {
            $this.logger().underlying().debug("Received Kafka message. Key: {} Payload: {}. With matchId - {}", new Object[]{record.key(), record.value(), matchId});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        MessageReceived x$1 = new MessageReceived(matchId, $this.clock.nowMillis(), (ConsumerRecord<String, String>)record);
        ActorRef x$2 = actor$1.$bang$default$2((Object)x$1);
        actor$1.$bang((Object)x$1, x$2);
    }

    public KafkaTrackerPoll(Map<String, Object> consumerProperties, ActorSystem system, StatsEngine statsEngine, Clock clock) {
        this.consumerProperties = consumerProperties;
        this.system = system;
        this.statsEngine = statsEngine;
        this.clock = clock;
        StrictLogging.$init$((StrictLogging)this);
        KafkaLogging.$init$(this);
        NameGen.$init$((NameGen)this);
        this.trackers = new ConcurrentHashMap();
        this.rebalancingLatch = new CountDownLatch(1);
        String configString = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("\n        | akka {\n        |   loglevel = \"OFF\"\n        | }\n        "));
        Config config = ConfigFactory.parseString((String)configString);
        this.disableLogsConfig = ConfigFactory.load((Config)config);
        this.systemAkkaConsumer = ActorSystem$.MODULE$.apply("KafkaAkkaConsumer", this.disableLogsConfig());
        this.materializer = Materializer$.MODULE$.matFromSystem((ClassicActorSystemProvider)this.systemAkkaConsumer());
        Statics.releaseFence();
    }
}

