/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.job.sink.http;

import ai.starlake.job.sink.http.SinkTransformer;
import ai.starlake.utils.Utils$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015a!\u0002\u0006\f\u0001-)\u0002\u0002\u0003\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\t\u0011M\u0002!\u0011!Q\u0001\nQB\u0001b\u000e\u0001\u0003\u0002\u0003\u0006I\u0001\u000f\u0005\u0006y\u0001!\t!\u0010\u0005\b\u0005\u0002\u0011\r\u0011\"\u0001D\u0011\u0019\t\u0006\u0001)A\u0005\t\")!\u000b\u0001C\u0005'\")\u0011\f\u0001C\u00015\")\u0001\u000f\u0001C\u0005c\nq\u0001\n\u001e;q'&t7n\u00117jK:$(B\u0001\u0007\u000e\u0003\u0011AG\u000f\u001e9\u000b\u00059y\u0011\u0001B:j].T!\u0001E\t\u0002\u0007)|'M\u0003\u0002\u0013'\u0005A1\u000f^1sY\u0006\\WMC\u0001\u0015\u0003\t\t\u0017nE\u0002\u0001-q\u0001\"a\u0006\u000e\u000e\u0003aQ\u0011!G\u0001\u0006g\u000e\fG.Y\u0005\u00037a\u0011a!\u00118z%\u00164\u0007CA\u000f%\u001b\u0005q\"BA\u0010!\u00031\u00198-\u00197bY><w-\u001b8h\u0015\t\t#%\u0001\u0005usB,7/\u00194f\u0015\u0005\u0019\u0013aA2p[&\u0011QE\b\u0002\u000e'R\u0014\u0018n\u0019;M_\u001e<\u0017N\\4\u0002\u0007U\u0014Hn\u0001\u0001\u0011\u0005%\u0002dB\u0001\u0016/!\tY\u0003$D\u0001-\u0015\tis%\u0001\u0004=e>|GOP\u0005\u0003_a\ta\u0001\u0015:fI\u00164\u0017BA\u00193\u0005\u0019\u0019FO]5oO*\u0011q\u0006G\u0001\f[\u0006DX*Z:tC\u001e,7\u000f\u0005\u0002\u0018k%\u0011a\u0007\u0007\u0002\u0004\u0013:$\u0018a\u0003;sC:\u001chm\u001c:nKJ\u0004\"!\u000f\u001e\u000e\u0003-I!aO\u0006\u0003\u001fMKgn\u001b+sC:\u001chm\u001c:nKJ\fa\u0001P5oSRtD\u0003\u0002 
@\u0001\u0006\u0003\"!\u000f\u0001\t\u000b\u0019\"\u0001\u0019\u0001\u0015\t\u000bM\"\u0001\u0019\u0001\u001b\t\u000b]\"\u0001\u0019\u0001\u001d\u0002#\r|gN\\3di&|g.T1oC\u001e,'/F\u0001E!\t)u*D\u0001G\u0015\t9\u0005*\u0001\u0003d_:t'BA%K\u0003\u0011IW\u000e\u001d7\u000b\u00051Y%B\u0001'N\u0003\u0019\t\u0007/Y2iK*\ta*A\u0002pe\u001eL!\u0001\u0015$\u0003EA{w\u000e\\5oO\"#H\u000f]\"mS\u0016tGoQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0003I\u0019wN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0011\u0002\r\rd\u0017.\u001a8u+\u0005!\u0006CA+X\u001b\u00051&B\u0001*I\u0013\tAfKA\nDY>\u001cX-\u00192mK\"#H\u000f]\"mS\u0016tG/\u0001\u0003tK:$GC\u0001\u001b\\\u0011\u0015a\u0006\u00021\u0001^\u0003%!\u0017\r^1Ge\u0006lW\r\u0005\u0002_[:\u0011qL\u001b\b\u0003A\u001et!!Y3\u000f\u0005\t$gBA\u0016d\u0013\u0005q\u0015B\u0001'N\u0013\t17*A\u0003ta\u0006\u00148.\u0003\u0002iS\u0006\u00191/\u001d7\u000b\u0005\u0019\\\u0015BA6m\u0003\u001d\u0001\u0018mY6bO\u0016T!\u0001[5\n\u00059|'!\u0003#bi\u00064%/Y7f\u0015\tYG.\u0001\u0003q_N$HC\u0001:v!\t92/\u0003\u0002u1\t!QK\\5u\u0011\u00151\u0018\u00021\u0001x\u0003\u0011\u0011xn^:\u0011\u0007]A(0\u0003\u0002z1\t)\u0011I\u001d:bsB\u00191p \u0015\u000f\u0005qthBA\u0016~\u0013\u0005I\u0012BA6\u0019\u0013\u0011\t\t!a\u0001\u0003\u0007M+\u0017O\u0003\u0002l1\u0001")
/*
 * Streams the rows of a Spark DataFrame to an HTTP endpoint in batches.
 *
 * Rows are pulled to the driver through Dataset.toLocalIterator(), every cell is
 * rendered as a String (null cells become ""), and each batch is handed to the
 * SinkTransformer, which turns it into one or more HTTP requests. All requests
 * go through a single pooled HTTP client (200 connections in total, 20 per route).
 *
 * NOTE(review): this file is decompiled from Scala. HttpClient.execute and
 * EntityUtils.toString declare IOException; the Scala original lets it propagate
 * undeclared, and that behavior is deliberately left as it was.
 */
