/*
 * Decompiled with CFR 0.152.
 */
package ai.chronon.spark.streaming;

import ai.chronon.online.KVStore;
import com.yahoo.sketches.kll.KllFloatsSketch;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005ec\u0001B\u0001\u0003\u0001-\u0011ab\u0015;sK\u0006l\u0017N\\4Ti\u0006$8O\u0003\u0002\u0004\t\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u000b\u0019\tQa\u001d9be.T!a\u0002\u0005\u0002\u000f\rD'o\u001c8p]*\t\u0011\"\u0001\u0002bS\u000e\u00011C\u0001\u0001\r!\ti\u0001#D\u0001\u000f\u0015\u0005y\u0011!B:dC2\f\u0017BA\t\u000f\u0005\u0019\te.\u001f*fM\"A1\u0003\u0001BC\u0002\u0013\u0005A#A\nqk\nd\u0017n\u001d5EK2\f\u0017pU3d_:$7/F\u0001\u0016!\tia#\u0003\u0002\u0018\u001d\t\u0019\u0011J\u001c;\t\u0011e\u0001!\u0011!Q\u0001\nU\tA\u0003];cY&\u001c\b\u000eR3mCf\u001cVmY8oIN\u0004\u0003\"B\u000e\u0001\t\u0003a\u0012A\u0002\u001fj]&$h\b\u0006\u0002\u001e?A\u0011a\u0004A\u0007\u0002\u0005!)1C\u0007a\u0001+!A\u0011\u0005\u0001EC\u0002\u0013\u0005!%\u0001\u0004m_\u001e<WM]\u000b\u0002GA\u0011A%K\u0007\u0002K)\u0011aeJ\u0001\u0006g24GG\u001b\u0006\u0002Q\u0005\u0019qN]4\n\u0005)*#A\u0002'pO\u001e,'\u000f\u0003\u0005-\u0001!\u0005\t\u0015)\u0003$\u0003\u001dawnZ4fe\u0002B#a\u000b\u0018\u0011\u00055y\u0013B\u0001\u0019\u000f\u0005%!(/\u00198tS\u0016tG\u000fC\u00043\u0001\u0001\u0007I\u0011B\u001a\u0002!1\fG/\u001a8ds\"K7\u000f^8he\u0006lW#\u0001\u001b\u0011\u0005UrT\"\u0001\u001c\u000b\u0005]B\u0014aA6mY*\u0011\u0011HO\u0001\tg.,Go\u00195fg*\u00111\bP\u0001\u0006s\u0006Dwn\u001c\u0006\u0002{\u0005\u00191m\\7\n\u0005}2$aD&mY\u001acw.\u0019;t'.,Go\u00195\t\u000f\u0005\u0003\u0001\u0019!C\u0005\u0005\u0006!B.\u0019;f]\u000eL\b*[:u_\u001e\u0014\u0018-\\0%KF$\"a\u0011$\u0011\u00055!\u0015BA#\u000f\u0005\u0011)f.\u001b;\t\u000f\u001d\u0003\u0015\u0011!a\u0001i\u0005\u0019\u0001\u0010J\u0019\t\r%\u0003\u0001\u0015)\u00035\u0003Ea\u0017\r^3oGfD\u0015n\u001d;pOJ\fW\u000e\t\u0005\b\u0017\u0002\u0001\r\u0011\"\u0003M\u00039a\u0017\r^3oGfl5\u000fV8uC2,\u0012!\u0014\t\u0003\u001b9K!a\u0014\b\u0003\t1{gn\u001a\u0005\b#\u0002\u0001\r\u0011\"\u0003S\u0003Ia\u0017\r^3oGfl5\u000fV8uC2|F%Z9\u0015\u0005\r\u001b\u0006bB$Q\u0003\u0003\u0005\r!\u0014\u0005\u0007
+\u0002\u0001\u000b\u0015B'\u0002\u001f1\fG/\u001a8ds6\u001bHk\u001c;bY\u0002Bqa\u0016\u0001A\u0002\u0013%A*A\u0006xe&$Xm\u001d+pi\u0006d\u0007bB-\u0001\u0001\u0004%IAW\u0001\u0010oJLG/Z:U_R\fGn\u0018\u0013fcR\u00111i\u0017\u0005\b\u000fb\u000b\t\u00111\u0001N\u0011\u0019i\u0006\u0001)Q\u0005\u001b\u0006aqO]5uKN$v\u000e^1mA!9q\f\u0001a\u0001\n\u0013a\u0015!D6fs\nKH/Z:U_R\fG\u000eC\u0004b\u0001\u0001\u0007I\u0011\u00022\u0002#-,\u0017PQ=uKN$v\u000e^1m?\u0012*\u0017\u000f\u0006\u0002DG\"9q\tYA\u0001\u0002\u0004i\u0005BB3\u0001A\u0003&Q*\u0001\blKf\u0014\u0015\u0010^3t)>$\u0018\r\u001c\u0011\t\u000f\u001d\u0004\u0001\u0019!C\u0005\u0019\u0006ya/\u00197vK\nKH/Z:U_R\fG\u000eC\u0004j\u0001\u0001\u0007I\u0011\u00026\u0002'Y\fG.^3CsR,7\u000fV8uC2|F%Z9\u0015\u0005\r[\u0007bB$i\u0003\u0003\u0005\r!\u0014\u0005\u0007[\u0002\u0001\u000b\u0015B'\u0002!Y\fG.^3CsR,7\u000fV8uC2\u0004\u0003bB8\u0001\u0001\u0004%I\u0001T\u0001\bgR\f'\u000f^'t\u0011\u001d\t\b\u00011A\u0005\nI\f1b\u001d;beRl5o\u0018\u0013fcR\u00111i\u001d\u0005\b\u000fB\f\t\u00111\u0001N\u0011\u0019)\b\u0001)Q\u0005\u001b\u0006A1\u000f^1si6\u001b\b\u0005C\u0004x\u0001\t\u0007I\u0011\u0002=\u0002\u0007U$8-F\u0001z!\rQ\u00181A\u0007\u0002w*\u0011A0`\u0001\u0007M>\u0014X.\u0019;\u000b\u0005y|\u0018\u0001\u0002;j[\u0016T!!!\u0001\u0002\t)\fg/Y\u0005\u0004\u0003\u000bY(!\u0005#bi\u0016$\u0016.\\3G_Jl\u0017\r\u001e;fe\"9\u0011\u0011\u0002\u0001!\u0002\u0013I\u0018\u0001B;uG\u0002Bq!!\u0004\u0001\t\u0013\ty!\u0001\u0006uS6,7\u000b\u001e:j]\u001e$b!!\u0005\u0002\u001e\u0005\u0005\u0002\u0003BA\n\u00033i!!!\u0006\u000b\u0007\u0005]q0\u0001\u0003mC:<\u0017\u0002BA\u000e\u0003+\u0011aa\u0015;sS:<\u0007bBA\u0010\u0003\u0017\u0001\r!_\u0001\nM>\u0014X.\u0019;uKJDq!a\t\u0002\f\u0001\u0007Q*\u0001\u0002ug\"9\u0011q\u0005\u0001\u0005\u0002\u0005%\u0012a\u00039sS:$8\u000b^1ukN$\u0012a\u0011\u0005\b\u0003[\u0001A\u0011AA\u0018\u0003%Ign\u0019:f[\u0016tG\u000fF\u0002D\u0003cA\u0001\"a\r\u0002,\u0001\u0007\u0011QG\u0001\u000baV$(+Z9vKN$\b
\u0003BA\u001c\u0003'rA!!\u000f\u0002N9!\u00111HA%\u001d\u0011\ti$a\u0012\u000f\t\u0005}\u0012QI\u0007\u0003\u0003\u0003R1!a\u0011\u000b\u0003\u0019a$o\\8u}%\t\u0011\"\u0003\u0002\b\u0011%\u0019\u00111\n\u0004\u0002\r=tG.\u001b8f\u0013\u0011\ty%!\u0015\u0002\u000f-36\u000b^8sK*\u0019\u00111\n\u0004\n\t\u0005U\u0013q\u000b\u0002\u000b!V$(+Z9vKN$(\u0002BA(\u0003#\u0002")
/*
 * Accumulates per-write statistics for KVStore put requests and logs a summary of them.
 *
 * For every request passed to increment() the class counts the write, adds up key and
 * value byte sizes and, when the request supplies a timestamp, records the latency
 * (current wall-clock time minus that timestamp) both as a running total and in a
 * KllFloatsSketch used for quantiles. Once more than publishDelaySeconds have elapsed
 * since the current window started, printStatus() logs the summary and resets the window.
 *
 * This file is decompiler output of a Scala class (see the file header and the
 * ScalaSignature annotation). Members with '$' in their names are compiler-generated;
 * they are kept exactly as emitted.
 *
 * NOTE(review): no synchronization around the counters is visible here, and the log line
 * includes the current thread id - presumably each instance is used by one thread; confirm.
 */
public class StreamingStats {
    // Threshold, in seconds, that the window age must exceed before increment() publishes.
    private final int publishDelaySeconds;
    // Lazily created logger; transient, so it is not part of serialized state.
    private transient Logger logger;
    // Sketch of the latencies (ms) recorded in the current window; replaced on every publish.
    private KllFloatsSketch ai$chronon$spark$streaming$StreamingStats$$latencyHistogram;
    // Sum of the latencies (ms) recorded in the current window.
    private long ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal;
    // Number of put requests seen in the current window.
    private long writesTotal;
    // Sum of key byte-array lengths in the current window (null keys contribute nothing).
    private long keyBytesTotal;
    // Sum of value byte-array lengths in the current window (null values contribute nothing).
    private long valueBytesTotal;
    // Wall-clock time (epoch ms) at which the current window started.
    private long startMs;
    // ISO_LOCAL_DATE_TIME formatter pinned to UTC, used for the timestamp in the log line.
    private final DateTimeFormatter utc;
    // Set to true once `logger` has been initialized; checked by logger() without locking.
    private volatile transient boolean bitmap$trans$0;

    /**
     * Slow path of {@link #logger()}: creates the logger on first use.
     *
     * The flag is re-checked while holding this instance's monitor, so the logger is
     * created at most once even if several threads reach this method together.
     *
     * @return the logger for this object's runtime class
     */
    private Logger logger$lzycompute() {
        StreamingStats streamingStats = this;
        synchronized (streamingStats) {
            if (!this.bitmap$trans$0) {
                this.logger = LoggerFactory.getLogger(this.getClass());
                this.bitmap$trans$0 = true;
            }
            return this.logger;
        }
    }

    /**
     * @return the publish threshold in seconds, as passed to the constructor
     */
    public int publishDelaySeconds() {
        return this.publishDelaySeconds;
    }

    /**
     * Returns the logger, initializing it on first access.
     *
     * @return the cached logger when the init flag is set, otherwise the result of
     *         {@link #logger$lzycompute()}
     */
    public Logger logger() {
        return this.bitmap$trans$0 ? this.logger : this.logger$lzycompute();
    }

    /**
     * Accessor for the latency sketch. It is public because the anonymous function
     * class in {@link #increment} reads it through its outer reference.
     *
     * @return the sketch for the current window
     */
    public KllFloatsSketch ai$chronon$spark$streaming$StreamingStats$$latencyHistogram() {
        return this.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram;
    }

    /** Replaces the latency sketch; used by {@link #printStatus()} to start a fresh window. */
    private void ai$chronon$spark$streaming$StreamingStats$$latencyHistogram_$eq(KllFloatsSketch x$1) {
        this.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram = x$1;
    }

    /**
     * Accessor for the running latency total, read by the anonymous function class in
     * {@link #increment}.
     *
     * @return the sum of recorded latencies (ms) in the current window
     */
    public long ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal() {
        return this.ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal;
    }

    /**
     * Sets the running latency total. It is public because the anonymous function class
     * in {@link #increment} writes it through its outer reference.
     */
    public void ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal_$eq(long x$1) {
        this.ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal = x$1;
    }

    /** @return the number of put requests counted in the current window */
    private long writesTotal() {
        return this.writesTotal;
    }

    /** Sets the write counter. */
    private void writesTotal_$eq(long x$1) {
        this.writesTotal = x$1;
    }

    /** @return the accumulated key size in bytes for the current window */
    private long keyBytesTotal() {
        return this.keyBytesTotal;
    }

    /** Sets the accumulated key size. */
    private void keyBytesTotal_$eq(long x$1) {
        this.keyBytesTotal = x$1;
    }

    /** @return the accumulated value size in bytes for the current window */
    private long valueBytesTotal() {
        return this.valueBytesTotal;
    }

    /** Sets the accumulated value size. */
    private void valueBytesTotal_$eq(long x$1) {
        this.valueBytesTotal = x$1;
    }

    /** @return the epoch-millisecond start of the current window */
    private long startMs() {
        return this.startMs;
    }

    /** Sets the start of the current window. */
    private void startMs_$eq(long x$1) {
        this.startMs = x$1;
    }

    /** @return the UTC formatter created in the constructor */
    private DateTimeFormatter utc() {
        return this.utc;
    }

    /**
     * Formats an epoch-millisecond timestamp.
     *
     * @param formatter formatter to apply
     * @param ts        timestamp in milliseconds since the epoch
     * @return the formatted instant
     */
    private String timeString(DateTimeFormatter formatter, long ts) {
        return formatter.format(Instant.ofEpochMilli(ts));
    }

    /**
     * Logs a summary of the current window and starts a new one.
     *
     * When at least one write was counted, the message contains the thread id, the
     * current time, the write count, the window age in ms, average/median/p95/p99
     * latency, and average and total key and value sizes. Afterwards all counters are
     * zeroed, the sketch is replaced and the window start is set to the time captured at
     * the top of the method. When no write was counted, only "No writes registered" is
     * logged and nothing is reset.
     *
     * NOTE(review): the average latency divides the latency total by the number of
     * writes, but increment() only records a latency when tsMillis() yields a value. If
     * some requests carry no timestamp the logged average would be lower than the true
     * mean of the recorded latencies - confirm whether that can happen.
     */
    public void printStatus() {
        if (this.writesTotal() > 0L) {
            long now = System.currentTimeMillis();
            // Decompiled string interpolation: yields "Thread-" followed by the current thread id.
            String threadName = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Thread-", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)Thread.currentThread().getId())}));
            float medianLatency = this.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram().getQuantile(0.5);
            float p95Latency = this.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram().getQuantile(0.95);
            float p99Latency = this.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram().getQuantile(0.99);
            // Decompiled interpolated multi-line string; stripMargin() is applied to the
            // result, and the '|' characters in the template are its margin markers.
            // The averages are long divisions, so they are truncated to whole numbers.
            this.logger().info(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |[", "][", "] Wrote ", " records in last ", " ms.         \n         | Latency ms: ", " (avg) / ", " (median) / ", " (p95) / ", " (p99) \n         |   Key Size: ", " bytes (avg) / ", " (total)\n         | Value Size: ", " bytes (avg) / ", " (total)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{threadName, this.timeString(this.utc(), now), BoxesRunTime.boxToLong((long)this.writesTotal()), BoxesRunTime.boxToLong((long)(now - this.startMs())), BoxesRunTime.boxToLong((long)(this.ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal() / this.writesTotal())), BoxesRunTime.boxToFloat((float)medianLatency), BoxesRunTime.boxToFloat((float)p95Latency), BoxesRunTime.boxToFloat((float)p99Latency), BoxesRunTime.boxToLong((long)(this.keyBytesTotal() / this.writesTotal())), this.readable$1().apply((Object)BoxesRunTime.boxToLong((long)this.keyBytesTotal())), BoxesRunTime.boxToLong((long)(this.valueBytesTotal() / this.writesTotal())), this.readable$1().apply((Object)BoxesRunTime.boxToLong((long)this.valueBytesTotal()))})))).stripMargin());
            // Start a fresh window: zero the counters, drop the old sketch, restart the clock.
            this.ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal_$eq(0L);
            this.writesTotal_$eq(0L);
            this.keyBytesTotal_$eq(0L);
            this.valueBytesTotal_$eq(0L);
            this.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram_$eq(new KllFloatsSketch());
            this.startMs_$eq(now);
        } else {
            // Nothing to report; the window start is intentionally or accidentally left as is.
            this.logger().info("No writes registered");
        }
    }

    /**
     * Records one put request and publishes the summary if the window is old enough.
     *
     * The write counter is always incremented. Key and value sizes are added only when
     * the respective byte array is non-null. Latency is recorded only for values that
     * tsMillis() passes to the callback (presumably an optional timestamp - its type is
     * not visible in this file). Finally, if more than publishDelaySeconds * 1000 ms have
     * passed since the window started, {@link #printStatus()} is called.
     *
     * @param putRequest the request that was written
     */
    public void increment(KVStore.PutRequest putRequest) {
        // Decompiler rendering of a function object taking a long; it updates the
        // enclosing instance through the captured $outer reference.
        putRequest.tsMillis().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingStats $outer;

            public final void apply(long queryTime) {
                this.apply$mcVJ$sp(queryTime);
            }

            public void apply$mcVJ$sp(long queryTime) {
                // Latency = now minus the timestamp carried by the request.
                long latency = System.currentTimeMillis() - queryTime;
                this.$outer.ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal_$eq(this.$outer.ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal() + latency);
                this.$outer.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram().update((float)latency);
            }
            {
                // Null check on the captured outer instance; `throw null` raises a
                // NullPointerException.
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.writesTotal_$eq(this.writesTotal() + 1L);
        if (putRequest.keyBytes() != null) {
            this.keyBytesTotal_$eq(this.keyBytesTotal() + (long)putRequest.keyBytes().length);
        }
        if (putRequest.valueBytes() != null) {
            this.valueBytesTotal_$eq(this.valueBytesTotal() + (long)putRequest.valueBytes().length);
        }
        // Publish only once the window age strictly exceeds the configured delay.
        if (System.currentTimeMillis() - this.startMs() > (long)this.publishDelaySeconds() * 1000L) {
            this.printStatus();
        }
    }

    /**
     * Builds the helper used by {@link #printStatus()} to render byte totals.
     *
     * @return a function object that maps a byte count to the string produced by
     *         FileUtils.byteCountToDisplaySize; a new object is created on every call
     */
    private final Function1 readable$1() {
        return new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(long x) {
                return FileUtils.byteCountToDisplaySize((long)x);
            }
        };
    }

    /**
     * Creates an empty statistics window that starts at the current wall-clock time.
     *
     * @param publishDelaySeconds number of seconds the window age must exceed before
     *                            {@link #increment} triggers {@link #printStatus()}
     */
    public StreamingStats(int publishDelaySeconds) {
        this.publishDelaySeconds = publishDelaySeconds;
        this.ai$chronon$spark$streaming$StreamingStats$$latencyHistogram = new KllFloatsSketch();
        this.ai$chronon$spark$streaming$StreamingStats$$latencyMsTotal = 0L;
        this.writesTotal = 0L;
        this.keyBytesTotal = 0L;
        this.valueBytesTotal = 0L;
        this.startMs = System.currentTimeMillis();
        // ISO local date-time pattern, evaluated in the UTC zone.
        this.utc = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
    }
}

