/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.gatling.kafka.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import akka.actor.CoordinatedShutdown$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import io.gatling.commons.util.Clock;
import io.gatling.core.stats.StatsEngine;
import io.gatling.core.util.NameGen;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.scala.ImplicitConversions$;
import org.apache.kafka.streams.scala.StreamsBuilder;
import org.apache.kafka.streams.scala.StreamsBuilder$;
import org.apache.kafka.streams.scala.serialization.Serdes$;
import ru.tinkoff.gatling.kafka.client.KafkaMessageTracker;
import ru.tinkoff.gatling.kafka.client.KafkaMessageTrackerActor;
import ru.tinkoff.gatling.kafka.client.KafkaMessageTrackerActor$;
import ru.tinkoff.gatling.kafka.package;
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol;
import ru.tinkoff.gatling.kafka.request.KafkaProtocolMessage;
import ru.tinkoff.gatling.kafka.request.KafkaProtocolMessage$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005ub\u0001B\u0006\r\u0001]A\u0001b\u000f\u0001\u0003\u0002\u0003\u0006I\u0001\u0010\u0005\t\u000f\u0002\u0011\t\u0011)A\u0005\u0011\"A\u0001\u000b\u0001B\u0001B\u0003%\u0011\u000b\u0003\u0005X\u0001\t\u0005\t\u0015!\u0003Y\u0011\u0015y\u0006\u0001\"\u0001a\u0011\u001d9\u0007A1A\u0005\n!Da!\u001e\u0001!\u0002\u0013I\u0007b\u0002<\u0001\u0005\u0004%Ia\u001e\u0005\u0007y\u0002\u0001\u000b\u0011\u0002=\t\u000bu\u0004A\u0011\u0001@\u0003\u0019Q\u0013\u0018mY6feN\u0004vn\u001c7\u000b\u00055q\u0011AB2mS\u0016tGO\u0003\u0002\u0010!\u0005)1.\u00194lC*\u0011\u0011CE\u0001\bO\u0006$H.\u001b8h\u0015\t\u0019B#A\u0004uS:\\wN\u001a4\u000b\u0003U\t!A];\u0004\u0001M!\u0001\u0001\u0007\u00101!\tIB$D\u0001\u001b\u0015\u0005Y\u0012!B:dC2\f\u0017BA\u000f\u001b\u0005\u0019\te.\u001f*fMB\u0011q$\f\b\u0003A-r!!\t\u0016\u000f\u0005\tJcBA\u0012)\u001d\t!s%D\u0001&\u0015\t1c#\u0001\u0004=e>|GOP\u0005\u0002+%\u00111\u0003F\u0005\u0003#II!a\u0004\t\n\u00051r\u0011a\u00029bG.\fw-Z\u0005\u0003]=\u0012AbS1gW\u0006dunZ4j]\u001eT!\u0001\f\b\u0011\u0005EJT\"\u0001\u001a\u000b\u0005M\"\u0014\u0001B;uS2T!!\u000e\u001c\u0002\t\r|'/\u001a\u0006\u0003#]R\u0011\u0001O\u0001\u0003S>L!A\u000f\u001a\u0003\u000f9\u000bW.Z$f]\u0006y1\u000f\u001e:fC6\u001c8+\u001a;uS:<7\u000f\u0005\u0003>\u0003\u0012CbB\u0001 @!\t!#$\u0003\u0002A5\u00051\u0001K]3eK\u001aL!AQ\"\u0003\u00075\u000b\u0007O\u0003\u0002A5A\u0011Q(R\u0005\u0003\r\u000e\u0013aa\u0015;sS:<\u0017AB:zgR,W\u000e\u0005\u0002J\u001d6\t!J\u0003\u0002L\u0019\u0006)\u0011m\u0019;pe*\tQ*\u0001\u0003bW.\f\u0017BA(K\u0005-\t5\r^8s'f\u001cH/Z7\u0002\u0017M$\u0018\r^:F]\u001eLg.\u001a\t\u0003%Vk\u0011a\u0015\u0006\u0003)R\nQa\u001d;biNL!AV*\u0003\u0017M#\u0018\r^:F]\u001eLg.Z\u0001\u0006G2|7m\u001b\t\u00033vk\u0011A\u0017\u0006\u0003gmS!\u0001\u0018\u001c\u0002\u000f\r|W.\\8og&\u0011aL\u0017\u0002\u0006\u00072|7m[\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u000b\u0005\u001cG-\u001a4\u0011\u0005\t\u0004Q\"\u0001\u0007\t\u000bm*\u0001\u0019\u0001\u001f\t\u000b\u001d+\u0001\u0019\u0001%\t\u000bA+\u0001\u0019A)\t\u000b]+\u0001\u0019\u0001-\u0002\u0011Q\u0014\u0018mY6feN,\u0012!\u001b\t\u0005UB$%/D\u0001l\u0015\taW.\u0001\u0006d_:\u001cWO\u001d:f]RT!a\r8\u000b\u0003=\fAA[1wC&\u0011\u0011o\u001b\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bC\u00012t\u0013\t!HBA\nLC\u001a\\\u0017-T3tg\u0006<W\r\u0016:bG.,'/A\u0005ue\u0006\u001c7.\u001a:tA\u0005)\u0001O]8qgV\t\u0001\u0010\u0005\u0002zu6\tQ.\u0003\u0002|[\nQ\u0001K]8qKJ$\u0018.Z:\u0002\rA\u0014x\u000e]:!\u0003\u001d!(/Y2lKJ$\u0002B]@\u0002\u0004\u0005\u001d\u0011\u0011\u0005\u0005\u0007\u0003\u0003Q\u0001\u0019\u0001#\u0002\u0015%t\u0007/\u001e;U_BL7\r\u0003\u0004\u0002\u0006)\u0001\r\u0001R\u0001\f_V$\b/\u001e;U_BL7\rC\u0004\u0002\n)\u0001\r!a\u0003\u0002\u001d5,7o]1hK6\u000bGo\u00195feB!\u0011QBA\u000e\u001d\u0011\ty!!\u0006\u000f\u0007\u0001\n\t\"C\u0002\u0002\u00149\t\u0001\u0002\u001d:pi>\u001cw\u000e\\\u0005\u0005\u0003/\tI\"A\u0007LC\u001a\\\u0017\r\u0015:pi>\u001cw\u000e\u001c\u0006\u0004\u0003'q\u0011\u0002BA\u000f\u0003?\u0011AbS1gW\u0006l\u0015\r^2iKJTA!a\u0006\u0002\u001a!9\u00111\u0005\u0006A\u0002\u0005\u0015\u0012a\u0005:fgB|gn]3Ue\u0006t7OZ8s[\u0016\u0014\b#B\r\u0002(\u0005-\u0012bAA\u00155\t1q\n\u001d;j_:\u0004r!GA\u0017\u0003c\t\t$C\u0002\u00020i\u0011\u0011BR;oGRLwN\\\u0019\u0011\t\u0005M\u0012\u0011H\u0007\u0003\u0003kQ1!a\u000e\u000f\u0003\u001d\u0011X-];fgRLA!a\u000f\u00026\t!2*\u00194lCB\u0013x\u000e^8d_2lUm]:bO\u0016\u0004")
public class TrackersPool
implements package.KafkaLogging,
NameGen {
    // Actor system used to spawn one tracker actor per output topic and to register the streams shutdown hook.
    private final ActorSystem system;
    // Passed to every tracker actor's props when the actor is created.
    private final StatsEngine statsEngine;
    // Passed to tracker actors and used to timestamp each consumed record.
    private final Clock clock;
    // Trackers created so far, keyed by output topic name.
    private final ConcurrentHashMap<String, KafkaMessageTracker> trackers;
    // Kafka Streams configuration, copied from the constructor's streamsSettings map.
    private final Properties props;
    // Assigned through the synthetic StrictLogging setter below; not final because that setter writes it.
    // NOTE(review): presumably the setter is invoked from StrictLogging.$init$ in the constructor — confirm.
    private Logger logger;

    /**
     * Forwards to the {@code NameGen} trait implementation to derive a name from {@code base}.
     *
     * @param base prefix handed to the trait's name generator
     * @return the name produced by {@code NameGen.genName$}
     */
    public String genName(String base) {
        final String generated = NameGen.genName$(this, base);
        return generated;
    }

    /**
     * Forwards to the {@code KafkaLogging} trait implementation.
     *
     * @param text lazily evaluated message text
     * @param msg  the Kafka message the log entry refers to
     */
    @Override
    public void logMessage(Function0<String> text, KafkaProtocolMessage msg) {
        final TrackersPool self = this;
        package.KafkaLogging.logMessage$(self, text, msg);
    }

    /**
     * @return the logger currently stored on this instance
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Synthetic trait setter that stores the logger on this instance.
     *
     * @param x$1 the logger to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        logger = x$1;
    }

    /**
     * @return the map of trackers keyed by output topic
     */
    private ConcurrentHashMap<String, KafkaMessageTracker> trackers() {
        return trackers;
    }

    /**
     * @return the Kafka Streams properties built in the constructor
     */
    private Properties props() {
        return props;
    }

    /**
     * Returns the message tracker for {@code outputTopic}, creating and starting it on first request.
     *
     * <p>On first request for a topic this spawns a tracker actor, builds a Kafka Streams topology that
     * reads {@code outputTopic} as raw byte arrays and hands every record to
     * {@code $anonfun$tracker$2}, cleans up and starts the streams instance, and registers a JVM
     * shutdown hook that closes it.
     *
     * <p>Trackers are cached by {@code outputTopic} only: a later call with the same output topic but a
     * different {@code inputTopic}, {@code messageMatcher} or {@code responseTransformer} gets the
     * tracker created by the first call.
     *
     * @param inputTopic          topic name recorded on each consumed message
     * @param outputTopic         topic to consume from; also the cache key
     * @param messageMatcher      matcher used to derive the reply id of each consumed record
     * @param responseTransformer optional transformation applied to a consumed message before it is sent to the actor
     * @return the cached or newly created tracker for {@code outputTopic}
     */
    public KafkaMessageTracker tracker(String inputTopic, String outputTopic, KafkaProtocol.KafkaMatcher messageMatcher, Option<Function1<KafkaProtocolMessage, KafkaProtocolMessage>> responseTransformer) {
        // The mapping function ignores its key argument and uses the captured outputTopic instead.
        return this.trackers().computeIfAbsent(outputTopic, ignoredKey -> {
            // `this` replaces the decompiler's undefined `$this`; inside an instance lambda they are the same object.
            ActorRef actor = this.system.actorOf(KafkaMessageTrackerActor$.MODULE$.props(this.statsEngine, this.clock), this.genName("kafkaTrackerActor"));
            StreamsBuilder builder = new StreamsBuilder(StreamsBuilder$.MODULE$.$lessinit$greater$default$1());
            builder.stream(outputTopic, ImplicitConversions$.MODULE$.consumedFromSerde(Serdes$.MODULE$.byteArraySerde(), Serdes$.MODULE$.byteArraySerde())).foreach((Function2 & Serializable)(key, value) -> {
                // The raw Function2 target types the parameters as Object, so cast back to the byte[] the serdes produce.
                TrackersPool.$anonfun$tracker$2(this, inputTopic, outputTopic, messageMatcher, responseTransformer, actor, (byte[])key, (byte[])value);
                return BoxedUnit.UNIT;
            });
            KafkaStreams streams = new KafkaStreams(builder.build(), this.props());
            streams.cleanUp();
            streams.start();
            // Close the streams instance when the JVM shuts down.
            ((CoordinatedShutdown)CoordinatedShutdown$.MODULE$.apply(this.system)).addJvmShutdownHook((Function0)(JFunction0.mcV.sp & Serializable)() -> streams.close());
            return new KafkaMessageTracker(actor);
        });
    }

    /**
     * Handles one record consumed from the output topic.
     *
     * <p>Wraps the key and value in a {@code KafkaProtocolMessage}, asks the matcher for the reply id,
     * and, if there is one, logs the record and sends a {@code MessageConsumed} to the tracker actor.
     * When the matcher returns {@code null} the record is dropped after an error log.
     *
     * @param $this                 the owning pool, used for its logger, clock and {@code logMessage}
     * @param inputTopic$1          input topic name recorded on the message
     * @param outputTopic$1         output topic name recorded on the message
     * @param messageMatcher$1      matcher used to derive the reply id
     * @param responseTransformer$1 optional message transformation; applied only when defined
     * @param actor$1               tracker actor that receives the {@code MessageConsumed} notification
     * @param x0$1                  record key, may be {@code null}
     * @param x1$1                  record value, may be {@code null}
     */
    public static final /* synthetic */ void $anonfun$tracker$2(TrackersPool $this, String inputTopic$1, String outputTopic$1, KafkaProtocol.KafkaMatcher messageMatcher$1, Option responseTransformer$1, ActorRef actor$1, byte[] x0$1, byte[] x1$1) {
        final byte[] k = x0$1;
        final byte[] v = x1$1;
        final KafkaProtocolMessage message = new KafkaProtocolMessage(k, v, inputTopic$1, outputTopic$1, KafkaProtocolMessage$.MODULE$.apply$default$5(), KafkaProtocolMessage$.MODULE$.apply$default$6());

        // Evaluate the matcher once and reuse the result for both the null check and the reply id.
        final byte[] replyId = messageMatcher$1.responseMatch(message);
        if (replyId == null) {
            if ($this.logger().underlying().isErrorEnabled()) {
                $this.logger().underlying().error("no messageMatcher key for read message");
            }
            return;
        }

        if ($this.logger().underlying().isInfoEnabled()) {
            if (k == null || v == null) {
                $this.logger().underlying().info(" --- received message with null key or value");
            } else {
                // Decode with an explicit charset so the log output does not depend on the platform default.
                $this.logger().underlying().info(" --- received  {} {}", new String(k, java.nio.charset.StandardCharsets.UTF_8), new String(v, java.nio.charset.StandardCharsets.UTF_8));
            }
        }

        final long receivedTimestamp = $this.clock.nowMillis();

        if (k != null) {
            $this.logMessage((Function0<String>)(Function0 & Serializable)() -> "Record received key=" + new String(k, java.nio.charset.StandardCharsets.UTF_8), message);
        } else {
            $this.logMessage((Function0<String>)(Function0 & Serializable)() -> "Record received key=null", message);
        }

        // Apply the optional transformer; fall back to the untransformed message when none is defined.
        final KafkaProtocolMessage delivered = (KafkaProtocolMessage)responseTransformer$1.map((Function1 & Serializable)transformer -> (KafkaProtocolMessage)((Function1)transformer).apply((Object)message)).getOrElse((Function0 & Serializable)() -> message);
        final KafkaMessageTrackerActor.MessageConsumed consumed = new KafkaMessageTrackerActor.MessageConsumed(replyId, receivedTimestamp, delivered);
        final ActorRef sender = actor$1.$bang$default$2((Object)consumed);
        actor$1.$bang((Object)consumed, sender);
    }

    public TrackersPool(Map<String, Object> streamsSettings, ActorSystem system, StatsEngine statsEngine, Clock clock) {
        this.system = system;
        this.statsEngine = statsEngine;
        this.clock = clock;
        StrictLogging.$init$((StrictLogging)this);
        package.KafkaLogging.$init$(this);
        NameGen.$init$((NameGen)this);
        this.trackers = new ConcurrentHashMap();
        this.props = new Properties();
        this.props().putAll((java.util.Map<?, ?>)CollectionConverters$.MODULE$.MapHasAsJava(streamsSettings).asJava());
        Statics.releaseFence();
    }
}

