/*
 * Decompiled with CFR 0.152.
 */
package org.galaxio.gatling.kafka.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import akka.actor.CoordinatedShutdown$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import io.gatling.commons.util.Clock;
import io.gatling.core.stats.StatsEngine;
import io.gatling.core.util.NameGen;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.scala.ImplicitConversions$;
import org.apache.kafka.streams.scala.StreamsBuilder;
import org.apache.kafka.streams.scala.StreamsBuilder$;
import org.apache.kafka.streams.scala.serialization.Serdes$;
import org.galaxio.gatling.kafka.client.KafkaMessageTracker;
import org.galaxio.gatling.kafka.client.KafkaMessageTrackerActor;
import org.galaxio.gatling.kafka.client.KafkaMessageTrackerActor$;
import org.galaxio.gatling.kafka.package;
import org.galaxio.gatling.kafka.protocol.KafkaProtocol;
import org.galaxio.gatling.kafka.request.KafkaProtocolMessage;
import org.galaxio.gatling.kafka.request.KafkaProtocolMessage$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005ub\u0001B\u0006\r\u0001]A\u0001b\u000f\u0001\u0003\u0002\u0003\u0006I\u0001\u0010\u0005\t\u000f\u0002\u0011\t\u0011)A\u0005\u0011\"A\u0001\u000b\u0001B\u0001B\u0003%\u0011\u000b\u0003\u0005X\u0001\t\u0005\t\u0015!\u0003Y\u0011\u0015y\u0006\u0001\"\u0001a\u0011\u001d9\u0007A1A\u0005\n!Da!\u001e\u0001!\u0002\u0013I\u0007b\u0002<\u0001\u0005\u0004%Ia\u001e\u0005\u0007y\u0002\u0001\u000b\u0011\u0002=\t\u000bu\u0004A\u0011\u0001@\u0003\u0019Q\u0013\u0018mY6feN\u0004vn\u001c7\u000b\u00055q\u0011AB2mS\u0016tGO\u0003\u0002\u0010!\u0005)1.\u00194lC*\u0011\u0011CE\u0001\bO\u0006$H.\u001b8h\u0015\t\u0019B#A\u0004hC2\f\u00070[8\u000b\u0003U\t1a\u001c:h\u0007\u0001\u0019B\u0001\u0001\r\u001faA\u0011\u0011\u0004H\u0007\u00025)\t1$A\u0003tG\u0006d\u0017-\u0003\u0002\u001e5\t1\u0011I\\=SK\u001a\u0004\"aH\u0017\u000f\u0005\u0001ZcBA\u0011+\u001d\t\u0011\u0013F\u0004\u0002$Q9\u0011AeJ\u0007\u0002K)\u0011aEF\u0001\u0007yI|w\u000e\u001e \n\u0003UI!a\u0005\u000b\n\u0005E\u0011\u0012BA\b\u0011\u0013\tac\"A\u0004qC\u000e\\\u0017mZ3\n\u00059z#\u0001D&bM.\fGj\\4hS:<'B\u0001\u0017\u000f!\t\t\u0014(D\u00013\u0015\t\u0019D'\u0001\u0003vi&d'BA\u001b7\u0003\u0011\u0019wN]3\u000b\u0005E9$\"\u0001\u001d\u0002\u0005%|\u0017B\u0001\u001e3\u0005\u001dq\u0015-\\3HK:\fqb\u001d;sK\u0006l7oU3ui&twm\u001d\t\u0005{\u0005#\u0005D\u0004\u0002?\u007fA\u0011AEG\u0005\u0003\u0001j\ta\u0001\u0015:fI\u00164\u0017B\u0001\"D\u0005\ri\u0015\r\u001d\u0006\u0003\u0001j\u0001\"!P#\n\u0005\u0019\u001b%AB*ue&tw-\u0001\u0004tsN$X-\u001c\t\u0003\u0013:k\u0011A\u0013\u0006\u0003\u00172\u000bQ!Y2u_JT\u0011!T\u0001\u0005C.\\\u0017-\u0003\u0002P\u0015\nY\u0011i\u0019;peNK8\u000f^3n\u0003-\u0019H/\u0019;t\u000b:<\u0017N\\3\u0011\u0005I+V\"A*\u000b\u0005Q#\u0014!B:uCR\u001c\u0018B\u0001,T\u0005-\u0019F/\u0019;t\u000b:<\u0017N\\3\u0002\u000b\rdwnY6\u0011\u0005ekV\"\u0001.\u000b\u0005MZ&B\u0001/7\u0003\u001d\u0019w.\\7p]NL!A\u0018.\u0003\u000b\rcwnY6\u0002\rqJg.\u001b;?)\u0015\t7\rZ3g!\t\u0011\u0007!D\u0001\r\u0011\u0015YT\u00011\u0001=\u0011\u00159U\u00011\u0001I\u0011\u0015\u0001V\u00011\u0001R\u0011\u00159V\u00011\u0001Y\u0003!!(/Y2lKJ\u001cX#A5\u0011\t)\u0004HI]\u0007\u0002W*\u0011A.\\\u0001\u000bG>t7-\u001e:sK:$(BA\u001ao\u0015\u0005y\u0017\u0001\u00026bm\u0006L!!]6\u0003#\r{gnY;se\u0016tG\u000fS1tQ6\u000b\u0007\u000f\u0005\u0002cg&\u0011A\u000f\u0004\u0002\u0014\u0017\u000647.Y'fgN\fw-\u001a+sC\u000e\\WM]\u0001\niJ\f7m[3sg\u0002\nQ\u0001\u001d:paN,\u0012\u0001\u001f\t\u0003sjl\u0011!\\\u0005\u0003w6\u0014!\u0002\u0015:pa\u0016\u0014H/[3t\u0003\u0019\u0001(o\u001c9tA\u00059AO]1dW\u0016\u0014H\u0003\u0003:\u0000\u0003\u0007\t9!!\t\t\r\u0005\u0005!\u00021\u0001E\u0003)Ig\u000e];u)>\u0004\u0018n\u0019\u0005\u0007\u0003\u000bQ\u0001\u0019\u0001#\u0002\u0017=,H\u000f];u)>\u0004\u0018n\u0019\u0005\b\u0003\u0013Q\u0001\u0019AA\u0006\u00039iWm]:bO\u0016l\u0015\r^2iKJ\u0004B!!\u0004\u0002\u001c9!\u0011qBA\u000b\u001d\r\u0001\u0013\u0011C\u0005\u0004\u0003'q\u0011\u0001\u00039s_R|7m\u001c7\n\t\u0005]\u0011\u0011D\u0001\u000e\u0017\u000647.\u0019)s_R|7m\u001c7\u000b\u0007\u0005Ma\"\u0003\u0003\u0002\u001e\u0005}!\u0001D&bM.\fW*\u0019;dQ\u0016\u0014(\u0002BA\f\u00033Aq!a\t\u000b\u0001\u0004\t)#A\nsKN\u0004xN\\:f)J\fgn\u001d4pe6,'\u000fE\u0003\u001a\u0003O\tY#C\u0002\u0002*i\u0011aa\u00149uS>t\u0007cB\r\u0002.\u0005E\u0012\u0011G\u0005\u0004\u0003_Q\"!\u0003$v]\u000e$\u0018n\u001c82!\u0011\t\u0019$!\u000f\u000e\u0005\u0005U\"bAA\u001c\u001d\u00059!/Z9vKN$\u0018\u0002BA\u001e\u0003k\u0011AcS1gW\u0006\u0004&o\u001c;pG>dW*Z:tC\u001e,\u0007")
public class TrackersPool
implements package.KafkaLogging,
NameGen {
    private final ActorSystem system;
    private final StatsEngine statsEngine;
    private final Clock clock;
    private final ConcurrentHashMap<String, KafkaMessageTracker> trackers;
    private final Properties props;
    private Logger logger;

    public String genName(String base) {
        return NameGen.genName$((NameGen)this, (String)base);
    }

    @Override
    public void logMessage(Function0<String> text, KafkaProtocolMessage msg) {
        package.KafkaLogging.logMessage$(this, text, msg);
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    private ConcurrentHashMap<String, KafkaMessageTracker> trackers() {
        return this.trackers;
    }

    private Properties props() {
        return this.props;
    }

    public KafkaMessageTracker tracker(String inputTopic, String outputTopic, KafkaProtocol.KafkaMatcher messageMatcher, Option<Function1<KafkaProtocolMessage, KafkaProtocolMessage>> responseTransformer) {
        return this.trackers().computeIfAbsent(outputTopic, x$1 -> {
            ActorRef actor = $this.system.actorOf(KafkaMessageTrackerActor$.MODULE$.props($this.statsEngine, $this.clock), this.genName("kafkaTrackerActor"));
            StreamsBuilder builder = new StreamsBuilder(StreamsBuilder$.MODULE$.$lessinit$greater$default$1());
            builder.stream(outputTopic, ImplicitConversions$.MODULE$.consumedFromSerde(Serdes$.MODULE$.byteArraySerde(), Serdes$.MODULE$.byteArraySerde())).foreach((Function2 & Serializable)(x0$1, x1$1) -> {
                TrackersPool.$anonfun$tracker$2(this, inputTopic, outputTopic, messageMatcher, responseTransformer, actor, x0$1, x1$1);
                return BoxedUnit.UNIT;
            });
            KafkaStreams streams = new KafkaStreams(builder.build(), this.props());
            streams.cleanUp();
            streams.start();
            ((CoordinatedShutdown)CoordinatedShutdown$.MODULE$.apply($this.system)).addJvmShutdownHook((Function0)(JFunction0.mcV.sp & Serializable)() -> streams.close());
            return new KafkaMessageTracker(actor);
        });
    }

    public static final /* synthetic */ void $anonfun$tracker$2(TrackersPool $this, String inputTopic$1, String outputTopic$1, KafkaProtocol.KafkaMatcher messageMatcher$1, Option responseTransformer$1, ActorRef actor$1, byte[] x0$1, byte[] x1$1) {
        Tuple2 tuple2 = new Tuple2((Object)x0$1, (Object)x1$1);
        if (tuple2 != null) {
            BoxedUnit boxedUnit;
            byte[] v;
            byte[] k = (byte[])tuple2._1();
            KafkaProtocolMessage message = new KafkaProtocolMessage(k, v = (byte[])tuple2._2(), inputTopic$1, outputTopic$1, KafkaProtocolMessage$.MODULE$.apply$default$5(), KafkaProtocolMessage$.MODULE$.apply$default$6());
            if (messageMatcher$1.responseMatch(message) == null) {
                if ($this.logger().underlying().isErrorEnabled()) {
                    $this.logger().underlying().error("no messageMatcher key for read message");
                    return;
                }
                return;
            }
            if (k == null || v == null) {
                if ($this.logger().underlying().isInfoEnabled()) {
                    $this.logger().underlying().info(" --- received message with null key or value");
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
            } else if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info(" --- received  {} {}", new Object[]{new String(k), new String(v)});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            long receivedTimestamp = $this.clock.nowMillis();
            byte[] replyId = messageMatcher$1.responseMatch(message);
            if (k != null) {
                $this.logMessage((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(20).append("Record received key=").append(new String(k)).toString(), message);
            } else {
                $this.logMessage((Function0<String>)(Function0 & Serializable)() -> "Record received key=null", message);
            }
            KafkaMessageTrackerActor.MessageConsumed x$1 = new KafkaMessageTrackerActor.MessageConsumed(replyId, receivedTimestamp, (KafkaProtocolMessage)responseTransformer$1.map((Function1 & Serializable)x$2 -> (KafkaProtocolMessage)x$2.apply((Object)message)).getOrElse((Function0 & Serializable)() -> message));
            ActorRef x$22 = actor$1.$bang$default$2((Object)x$1);
            actor$1.$bang((Object)x$1, x$22);
            return;
        }
        throw new MatchError((Object)tuple2);
    }

    public TrackersPool(Map<String, Object> streamsSettings, ActorSystem system, StatsEngine statsEngine, Clock clock) {
        this.system = system;
        this.statsEngine = statsEngine;
        this.clock = clock;
        StrictLogging.$init$((StrictLogging)this);
        package.KafkaLogging.$init$(this);
        NameGen.$init$((NameGen)this);
        this.trackers = new ConcurrentHashMap();
        this.props = new Properties();
        this.props().putAll((java.util.Map<?, ?>)CollectionConverters$.MODULE$.MapHasAsJava(streamsSettings).asJava());
        Statics.releaseFence();
    }
}

