/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.job.sink.http;

import ai.starlake.job.sink.http.DefaultSinkTransformer$;
import ai.starlake.job.sink.http.SinkTransformer;
import ai.starlake.utils.Utils$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005Ub!\u0002\u000b\u0016\u0001Uy\u0002\u0002\u0003\u0019\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001a\t\u000b\u0001\u0003A\u0011A!\t\u000f\u0015\u0003!\u0019!C\u0001\r\"1q\t\u0001Q\u0001\nuBq\u0001\u0013\u0001C\u0002\u0013\u0005\u0011\n\u0003\u0004N\u0001\u0001\u0006IA\u0013\u0005\b\u001d\u0002\u0011\r\u0011\"\u0001J\u0011\u0019y\u0005\u0001)A\u0005\u0015\"9\u0001\u000b\u0001b\u0001\n\u0003I\u0005BB)\u0001A\u0003%!\nC\u0004S\u0001\t\u0007I\u0011A%\t\rM\u0003\u0001\u0015!\u0003K\u0011\u001d!\u0006A1A\u0005\u0002UCa!\u0017\u0001!\u0002\u00131\u0006b\u0002.\u0001\u0005\u0004%\ta\u0017\u0005\u0007S\u0002\u0001\u000b\u0011\u0002/\t\u000b)\u0004A\u0011B6\t\u000bE\u0004A\u0011\u0001:\t\u000f\u0005E\u0001\u0001\"\u0003\u0002\u0014\tq\u0001\n\u001e;q'&t7n\u00117jK:$(B\u0001\f\u0018\u0003\u0011AG\u000f\u001e9\u000b\u0005aI\u0012\u0001B:j].T!AG\u000e\u0002\u0007)|'M\u0003\u0002\u001d;\u0005A1\u000f^1sY\u0006\\WMC\u0001\u001f\u0003\t\t\u0017nE\u0002\u0001A\u0019\u0002\"!\t\u0013\u000e\u0003\tR\u0011aI\u0001\u0006g\u000e\fG.Y\u0005\u0003K\t\u0012a!\u00118z%\u00164\u0007CA\u0014/\u001b\u0005A#BA\u0015+\u00031\u00198-\u00197bY><w-\u001b8h\u0015\tYC&\u0001\u0005usB,7/\u00194f\u0015\u0005i\u0013aA2p[&\u0011q\u0006\u000b\u0002\u000e'R\u0014\u0018n\u0019;M_\u001e<\u0017N\\4\u0002\u0015A\f'/Y7fi\u0016\u00148o\u0001\u0001\u0011\tMRT(\u0010\b\u0003ia\u0002\"!\u000e\u0012\u000e\u0003YR!aN\u0019\u0002\rq\u0012xn\u001c;?\u0013\tI$%\u0001\u0004Qe\u0016$WMZ\u0005\u0003wq\u00121!T1q\u0015\tI$\u0005\u0005\u00024}%\u0011q\b\u0010\u0002\u0007'R\u0014\u0018N\\4\u0002\rqJg.\u001b;?)\t\u0011E\t\u0005\u0002D\u00015\tQ\u0003C\u00031\u0005\u0001\u0007!'A\u0002ve2,\u0012!P\u0001\u0005kJd\u0007%A\u0006nCblUm]:bO\u0016\u001cX#\u0001&\u0011\u0005\u0005Z\u0015B\u0001'#\u0005\rIe\u000e^\u0001\r[\u0006DX*Z:tC\u001e,7\u000fI\u0001\u0010[\u0006DHk\u001c;bYRC'/Z1eg\u0006\u0001R.\u0019=U_R\fG\u000e\u00165sK\u0006$7\u000fI\u0001\u0013[\u0006DH\u000b\u001b:fC\u0012\u001c\b+\u001a:S_V$X-A\nnCb$\u0006N]3bIN\u0004VM\u001d*pkR,\u0007%\u0001\u000bnCbLE\r\\3US6,\u0017J\\*fG>tGm]\u0001\u0016[\u0006D\u0018\n\u001a7f)&lW-\u00138TK\u000e|g\u000eZ:!\u0003-!(/\u00198tM>\u0014X.\u001a:\u0016\u0003Y\u0003\"aQ,\n\u0005a+\"aD*j].$&/\u00198tM>\u0014X.\u001a:\u0002\u0019Q\u0014\u0018M\\:g_JlWM\u001d\u0011\u0002#\r|gN\\3di&|g.T1oC\u001e,'/F\u0001]!\tiv-D\u0001_\u0015\ty\u0006-\u0001\u0003d_:t'BA1c\u0003\u0011IW\u000e\u001d7\u000b\u0005Y\u0019'B\u00013f\u0003\u0019\t\u0007/Y2iK*\ta-A\u0002pe\u001eL!\u0001\u001b0\u0003EA{w\u000e\\5oO\"#H\u000f]\"mS\u0016tGoQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0003I\u0019wN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0011\u0002\r\rd\u0017.\u001a8u+\u0005a\u0007CA7p\u001b\u0005q'B\u00016a\u0013\t\u0001hNA\nDY>\u001cX-\u00192mK\"#H\u000f]\"mS\u0016tG/\u0001\u0003tK:$GC\u0001&t\u0011\u0015!(\u00031\u0001v\u0003%!\u0017\r^1Ge\u0006lW\rE\u0002w\u0003\u0017q1a^A\u0003\u001d\tAxP\u0004\u0002z{:\u0011!\u0010 \b\u0003kmL\u0011AZ\u0005\u0003I\u0016L!A`2\u0002\u000bM\u0004\u0018M]6\n\t\u0005\u0005\u00111A\u0001\u0004gFd'B\u0001@d\u0013\u0011\t9!!\u0003\u0002\u000fA\f7m[1hK*!\u0011\u0011AA\u0002\u0013\u0011\ti!a\u0004\u0003\u0013\u0011\u000bG/\u0019$sC6,'\u0002BA\u0004\u0003\u0013\tA\u0001]8tiR!\u0011QCA\u000e!\r\t\u0013qC\u0005\u0004\u00033\u0011#\u0001B+oSRDq!!\b\u0014\u0001\u0004\ty\"\u0001\u0003s_^\u001c\b#B\u0011\u0002\"\u0005\u0015\u0012bAA\u0012E\t)\u0011I\u001d:bsB)\u0011qEA\u0018{9!\u0011\u0011FA\u0017\u001d\r)\u00141F\u0005\u0002G%\u0019\u0011q\u0001\u0012\n\t\u0005E\u00121\u0007\u0002\u0004'\u0016\f(bAA\u0004E\u0001")
public class HttpSinkClient
implements StrictLogging {
    private final String url;
    private final int maxMessages;
    private final int maxTotalThreads;
    private final int maxThreadsPerRoute;
    private final int maxIdleTimeInSeconds;
    private final SinkTransformer transformer;
    private final PoolingHttpClientConnectionManager connectionManager;
    private final Logger logger;

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    public String url() {
        return this.url;
    }

    public int maxMessages() {
        return this.maxMessages;
    }

    public int maxTotalThreads() {
        return this.maxTotalThreads;
    }

    public int maxThreadsPerRoute() {
        return this.maxThreadsPerRoute;
    }

    public int maxIdleTimeInSeconds() {
        return this.maxIdleTimeInSeconds;
    }

    public SinkTransformer transformer() {
        return this.transformer;
    }

    public PoolingHttpClientConnectionManager connectionManager() {
        return this.connectionManager;
    }

    private CloseableHttpClient client() {
        return HttpClients.custom().setConnectionManager((HttpClientConnectionManager)this.connectionManager()).evictExpiredConnections().evictIdleConnections((long)this.maxIdleTimeInSeconds(), TimeUnit.SECONDS).build();
    }

    public int send(Dataset<Row> dataFrame) {
        int count;
        block1: {
            Iterator iter = dataFrame.toLocalIterator();
            ArrayBuffer buffer = (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Nil$.MODULE$);
            count = 0;
            while (iter.hasNext()) {
                Seq row;
                Seq transformed = row = (Seq)((Row)iter.next()).toSeq().map((Function1 & Serializable & scala.Serializable)x$1 -> Option$.MODULE$.apply(x$1).getOrElse((Function0 & Serializable & scala.Serializable)() -> "").toString(), Seq$.MODULE$.canBuildFrom());
                buffer.$plus$eq((Object)transformed);
                if (++count % this.maxMessages() != 0) continue;
                this.post((Seq[])buffer.toArray(ClassTag$.MODULE$.apply(Seq.class)));
                buffer.clear();
            }
            if (!buffer.nonEmpty()) break block1;
            this.post((Seq[])buffer.toArray(ClassTag$.MODULE$.apply(Seq.class)));
        }
        return count;
    }

    private void post(Seq<String>[] rows) {
        BoxedUnit boxedUnit;
        if (this.logger().underlying().isDebugEnabled()) {
            this.logger().underlying().debug("request: {}", new Object[]{new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])rows)).mkString("Array(", ", ", ")")});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        this.transformer().requestUris(this.url(), rows).foreach((Function1 & Serializable & scala.Serializable)requestUri -> BoxesRunTime.boxToInteger((int)HttpSinkClient.$anonfun$post$1(this, requestUri)));
    }

    public static final /* synthetic */ int $anonfun$post$3(HttpSinkClient $this, CloseableHttpResponse response) {
        int n;
        try {
            BoxedUnit boxedUnit;
            boolean ok = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(200), 299).contains(response.getStatusLine().getStatusCode());
            if (!ok) {
                throw new RuntimeException(response.getStatusLine().getReasonPhrase());
            }
            String responseBody = EntityUtils.toString((HttpEntity)response.getEntity(), (String)"UTF-8");
            if ($this.logger().underlying().isDebugEnabled()) {
                $this.logger().underlying().debug(new StringBuilder(25).append("Response from HTTP Sink: ").append(responseBody).toString());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            n = response.getStatusLine().getStatusCode();
        }
        finally {
            response.close();
        }
        return n;
    }

    public static final /* synthetic */ int $anonfun$post$1(HttpSinkClient $this, HttpUriRequest requestUri) {
        return BoxesRunTime.unboxToInt(Utils$.MODULE$.withResources((Function0 & Serializable & scala.Serializable)() -> $this.client().execute(requestUri), (Function1 & Serializable & scala.Serializable)response -> BoxesRunTime.boxToInteger((int)HttpSinkClient.$anonfun$post$3($this, response))));
    }

    public HttpSinkClient(Map<String, String> parameters) {
        StrictLogging.$init$((StrictLogging)this);
        this.url = (String)parameters.apply((Object)"url");
        this.maxMessages = new StringOps(Predef$.MODULE$.augmentString((String)parameters.getOrElse((Object)"maxMessages", (Function0 & Serializable & scala.Serializable)() -> "1"))).toInt();
        this.maxTotalThreads = new StringOps(Predef$.MODULE$.augmentString((String)parameters.getOrElse((Object)"maxTotalThreads", (Function0 & Serializable & scala.Serializable)() -> "200"))).toInt();
        this.maxThreadsPerRoute = new StringOps(Predef$.MODULE$.augmentString((String)parameters.getOrElse((Object)"maxThreadsPerRoute", (Function0 & Serializable & scala.Serializable)() -> "20"))).toInt();
        this.maxIdleTimeInSeconds = new StringOps(Predef$.MODULE$.augmentString((String)parameters.getOrElse((Object)"maxIdleTimeInSeconds", (Function0 & Serializable & scala.Serializable)() -> "10"))).toInt();
        this.transformer = (SinkTransformer)parameters.get((Object)"transformer").map((Function1 & Serializable & scala.Serializable)objectName -> (SinkTransformer)Utils$.MODULE$.loadInstance((String)objectName)).getOrElse((Function0 & Serializable & scala.Serializable)() -> DefaultSinkTransformer$.MODULE$);
        this.connectionManager = new PoolingHttpClientConnectionManager();
        this.connectionManager().setMaxTotal(this.maxTotalThreads());
        this.connectionManager().setDefaultMaxPerRoute(this.maxThreadsPerRoute());
    }
}