public class HttpSinkClient
implements StrictLogging {
    private final String url;
    private final int maxMessages;
    private final SinkTransformer transformer;
    private final PoolingHttpClientConnectionManager connectionManager;
    /** Single client instance, built once in the constructor and reused for every request. */
    private final CloseableHttpClient httpClient;
    /** Not final: it is assigned through the StrictLogging trait setter during construction. */
    private Logger logger;

    /**
     * Creates a sink client.
     *
     * @param url         target URL, passed as-is to the transformer when building requests
     * @param maxMessages number of rows per batch; must not be 0, otherwise the batch-size
     *                    modulo in {@link #send(Dataset)} throws ArithmeticException
     * @param transformer turns a batch of rows into the HTTP requests to execute
     */
    public HttpSinkClient(String url, int maxMessages, SinkTransformer transformer) {
        this.url = url;
        this.maxMessages = maxMessages;
        this.transformer = transformer;
        StrictLogging.$init$((StrictLogging)this);
        this.connectionManager = new PoolingHttpClientConnectionManager();
        this.connectionManager.setMaxTotal(200);
        this.connectionManager.setDefaultMaxPerRoute(20);
        // Build the client once: it only wraps the pooled connection manager, so creating
        // a fresh (never closed) client per request, as before, bought nothing.
        this.httpClient = HttpClients.custom().setConnectionManager(this.connectionManager).build();
    }

    /** @return the logger installed by the StrictLogging trait */
    public Logger logger() {
        return this.logger;
    }

    /** Trait setter invoked by {@code StrictLogging.$init$}; not meant to be called directly. */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    /** @return the pooled connection manager backing every request of this client */
    public PoolingHttpClientConnectionManager connectionManager() {
        return this.connectionManager;
    }

    /** @return the shared HTTP client bound to {@link #connectionManager()} */
    private CloseableHttpClient client() {
        return this.httpClient;
    }

    /**
     * Sends every row of the DataFrame to the sink, posting a batch each time
     * {@code maxMessages} rows have been accumulated and once more for the remainder.
     *
     * @param dataFrame rows to send; iterated locally via {@code toLocalIterator()}
     * @return the number of rows read from the DataFrame
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // Scala collections are used through their erased API
    public int send(Dataset<Row> dataFrame) {
        Iterator<Row> rowIterator = dataFrame.toLocalIterator();
        ArrayBuffer pending = (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Nil$.MODULE$);
        int count = 0;
        while (rowIterator.hasNext()) {
            // Render each cell as text; Option(...) maps null cells to the empty string.
            Seq cells = (Seq)rowIterator.next().toSeq().map((Function1 & Serializable & scala.Serializable)cell -> Option$.MODULE$.apply(cell).getOrElse((Function0 & Serializable & scala.Serializable)() -> "").toString(), Seq$.MODULE$.canBuildFrom());
            pending.$plus$eq((Object)cells);
            ++count;
            if (count % this.maxMessages == 0) {
                this.flush(pending);
            }
        }
        // Rows left over after the last full batch.
        if (pending.nonEmpty()) {
            this.flush(pending);
        }
        return count;
    }

    /** Posts the buffered rows as one batch, then empties the buffer. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void flush(ArrayBuffer pending) {
        this.post((Seq[])pending.toArray(ClassTag$.MODULE$.apply(Seq.class)));
        pending.clear();
    }

    /**
     * Executes one HTTP request per URI request produced by the transformer for this batch.
     *
     * @param rows batch of rows, each row being the sequence of its cell values as text
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void post(Seq<String>[] rows) {
        if (this.logger().underlying().isDebugEnabled()) {
            this.logger().underlying().debug("request: {}", new Object[]{new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])rows)).mkString("Array(", ", ", ")")});
        }
        this.transformer.requestUris(this.url, rows).foreach((Function1 & Serializable & scala.Serializable)requestUri -> BoxesRunTime.boxToInteger((int)HttpSinkClient.$anonfun$post$1(this, (HttpUriRequest)requestUri)));
    }

    /**
     * Validates a response and drains its body.
     *
     * @param $this    owning client, used for logging
     * @param response response to check
     * @return the HTTP status code of the response
     * @throws RuntimeException carrying the reason phrase when the status is outside 200..299
     */
    public static final /* synthetic */ int $anonfun$post$3(HttpSinkClient $this, CloseableHttpResponse response) {
        int statusCode = response.getStatusLine().getStatusCode();
        boolean ok = statusCode >= 200 && statusCode <= 299;
        if (!ok) {
            throw new RuntimeException(response.getStatusLine().getReasonPhrase());
        }
        // Body-less replies (e.g. 204 No Content) expose a null entity, which
        // EntityUtils.toString rejects; treat them as an empty body instead of failing.
        HttpEntity entity = response.getEntity();
        String responseBody = entity == null ? "" : EntityUtils.toString((HttpEntity)entity, (String)"UTF-8");
        if ($this.logger().underlying().isDebugEnabled()) {
            $this.logger().underlying().debug("Response from HTTP Sink: " + responseBody);
        }
        return statusCode;
    }

    /**
     * Executes a single request and checks its response; the response is handed to
     * {@code Utils.withResources} together with the validation callback.
     *
     * @param $this      owning client
     * @param requestUri request to execute
     * @return the HTTP status code of the response
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static final /* synthetic */ int $anonfun$post$1(HttpSinkClient $this, HttpUriRequest requestUri) {
        return BoxesRunTime.unboxToInt(Utils$.MODULE$.withResources((Function0 & Serializable & scala.Serializable)() -> $this.client().execute(requestUri), (Function1 & Serializable & scala.Serializable)response -> BoxesRunTime.boxToInteger((int)HttpSinkClient.$anonfun$post$3($this, (CloseableHttpResponse)response))));
    }
}

