/*
 * Decompiled with CFR 0.152.
 */
package ai.tripl.arc.load;

import ai.tripl.arc.api.API;
import ai.tripl.arc.load.HTTPLoad;
import ai.tripl.arc.load.HTTPLoadStage;
import ai.tripl.arc.load.HTTPLoadStage$;
import ai.tripl.arc.util.DetailException;
import ai.tripl.arc.util.log.logger.Logger;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.NullType$;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple9;
import scala.collection.BufferedIterator;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;

public final class HTTPLoadStage$
implements scala.Serializable {
    // Singleton instance of this module. It is not assigned here; the private
    // constructor stores `this` into it.
    public static HTTPLoadStage$ MODULE$;

    static {
        // Instantiating the class once is what populates MODULE$ (see the constructor).
        new HTTPLoadStage$();
    }

    /**
     * Runs the HTTPLoad stage: POSTs every row of the stage's input view to the stage's output URI.
     *
     * <p>The input view is resolved with {@code spark.table(stage.inputView())} and must consist of
     * exactly one {@code StringType} column; otherwise a {@code DetailException} carrying
     * {@code stage.stageDetail()} is thrown before any request is made.
     *
     * <p>When {@code arcContext.isStreaming()} is true a streaming query is started whose
     * {@code ForeachWriter} posts each row, and the method returns {@code None}. Otherwise the rows
     * are posted via {@code mapPartitions} and the dataset of {@code HTTPLoadStage.Response} values is
     * returned, converted with {@code toDF()}, inside an {@code Option}.
     *
     * @param stage      supplies inputView, outputURI, headers and validStatusCodes
     * @param spark      used to resolve the input view and to obtain the product encoder
     * @param logger     not referenced by this method
     * @param arcContext only {@code isStreaming()} is consulted
     * @return {@code None} in streaming mode, otherwise the responses wrapped in an {@code Option}
     * @throws DetailException if the input signature is wrong, or wrapping any {@code Exception}
     *         raised inside the guarded section below
     */
    public Option<Dataset<Row>> execute(HTTPLoadStage stage, SparkSession spark, Logger logger, API.ARCContext arcContext) {
        Option<Dataset<Row>> responses;
        String signature = "HTTPLoad requires inputView to be dataset with [value: string] signature.";
        Dataset df = spark.table(stage.inputView());
        URI stageOutputURI = stage.outputURI();
        Map<String, String> stageHeaders = stage.headers();
        List<Object> stageValidStatusCodes = stage.validStatusCodes();

        // Input must be a single column whose type equals StringType.
        boolean hasStringSignature = false;
        if (df.schema().length() == 1) {
            DataType columnType = df.schema().apply(0).dataType();
            hasStringSignature = columnType != null && columnType.equals(StringType$.MODULE$);
        }
        if (!hasStringSignature) {
            throw new DetailException(signature, stage, df){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$1.stageDetail();
                }
            };
        }
        try {
            if (arcContext.isStreaming()) {
                df.writeStream().foreach((ForeachWriter)new ForeachWriter<Row>(stageOutputURI, stageHeaders, stageValidStatusCodes){
                    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager;
                    private CloseableHttpClient httpClient;
                    private final String uri;
                    private final Map stageHeaders$1;
                    private final List stageValidStatusCodes$1;

                    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
                        return this.poolingHttpClientConnectionManager;
                    }

                    private void poolingHttpClientConnectionManager_$eq(PoolingHttpClientConnectionManager x$1) {
                        this.poolingHttpClientConnectionManager = x$1;
                    }

                    private CloseableHttpClient httpClient() {
                        return this.httpClient;
                    }

                    private void httpClient_$eq(CloseableHttpClient x$1) {
                        this.httpClient = x$1;
                    }

                    private String uri() {
                        return this.uri;
                    }

                    // Creates a fresh connection pool and client each time the writer is opened.
                    public boolean open(long partitionId, long epochId) {
                        this.poolingHttpClientConnectionManager_$eq(new PoolingHttpClientConnectionManager());
                        this.httpClient_$eq(HttpClients.custom().setConnectionManager((HttpClientConnectionManager)this.poolingHttpClientConnectionManager()).build());
                        return true;
                    }

                    // POSTs column 0 of the row as a string entity and validates the status code.
                    public void process(Row row) {
                        HttpPost post = new HttpPost(this.uri());
                        this.stageHeaders$1.withFilter((Function1 & Serializable & scala.Serializable)check$ifrefutable$1 -> BoxesRunTime.boxToBoolean((boolean)anon.2.$anonfun$process$1(check$ifrefutable$1))).foreach((Function1 & Serializable & scala.Serializable)x$16 -> {
                            anon.2.$anonfun$process$2(post, x$16);
                            return BoxedUnit.UNIT;
                        });
                        post.setEntity((HttpEntity)new StringEntity(row.getString(0)));
                        // try/finally so the response is closed and the connection released even when
                        // the request fails or the status code is rejected (previously both were
                        // skipped on the exception path).
                        try {
                            CloseableHttpResponse response = this.httpClient().execute((HttpUriRequest)post);
                            try {
                                if (!this.stageValidStatusCodes$1.contains((Object)BoxesRunTime.boxToInteger((int)response.getStatusLine().getStatusCode()))) {
                                    throw new Exception(new StringBuilder(80).append("HTTPLoad expects all response StatusCode(s) in [").append(this.stageValidStatusCodes$1.mkString(", ")).append("] but server responded with ").append(response.getStatusLine().getStatusCode()).append(" (").append(response.getStatusLine().getReasonPhrase()).append(").").toString());
                                }
                            } finally {
                                response.close();
                            }
                        } finally {
                            post.releaseConnection();
                        }
                    }

                    // Closes the client and pool, then rethrows any error wrapped in an Exception.
                    public void close(Throwable errorOrNull) {
                        try {
                            this.httpClient().close();
                        } finally {
                            // Close the pool even if closing the client throws.
                            this.poolingHttpClientConnectionManager().close();
                        }
                        if (errorOrNull != null) {
                            throw new Exception(errorOrNull);
                        }
                    }

                    public static final /* synthetic */ boolean $anonfun$process$1(Tuple2 check$ifrefutable$1) {
                        Tuple2 tuple2 = check$ifrefutable$1;
                        return tuple2 != null;
                    }

                    public static final /* synthetic */ void $anonfun$process$2(HttpPost post$1, Tuple2 x$16) {
                        Tuple2 tuple2 = x$16;
                        if (tuple2 != null) {
                            String k = (String)tuple2._1();
                            String v = (String)tuple2._2();
                            post$1.addHeader(k, v);
                            return;
                        }
                        throw new MatchError((Object)tuple2);
                    }
                    {
                        this.stageHeaders$1 = stageHeaders$1;
                        this.stageValidStatusCodes$1 = stageValidStatusCodes$1;
                        this.uri = stageOutputURI$1.toString();
                    }

                    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                        return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$process$1$adapted(scala.Tuple2 ), $anonfun$process$2$adapted(org.apache.http.client.methods.HttpPost scala.Tuple2 )}, serializedLambda);
                    }
                }).start();
                responses = (Option)None$.MODULE$;
            } else {
                // Type creator used to build the TypeTag for HTTPLoadStage.Response (needed by the encoder).
                public final class Ai_tripl_arc_load_HTTPLoadStage$$typecreator5$1
                extends TypeCreator {
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                        Universe $u = $m$untyped.universe();
                        Mirror<U> $m = $m$untyped;
                        return $m.staticClass("ai.tripl.arc.load.HTTPLoadStage.Response").asType().toTypeConstructor();
                    }

                    public Ai_tripl_arc_load_HTTPLoadStage$$typecreator5$1() {
                    }
                }
                JavaUniverse $u = package$.MODULE$.universe();
                JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
                Dataset writtenDS = df.mapPartitions((Function1 & Serializable & scala.Serializable)partition -> {
                    // One pool (capped at 50 connections) and one client per partition.
                    PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
                    poolingHttpClientConnectionManager.setMaxTotal(50);
                    CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager((HttpClientConnectionManager)poolingHttpClientConnectionManager).build();
                    String uri = stageOutputURI.toString();
                    // Buffered so the first row can be inspected via head() without consuming it;
                    // the "value" column index and type are resolved once per partition.
                    BufferedIterator bufferedPartition = partition.buffered();
                    int fieldIndex = bufferedPartition.hasNext() ? ((Row)bufferedPartition.head()).fieldIndex("value") : 0;
                    DataType dataType = bufferedPartition.hasNext() ? ((Row)bufferedPartition.head()).schema().apply(fieldIndex).dataType() : NullType$.MODULE$;
                    return bufferedPartition.map(arg_0 -> HTTPLoadStage$.$anonfun$execute$2(uri, stageHeaders, dataType, fieldIndex, httpClient, stageValidStatusCodes, poolingHttpClientConnectionManager, arg_0));
                }, spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Ai_tripl_arc_load_HTTPLoadStage$$typecreator5$1())));
                responses = Option$.MODULE$.apply(writtenDS.toDF());
            }
        }
        catch (Exception e) {
            // Re-raise with the stage detail attached.
            throw new DetailException(e, stage){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$1.stageDetail();
                }
            };
        }
        return responses;
    }

    /**
     * Factory method: builds an {@code HTTPLoadStage} by passing all nine arguments, in the same
     * order, to its constructor.
     *
     * @return the newly constructed stage
     */
    public HTTPLoadStage apply(HTTPLoad plugin, Option<String> id, String name, Option<String> description, String inputView, URI outputURI, Map<String, String> headers, List<Object> validStatusCodes, Map<String, String> params) {
        HTTPLoadStage created = new HTTPLoadStage(
            plugin,
            id,
            name,
            description,
            inputView,
            outputURI,
            headers,
            validStatusCodes,
            params);
        return created;
    }

    /**
     * Extractor counterpart of {@code apply}: unpacks a stage into a 9-tuple of its fields
     * (plugin, id, name, description, inputView, outputURI, headers, validStatusCodes, params).
     *
     * @param x$0 the stage to decompose; may be {@code null}
     * @return {@code Some(tuple)} for a non-null stage, {@code None} when {@code x$0} is {@code null}
     */
    public Option<Tuple9<HTTPLoad, Option<String>, String, Option<String>, String, URI, Map<String, String>, List<Object>, Map<String, String>>> unapply(HTTPLoadStage x$0) {
        if (x$0 != null) {
            Tuple9 fields = new Tuple9((Object)x$0.plugin(), x$0.id(), (Object)x$0.name(), x$0.description(), (Object)x$0.inputView(), (Object)x$0.outputURI(), x$0.headers(), x$0.validStatusCodes(), x$0.params());
            return new Some((Object)fields);
        }
        return None$.MODULE$;
    }

    /**
     * Serialization hook: always resolves to the value held in {@code MODULE$}, so a deserialized
     * copy is replaced by the existing singleton.
     */
    private Object readResolve() {
        Object singleton = HTTPLoadStage$.MODULE$;
        return singleton;
    }

    /**
     * Filter predicate used when iterating the header map: accepts every non-null entry.
     *
     * @return {@code true} unless the tuple is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$execute$3(Tuple2 check$ifrefutable$2) {
        return check$ifrefutable$2 != null;
    }

    /**
     * Adds one header to the request: the tuple's first element is the header name and the second
     * its value (both cast to {@code String}).
     *
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ void $anonfun$execute$4(HttpPost post$2, Tuple2 x$17) {
        if (x$17 == null) {
            throw new MatchError((Object)x$17);
        }
        String headerName = (String)x$17._1();
        String headerValue = (String)x$17._2();
        post$2.addHeader(headerName, headerValue);
    }

    /**
     * Per-row body of the batch (non-streaming) load: POSTs one row to {@code uri$1} and returns the
     * server's reply as an {@code HTTPLoadStage.Response}.
     *
     * <p>Changes relative to the previous version:
     * <ul>
     *   <li>The shared connection manager is no longer closed after every row. It backs
     *       {@code httpClient$1}, which is reused for every row of the partition, so closing it
     *       here shut the pool down before the next row could be sent.</li>
     *   <li>A response without an entity (for example 204 No Content) yields an empty body instead
     *       of a {@code NullPointerException}.</li>
     *   <li>The response is closed on every path, including when the status code is rejected.</li>
     * </ul>
     *
     * @param uri$1                    target URI of the POST
     * @param stageHeaders$1           header name/value pairs added to every request
     * @param dataType$1               type of the payload column; only StringType and BinaryType are handled
     * @param fieldIndex$1             index of the payload column within {@code row}
     * @param httpClient$1             client shared by all rows of the partition
     * @param stageValidStatusCodes$1  status codes that are accepted
     * @param poolingHttpClientConnectionManager$1 pool behind {@code httpClient$1}; kept in the
     *        signature for the existing caller but deliberately not closed here
     * @param row                      the row whose payload column is sent
     * @return status code, reason phrase and body of the response
     * @throws MatchError if {@code dataType$1} is neither StringType nor BinaryType
     * @throws Exception  if the response status code is not in {@code stageValidStatusCodes$1}
     */
    public static final /* synthetic */ HTTPLoadStage.Response $anonfun$execute$2(String uri$1, Map stageHeaders$1, DataType dataType$1, int fieldIndex$1, CloseableHttpClient httpClient$1, List stageValidStatusCodes$1, PoolingHttpClientConnectionManager poolingHttpClientConnectionManager$1, Row row) {
        HttpPost post = new HttpPost(uri$1);
        stageHeaders$1.withFilter((Function1 & Serializable & scala.Serializable)check$ifrefutable$2 -> BoxesRunTime.boxToBoolean((boolean)HTTPLoadStage$.$anonfun$execute$3(check$ifrefutable$2))).foreach((Function1 & Serializable & scala.Serializable)x$17 -> {
            HTTPLoadStage$.$anonfun$execute$4(post, x$17);
            return BoxedUnit.UNIT;
        });

        // Build the payload from the row according to the column type.
        // NOTE(review): StringEntity(String) encodes with HttpClient's default charset, which is
        // ISO-8859-1 per the HttpClient 4.x docs — confirm this is intended for non-Latin payloads.
        HttpEntity entity;
        if (dataType$1 instanceof StringType) {
            entity = new StringEntity(row.getString(fieldIndex$1));
        } else if (dataType$1 instanceof BinaryType) {
            entity = new ByteArrayEntity((byte[])row.get(fieldIndex$1));
        } else {
            throw new MatchError((Object)dataType$1);
        }
        post.setEntity(entity);

        try {
            CloseableHttpResponse response2 = httpClient$1.execute((HttpUriRequest)post);
            try {
                int statusCode = response2.getStatusLine().getStatusCode();
                String reasonPhrase = response2.getStatusLine().getReasonPhrase();
                if (!stageValidStatusCodes$1.contains((Object)BoxesRunTime.boxToInteger((int)statusCode))) {
                    throw new Exception(new StringBuilder(80).append("HTTPLoad expects all response StatusCode(s) in [").append(stageValidStatusCodes$1.mkString(", ")).append("] but server responded with ").append(statusCode).append(" (").append(reasonPhrase).append(").").toString());
                }
                // getEntity() is null when the response carries no body.
                HttpEntity responseEntity = response2.getEntity();
                String body = "";
                if (responseEntity != null) {
                    InputStream content = responseEntity.getContent();
                    body = Source$.MODULE$.fromInputStream(content, Codec$.MODULE$.fallbackSystemCodec()).mkString();
                }
                return new HTTPLoadStage.Response(statusCode, reasonPhrase, body);
            } finally {
                response2.close();
            }
        } finally {
            // Release only this request's connection; the pool must stay open for the rows that
            // follow in the same partition.
            post.releaseConnection();
        }
    }

    /**
     * Private constructor: publishes the instance being constructed as the {@code MODULE$}
     * singleton.
     */
    private HTTPLoadStage$() {
        HTTPLoadStage$.MODULE$ = this;
    }
}

