/*
 * Decompiled with CFR 0.152.
 */
package ai.tripl.arc.transform;

import ai.tripl.arc.api.API;
import ai.tripl.arc.api.API$FailMode$FailFast$;
import ai.tripl.arc.api.API$FailMode$Permissive$;
import ai.tripl.arc.transform.HTTPTransform;
import ai.tripl.arc.transform.HTTPTransformStage;
import ai.tripl.arc.util.DetailException;
import ai.tripl.arc.util.log.logger.Logger;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import org.apache.http.HttpEntity;
import org.apache.http.client.RedirectStrategy;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.NullType$;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.Array$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple16;
import scala.Tuple2;
import scala.collection.BufferedIterator;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

public final class HTTPTransformStage$
implements scala.Serializable {
    /** Singleton instance of this companion object; assigned by the private constructor. */
    public static HTTPTransformStage$ MODULE$;

    // Instantiate once at class-initialisation time; the constructor stores the
    // new instance in MODULE$, so the result of `new` is intentionally discarded.
    static {
        new HTTPTransformStage$();
    }

    /**
     * Runs the HTTPTransform stage: reads the input view, POSTs the configured input
     * field of every row (in batches) to the stage URI, appends the response to each
     * row and registers the result as the output view.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Validate that the input field exists and is a string or binary column.</li>
     *   <li>Build the output schema: the input fields plus {@code body}; in Permissive
     *       mode also a {@code response} struct (statusCode, reasonPhrase, contentType,
     *       responseTime).</li>
     *   <li>Map each partition through {@code $anonfun$execute$2}, one call per group of
     *       {@code batchSize} rows.</li>
     *   <li>Re-apply the input columns' metadata, optionally repartition, register the
     *       view and record stage details.</li>
     * </ol>
     *
     * @param stage      stage configuration (views, uri, headers, batching, fail mode, ...)
     * @param spark      session used to resolve the input view
     * @param logger     not referenced in this method body
     * @param arcContext supplies {@code immutableViews()} and {@code storageLevel()}
     * @return the output dataset wrapped with {@code Option.apply}
     * @throws DetailException if the input field is missing or of an unsupported type,
     *                         or if building the mapPartitions transformation throws
     * @throws MatchError      if the fail mode is neither Permissive nor FailFast
     */
    public Option<Dataset<Row>> execute(HTTPTransformStage stage, SparkSession spark, Logger logger, API.ARCContext arcContext) {
        BoxedUnit boxedUnit;
        Dataset dataset;
        Dataset dataset2;
        StructType structType;
        int n;
        // Error message shared by both input-field validation failures below.
        String signature = new StringBuilder(69).append("HTTPTransform requires a field named '").append(stage.inputField()).append("' of type 'string' or 'binary'.").toString();
        Dataset df = spark.table(stage.inputView());
        StructType schema = df.schema();
        // Copy the stage settings into locals so the partition lambda captures these
        // values rather than the stage object itself.
        URI stageUri = stage.uri();
        String stageInputField = stage.inputField();
        Map<String, String> stageHeaders = stage.headers();
        int stageBatchSize = stage.batchSize();
        String stageDelimiter = stage.delimiter();
        API.FailMode stageFailMode = stage.failMode();
        List<Object> stageValidStatusCodes = stage.validStatusCodes();
        // Validation 1: the input field must exist in the input schema.
        try {
            n = schema.fieldIndex(stage.inputField());
        }
        catch (Exception e) {
            // Decompiler artifact: the constructor arguments and `stage$2` are the
            // variables captured by the anonymous DetailException subclass.
            throw new DetailException(signature, df, stage){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$2.stageDetail();
                }
            };
        }
        int fieldIndex = n;
        // Validation 2: the input field must be a StringType or BinaryType column.
        DataType dataType = schema.fields()[fieldIndex].dataType();
        if (dataType instanceof StringType) {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else if (dataType instanceof BinaryType) {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            throw new DetailException(signature, stage, schema, fieldIndex){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$2.stageDetail();
                }
            };
        }
        // Build the output schema according to the fail mode.
        API.FailMode failMode = stage.failMode();
        if (API$FailMode$Permissive$.MODULE$.equals(failMode)) {
            // Permissive: input fields ++ body ++ response(statusCode, reasonPhrase,
            // contentType, responseTime); all added fields are declared non-nullable.
            List list = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])df.schema().fields())).toList();
            StructField structField = new StructField("statusCode", (DataType)IntegerType$.MODULE$, false, StructField$.MODULE$.apply$default$4());
            StructField structField2 = new StructField("reasonPhrase", (DataType)StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4());
            StructField structField3 = new StructField("contentType", (DataType)StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4());
            StructField structField4 = new StructField("responseTime", (DataType)LongType$.MODULE$, false, StructField$.MODULE$.apply$default$4());
            structType = StructType$.MODULE$.apply((Seq)new .colon.colon((Object)new StructField("body", (DataType)StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()), (List)new .colon.colon((Object)new StructField("response", (DataType)StructType$.MODULE$.apply((Seq)Nil$.MODULE$.$colon$colon((Object)structField4).$colon$colon((Object)structField3).$colon$colon((Object)structField2).$colon$colon((Object)structField)), false, StructField$.MODULE$.apply$default$4()), (List)Nil$.MODULE$)).$colon$colon$colon(list));
        } else if (API$FailMode$FailFast$.MODULE$.equals(failMode)) {
            // FailFast: input fields ++ body only.
            List list = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])df.schema().fields())).toList();
            structType = StructType$.MODULE$.apply((Seq)new .colon.colon((Object)new StructField("body", (DataType)StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()), (List)Nil$.MODULE$).$colon$colon$colon(list));
        } else {
            throw new MatchError((Object)failMode);
        }
        StructType typedSchema = structType;
        ExpressionEncoder typedEncoder = RowEncoder$.MODULE$.apply(typedSchema);
        // NOTE(review): this try/catch wraps only the mapPartitions call; presumably
        // failures raised while partitions are actually processed surface later,
        // outside this handler - confirm.
        try {
            dataset2 = df.mapPartitions((Function1 & Serializable & scala.Serializable)partition -> {
                NullType$ nullType$;
                int n;
                // One pooled HTTP client per partition, following redirects leniently.
                // NOTE(review): nothing in the visible code closes this client or its
                // connection manager - confirm whether that is intentional.
                PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
                poolingHttpClientConnectionManager.setMaxTotal(50);
                CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager((HttpClientConnectionManager)poolingHttpClientConnectionManager).setRedirectStrategy((RedirectStrategy)new LaxRedirectStrategy()).build();
                String uri = stageUri.toString();
                // Buffer the iterator so the first row can be inspected without consuming it.
                BufferedIterator bufferedPartition = partition.buffered();
                // Resolve the input field index from the first row; 0 for an empty partition.
                boolean bl = bufferedPartition.hasNext();
                if (bl) {
                    n = ((Row)bufferedPartition.head()).fieldIndex(stageInputField);
                } else if (!bl) {
                    n = 0;
                } else {
                    throw new MatchError((Object)BoxesRunTime.boxToBoolean((boolean)bl));
                }
                int fieldIndex = n;
                // Resolve the input field type from the first row; NullType for an empty partition.
                boolean bl2 = bufferedPartition.hasNext();
                if (bl2) {
                    nullType$ = ((Row)bufferedPartition.head()).schema().apply(fieldIndex).dataType();
                } else if (!bl2) {
                    nullType$ = NullType$.MODULE$;
                } else {
                    throw new MatchError((Object)BoxesRunTime.boxToBoolean((boolean)bl2));
                }
                NullType$ dataType = nullType$;
                // Send the rows in groups of stageBatchSize, one HTTP request per group.
                Iterator.GroupedIterator groupedPartition = bufferedPartition.grouped(stageBatchSize);
                return groupedPartition.flatMap(arg_0 -> HTTPTransformStage$.$anonfun$execute$2(uri, stageHeaders, (DataType)dataType, stageBatchSize, stageDelimiter, fieldIndex, httpClient, stageFailMode, stageValidStatusCodes, arg_0));
            }, (Encoder)typedEncoder);
        }
        catch (Exception e) {
            throw new DetailException(e, stage){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$2.stageDetail();
                }
            };
        }
        // Re-attach each input column's metadata to the same-named output column.
        ObjectRef transformedDF = ObjectRef.create((Object)dataset2);
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])df.schema().fields())).foreach((Function1 & Serializable & scala.Serializable)field -> {
            transformedDF.elem = ((Dataset)transformedDF.elem).withColumn(field.name(), functions$.MODULE$.col(field.name()).as(field.name(), field.metadata()));
            return BoxedUnit.UNIT;
        });
        // Optional repartitioning, driven by partitionBy and numPartitions.
        List<String> list = stage.partitionBy();
        if (Nil$.MODULE$.equals(list)) {
            // No partition columns: repartition by count only when numPartitions is set.
            Dataset dataset3;
            Option<Object> option = stage.numPartitions();
            if (option instanceof Some) {
                Some some = (Some)option;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.value());
                dataset3 = ((Dataset)transformedDF.elem).repartition(numPartitions);
            } else if (None$.MODULE$.equals(option)) {
                dataset3 = (Dataset)transformedDF.elem;
            } else {
                throw new MatchError(option);
            }
            dataset = dataset3;
        } else {
            // Partition columns given: always repartition by them, with or without a count.
            Dataset dataset4;
            List partitionCols = (List)list.map((Function1 & Serializable & scala.Serializable)col -> ((Dataset)transformedDF$1.elem).apply(col), List$.MODULE$.canBuildFrom());
            Option<Object> option = stage.numPartitions();
            if (option instanceof Some) {
                Some some = (Some)option;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.value());
                dataset4 = ((Dataset)transformedDF.elem).repartition(numPartitions, (Seq)partitionCols);
            } else if (None$.MODULE$.equals(option)) {
                dataset4 = ((Dataset)transformedDF.elem).repartition((Seq)partitionCols);
            } else {
                throw new MatchError(option);
            }
            dataset = dataset4;
        }
        Dataset repartitionedDF = dataset;
        // Register the output view: createTempView when views are immutable,
        // createOrReplaceTempView otherwise.
        if (arcContext.immutableViews()) {
            repartitionedDF.createTempView(stage.outputView());
        } else {
            repartitionedDF.createOrReplaceTempView(stage.outputView());
        }
        // Stage details are recorded for non-streaming datasets only.
        if (!repartitionedDF.isStreaming()) {
            stage.stageDetail().put((Object)"outputColumns", (Object)repartitionedDF.schema().length());
            stage.stageDetail().put((Object)"numPartitions", (Object)repartitionedDF.rdd().partitions().length);
            if (stage.persist()) {
                // Persist first, then count, and record the count as "records".
                repartitionedDF.persist(arcContext.storageLevel());
                boxedUnit = stage.stageDetail().put((Object)"records", (Object)repartitionedDF.count());
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return Option$.MODULE$.apply((Object)repartitionedDF);
    }

    /**
     * Factory for {@link HTTPTransformStage}: forwards every argument, in the same
     * order, to the stage constructor and returns the new instance.
     */
    public HTTPTransformStage apply(HTTPTransform plugin, String name, Option<String> description, URI uri, Map<String, String> headers, List<Object> validStatusCodes, String inputView, String outputView, String inputField, Map<String, String> params, boolean persist, int batchSize, String delimiter, Option<Object> numPartitions, List<String> partitionBy, API.FailMode failMode) {
        HTTPTransformStage created = new HTTPTransformStage(
            plugin,
            name,
            description,
            uri,
            headers,
            validStatusCodes,
            inputView,
            outputView,
            inputField,
            params,
            persist,
            batchSize,
            delimiter,
            numPartitions,
            partitionBy,
            failMode);
        return created;
    }

    /**
     * Extractor counterpart of {@code apply}: returns {@code None} for a null stage,
     * otherwise {@code Some} of a 16-tuple holding the stage's fields in constructor
     * order (the boolean and int fields are boxed).
     */
    public Option<Tuple16<HTTPTransform, String, Option<String>, URI, Map<String, String>, List<Object>, String, String, String, Map<String, String>, Object, Object, String, Option<Object>, List<String>, API.FailMode>> unapply(HTTPTransformStage x$0) {
        if (x$0 == null) {
            return (Option)None$.MODULE$;
        }
        Tuple16 fields = new Tuple16(
            (Object)x$0.plugin(),
            (Object)x$0.name(),
            x$0.description(),
            (Object)x$0.uri(),
            x$0.headers(),
            x$0.validStatusCodes(),
            (Object)x$0.inputView(),
            (Object)x$0.outputView(),
            (Object)x$0.inputField(),
            x$0.params(),
            (Object)BoxesRunTime.boxToBoolean((boolean)x$0.persist()),
            (Object)BoxesRunTime.boxToInteger((int)x$0.batchSize()),
            (Object)x$0.delimiter(),
            x$0.numPartitions(),
            x$0.partitionBy(),
            (Object)x$0.failMode());
        return new Some((Object)fields);
    }

    /** Serialization hook: always resolves to the singleton held in {@code MODULE$}. */
    private Object readResolve() {
        return HTTPTransformStage$.MODULE$;
    }

    /**
     * Filter used when iterating the header map: accepts every non-null
     * key/value tuple.
     */
    public static final /* synthetic */ boolean $anonfun$execute$3(Tuple2 check$ifrefutable$1) {
        return check$ifrefutable$1 != null;
    }

    /**
     * Adds one header to the POST request from a (name, value) tuple.
     *
     * @throws MatchError if the tuple is null
     */
    public static final /* synthetic */ void $anonfun$execute$4(HttpPost post$1, Tuple2 x$32) {
        if (x$32 == null) {
            throw new MatchError((Object)x$32);
        }
        String headerName = (String)x$32._1();
        String headerValue = (String)x$32._2();
        post$1.addHeader(headerName, headerValue);
    }

    /**
     * Sends one batch of rows to the HTTP endpoint as a single POST and returns the
     * input rows with the response appended.
     *
     * <p>The input field of every row in {@code groupedRow} is joined with the
     * delimiter (only when {@code stageBatchSize$1 > 1}) to form the request body. The
     * response body is split on the same delimiter and must yield exactly one element
     * per input row. Each output row is the input row plus {@code body}; in Permissive
     * mode a nested row (statusCode, reasonPhrase, contentType, responseTime in ms) is
     * appended as well.
     *
     * <p>Changes relative to the decompiled original:
     * <ul>
     *   <li>the response is closed in a {@code finally} block, so it is also closed
     *       when the status check, body read or count check throws;</li>
     *   <li>the response is split on the literal delimiter ({@code Pattern.quote})
     *       instead of treating it as a regular expression, matching how the request
     *       body is joined;</li>
     *   <li>status code, reason phrase and content type are read once per request
     *       rather than per row from an already closed response, and a missing
     *       Content-Type header yields an empty string instead of a
     *       NullPointerException.</li>
     * </ul>
     *
     * NOTE(review): request/response text encoding is left unchanged
     * ({@code new StringEntity(String)}, {@code getBytes()} and the fallback system
     * codec) - confirm these are the intended charsets.
     *
     * @param uri$1                  target URI of the POST
     * @param stageHeaders$1         headers added to every request
     * @param dataType$1             type of the input field (StringType or BinaryType)
     * @param stageBatchSize$1       configured batch size; delimiters are used only when &gt; 1
     * @param stageDelimiter$1       literal delimiter between records
     * @param fieldIndex$2           index of the input field within each row
     * @param httpClient$1           client used to execute the request
     * @param stageFailMode$1        FailFast or Permissive
     * @param stageValidStatusCodes$1 status codes accepted in FailFast mode
     * @param groupedRow             rows forming this batch
     * @return the output rows for this batch
     * @throws Exception  in FailFast mode when the status code is not accepted, or
     *                    when the number of response records differs from the number
     *                    of rows sent
     * @throws MatchError for an unsupported data type or fail mode
     */
    public static final /* synthetic */ GenTraversableOnce $anonfun$execute$2(String uri$1, Map stageHeaders$1, DataType dataType$1, int stageBatchSize$1, String stageDelimiter$1, int fieldIndex$2, CloseableHttpClient httpClient$1, API.FailMode stageFailMode$1, List stageValidStatusCodes$1, Seq groupedRow) {
        HttpPost post = new HttpPost(uri$1);
        stageHeaders$1.withFilter((Function1 & Serializable & scala.Serializable)check$ifrefutable$1 -> BoxesRunTime.boxToBoolean((boolean)HTTPTransformStage$.$anonfun$execute$3(check$ifrefutable$1))).foreach((Function1 & Serializable & scala.Serializable)x$32 -> {
            HTTPTransformStage$.$anonfun$execute$4(post, x$32);
            return BoxedUnit.UNIT;
        });
        // Build the request body; typed as HttpEntity because it is either a
        // StringEntity or a ByteArrayEntity.
        HttpEntity entity;
        if (dataType$1 instanceof StringType) {
            String delimiter = stageBatchSize$1 > 1 ? stageDelimiter$1 : "";
            entity = new StringEntity(((TraversableOnce)groupedRow.map((Function1 & Serializable & scala.Serializable)row -> row.getString(fieldIndex$2), Seq$.MODULE$.canBuildFrom())).mkString(delimiter));
        } else if (dataType$1 instanceof BinaryType) {
            byte[] delimiter = stageBatchSize$1 > 1 ? stageDelimiter$1.getBytes() : new byte[0];
            // Concatenate the rows' byte arrays pairwise: left ++ delimiter ++ right.
            entity = new ByteArrayEntity((byte[])((TraversableOnce)groupedRow.map((Function1 & Serializable & scala.Serializable)row -> (byte[])row.get(fieldIndex$2), Seq$.MODULE$.canBuildFrom())).reduce((Function2 & Serializable & scala.Serializable)(x$33, x$34) -> (byte[])new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps((byte[])new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(x$33)).$plus$plus((GenTraversableOnce)new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(delimiter)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Byte())))).$plus$plus((GenTraversableOnce)new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(x$34)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Byte()))));
        } else {
            throw new MatchError((Object)dataType$1);
        }
        post.setEntity(entity);
        try {
            long requestStartTime = System.currentTimeMillis();
            CloseableHttpResponse response = httpClient$1.execute((HttpUriRequest)post);
            try {
                long responseTime = System.currentTimeMillis() - requestStartTime;
                // Read the response metadata once, while the response is still open.
                int statusCode = response.getStatusLine().getStatusCode();
                String reasonPhrase = response.getStatusLine().getReasonPhrase();
                if (API$FailMode$FailFast$.MODULE$.equals(stageFailMode$1) && !stageValidStatusCodes$1.contains((Object)BoxesRunTime.boxToInteger((int)statusCode))) {
                    throw new Exception(new StringBuilder(85).append("HTTPTransform expects all response StatusCode(s) in [").append(stageValidStatusCodes$1.mkString(", ")).append("] but server responded with ").append(statusCode).append(" (").append(reasonPhrase).append(").").toString());
                }
                // The Content-Type header may be absent; report it as an empty string
                // because the output column is declared non-nullable.
                org.apache.http.Header contentTypeHeader = response.getEntity().getContentType();
                String contentType = contentTypeHeader == null ? "" : contentTypeHeader.toString().replace("Content-Type: ", "");
                InputStream content = response.getEntity().getContent();
                String responseText = Source$.MODULE$.fromInputStream(content, Codec$.MODULE$.fallbackSystemCodec()).mkString();
                // Split on the literal delimiter: String.split interprets its argument
                // as a regex, so it is quoted to mirror the literal join of the request.
                String[] body = stageBatchSize$1 > 1 ? responseText.split(java.util.regex.Pattern.quote(stageDelimiter$1)) : new String[]{responseText};
                if (body.length != groupedRow.length()) {
                    throw new Exception(new StringBuilder(114).append("HTTPTransform expects the response to contain same number of results as 'batchSize' (").append(stageBatchSize$1).append(") but server responded with ").append(body.length).append(".").toString());
                }
                // Pair every input row with the response record at the same position.
                return (GenTraversableOnce)((TraversableLike)groupedRow.zipWithIndex(Seq$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x0$1 -> {
                    Tuple2 tuple2 = x0$1;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    Row inputRow = (Row)tuple2._1();
                    int index = tuple2._2$mcI$sp();
                    if (API$FailMode$Permissive$.MODULE$.equals(stageFailMode$1)) {
                        return Row$.MODULE$.fromSeq((Seq)inputRow.toSeq().$plus$plus((GenTraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray(new Object[]{body[index], Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)statusCode), reasonPhrase, contentType, BoxesRunTime.boxToLong((long)responseTime)}))})), Seq$.MODULE$.canBuildFrom()));
                    }
                    if (API$FailMode$FailFast$.MODULE$.equals(stageFailMode$1)) {
                        return Row$.MODULE$.fromSeq((Seq)inputRow.toSeq().$plus$plus((GenTraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{body[index]})), Seq$.MODULE$.canBuildFrom()));
                    }
                    throw new MatchError((Object)stageFailMode$1);
                }, Seq$.MODULE$.canBuildFrom());
            }
            finally {
                // Close the response on every path, not just the successful one.
                response.close();
            }
        }
        finally {
            post.releaseConnection();
        }
    }

    /** Private constructor: publishes the new instance as the {@code MODULE$} singleton. */
    private HTTPTransformStage$() {
        HTTPTransformStage$.MODULE$ = this;
    }
}

