/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.service;

import java.io.Serializable;
import org.apache.spark.SimpleFutureAction;
import org.apache.spark.SparkEnv$;
import org.apache.spark.connect.proto.ExecutePlanResponse;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connect.common.DataTypeProtoConverter$;
import org.apache.spark.sql.connect.common.LiteralValueProtoConverter$;
import org.apache.spark.sql.connect.config.Connect$;
import org.apache.spark.sql.connect.service.MetricGenerator$;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.arrow.ArrowConverters;
import org.apache.spark.sql.execution.arrow.ArrowConverters$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.ThreadUtils$;
import org.sparkproject.connect.grpc.stub.StreamObserver;
import org.sparkproject.connect.protobuf.ByteString;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Iterable$;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.ExecutionContext;
import scala.math.Integral;
import scala.math.Numeric;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.util.Try;

/**
 * Singleton holder (see {@link #MODULE$}) for the helpers that build
 * {@link ExecutePlanResponse} messages from a {@link Dataset}: Arrow-encoded
 * result batches, the schema, plan metrics and observed metrics.
 *
 * <p>NOTE(review): this file is decompiler output. The serializable lambda
 * casts, the public {@code $anonfun$...} bridge methods and the
 * intersection-typed local in {@code processAsArrowBatches} are deliberately
 * kept in the shape the decompiler emitted them.
 */
public final class SparkConnectStreamHandler$ {
    /** The single instance of this class; assigned by the private constructor. */
    public static SparkConnectStreamHandler$ MODULE$;

    static {
        // Constructing the instance is what populates MODULE$.
        new SparkConnectStreamHandler$();
    }

    /** Publishes the freshly created instance through {@link #MODULE$}. */
    private SparkConnectStreamHandler$() {
        MODULE$ = this;
    }

    /**
     * Builds a serializable function that turns an iterator of internal rows into
     * an iterator of {@code (arrowBatchBytes, rowCount)} pairs.
     *
     * @param schema             schema handed to the Arrow batch iterator
     * @param maxRecordsPerBatch record limit per batch (widened to {@code long} for the converter)
     * @param maxBatchSize       size limit handed to the Arrow batch iterator
     * @param timeZoneId         time zone id handed to the Arrow batch iterator
     * @return the conversion function; the second tuple element is a boxed {@code long}
     */
    public Function1<Iterator<InternalRow>, Iterator<Tuple2<byte[], Object>>> rowToArrowConverter(StructType schema, int maxRecordsPerBatch, long maxBatchSize, String timeZoneId) {
        return (Function1 & Serializable & scala.Serializable)partitionRows -> {
            ArrowConverters.ArrowBatchWithSchemaIterator arrowBatches = ArrowConverters$.MODULE$.toBatchWithSchemaIterator(
                    partitionRows, schema, (long)maxRecordsPerBatch, maxBatchSize, timeZoneId);
            return arrowBatches.map((Function1 & Serializable & scala.Serializable)payload -> {
                Object key = Predef$.MODULE$.ArrowAssoc(payload);
                // The row count is read from the iterator each time a batch is mapped.
                Object boxedRowCount = BoxesRunTime.boxToLong((long)arrowBatches.rowCountInLastBatch());
                return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(key, boxedRowCount);
            });
        };
    }

    /**
     * Executes the dataframe's physical plan inside a new SQL execution named
     * {@code "collectArrow"} and pushes the result to {@code responseObserver} as
     * Arrow batches, one {@link ExecutePlanResponse} per batch.
     *
     * <p>Partitions are computed by a submitted job and may finish in any order,
     * but their batches are forwarded strictly in partition-index order. A failure
     * reported when the job completes is rethrown from the forwarding loop. When
     * no batch was sent at all, a single empty Arrow batch (row count 0) is sent.
     *
     * @param sessionId        session id stamped on every response
     * @param dataframe        dataset whose executed plan produces the rows
     * @param responseObserver sink receiving the responses via {@code onNext}
     */
    public void processAsArrowBatches(String sessionId, Dataset<Row> dataframe, StreamObserver<ExecutePlanResponse> responseObserver) {
        SparkSession session = dataframe.sparkSession();
        StructType schema = dataframe.schema();
        int maxRecordsPerBatch = session.sessionState().conf().arrowMaxRecordsPerBatch();
        String timeZoneId = session.sessionState().conf().sessionLocalTimeZone();
        long configuredLimit = BoxesRunTime.unboxToLong((Object)SparkEnv$.MODULE$.get().conf().get(Connect$.MODULE$.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE()));
        // Only 70% of the configured limit is used per batch.
        // NOTE(review): presumably headroom for the rest of the gRPC message - confirm.
        long maxBatchSize = (long)((double)configuredLimit * 0.7);
        SQLExecution$.MODULE$.withNewExecutionId(dataframe.queryExecution(), (Option)new Some((Object)"collectArrow"), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            RDD planOutput = dataframe.queryExecution().executedPlan().execute();
            int partitionCount = planOutput.getNumPartitions();
            IntRef sentCount = IntRef.create((int)0);
            if (partitionCount > 0) {
                RDD arrowRdd = planOutput.mapPartitionsInternal(
                        MODULE$.rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId),
                        planOutput.mapPartitionsInternal$default$2(),
                        ClassTag$.MODULE$.apply(Tuple2.class));
                // Monitor guarding both the per-partition slots and the failure reference.
                Object signal = new Object();
                Tuple2[][] slots = new Tuple2[partitionCount][];
                ObjectRef failureRef = ObjectRef.create((Object)None$.MODULE$);
                // Stores a finished partition in its slot and wakes the forwarding loop.
                Function2 & Serializable & scala.Serializable resultHandler = (Function2 & Serializable & scala.Serializable)(index, partition) -> {
                    SparkConnectStreamHandler$.$anonfun$processAsArrowBatches$2(signal, slots, BoxesRunTime.unboxToInt((Object)index), partition);
                    return BoxedUnit.UNIT;
                };
                SimpleFutureAction job = session.sparkContext().submitJob(
                        arrowRdd,
                        // Materialize each partition's batches as an array.
                        (Function1 & Serializable & scala.Serializable)batchIterator -> (Tuple2[])batchIterator.toArray(ClassTag$.MODULE$.apply(Tuple2.class)),
                        // Run every partition: indices 0 until partitionCount.
                        (Seq)Seq$.MODULE$.range((Object)BoxesRunTime.boxToInteger((int)0), (Object)BoxesRunTime.boxToInteger((int)partitionCount), (Integral)Numeric.IntIsIntegral$.MODULE$),
                        (Function2)resultHandler,
                        // Result function with an empty body, as rendered by the decompiler.
                        (Function0 & Serializable & scala.Serializable)() -> (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {});
                // Record a job failure so the forwarding loop below can rethrow it.
                job.onComplete((Function1 & Serializable & scala.Serializable)outcome -> {
                    SparkConnectStreamHandler$.$anonfun$processAsArrowBatches$6(signal, failureRef, outcome);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)ThreadUtils$.MODULE$.sameThread());
                int partitionIndex = 0;
                while (partitionIndex < partitionCount) {
                    Tuple2[] ready;
                    synchronized (signal) {
                        Tuple2[] slot = slots[partitionIndex];
                        // Block until this partition has arrived or a failure was recorded.
                        while (slot == null && ((Option)failureRef.elem).isEmpty()) {
                            signal.wait();
                            slot = slots[partitionIndex];
                        }
                        // Drop the reference held by the slot array before forwarding.
                        slots[partitionIndex] = null;
                        ((Option)failureRef.elem).foreach((Function1 & Serializable & scala.Serializable)failure -> {
                            Throwable cause = failure;
                            throw cause;
                        });
                        ready = slot;
                    }
                    // Forward outside the monitor so handlers are not blocked by onNext.
                    new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])ready)).foreach((Function1 & Serializable & scala.Serializable)batch -> {
                        SparkConnectStreamHandler$.$anonfun$processAsArrowBatches$9(sessionId, responseObserver, sentCount, batch);
                        return BoxedUnit.UNIT;
                    });
                    ++partitionIndex;
                }
            }
            if (sentCount.elem != 0) {
                return;
            }
            // Nothing was sent: emit one empty Arrow batch built from the schema.
            byte[] emptyPayload = ArrowConverters$.MODULE$.createEmptyArrowBatch(schema, timeZoneId);
            ExecutePlanResponse.Builder reply = ExecutePlanResponse.newBuilder().setSessionId(sessionId);
            reply.setArrowBatch(ExecutePlanResponse.ArrowBatch.newBuilder().setRowCount(0L).setData(ByteString.copyFrom(emptyPayload)).build());
            responseObserver.onNext(reply.build());
        });
    }

    /**
     * Builds a response that carries the given schema converted to its Connect
     * proto type.
     *
     * @param sessionId session id stamped on the response
     * @param schema    schema to convert and attach
     * @return the built response
     */
    public ExecutePlanResponse sendSchemaToResponse(String sessionId, StructType schema) {
        ExecutePlanResponse.Builder reply = ExecutePlanResponse.newBuilder();
        reply.setSessionId(sessionId);
        reply.setSchema(DataTypeProtoConverter$.MODULE$.toConnectProtoType((DataType)schema));
        return reply.build();
    }

    /**
     * Builds a response that carries the metrics generated from the dataset's
     * executed plan.
     *
     * @param sessionId session id stamped on the response
     * @param rows      dataset whose executed plan is passed to the metric generator
     * @return the built response
     */
    public ExecutePlanResponse createMetricsResponse(String sessionId, Dataset<Row> rows) {
        ExecutePlanResponse.Builder reply = ExecutePlanResponse.newBuilder();
        reply.setSessionId(sessionId);
        reply.setMetrics(MetricGenerator$.MODULE$.buildMetrics(rows.queryExecution().executedPlan()));
        return reply.build();
    }

    /**
     * Builds a response that carries one {@code ObservedMetrics} entry per observed
     * metric of the dataset's query execution; every column of the metric row is
     * converted to a literal proto value.
     *
     * @param sessionId session id stamped on the response
     * @param dataframe dataset whose observed metrics are reported
     * @return the built response
     */
    public ExecutePlanResponse sendObservedMetricsToResponse(String sessionId, Dataset<Row> dataframe) {
        scala.collection.immutable.Iterable metricProtos = (scala.collection.immutable.Iterable)dataframe.queryExecution().observedMetrics().map((Function1 & Serializable & scala.Serializable)entry -> {
            Tuple2 pair = entry;
            if (pair == null) {
                throw new MatchError((Object)pair);
            }
            String metricName = (String)pair._1();
            Row metricRow = (Row)pair._2();
            // One literal per column index in [0, metricRow.length()).
            IndexedSeq literalValues = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), metricRow.length()).map(
                    (Function1 & Serializable & scala.Serializable)columnIndex -> LiteralValueProtoConverter$.MODULE$.toLiteralProto(metricRow.apply(BoxesRunTime.unboxToInt((Object)columnIndex))),
                    IndexedSeq$.MODULE$.canBuildFrom());
            ExecutePlanResponse.ObservedMetrics.Builder metric = ExecutePlanResponse.ObservedMetrics.newBuilder().setName(metricName);
            metric.addAllValues((java.lang.Iterable)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)literalValues).asJava());
            return metric.build();
        }, Iterable$.MODULE$.canBuildFrom());
        ExecutePlanResponse.Builder reply = ExecutePlanResponse.newBuilder().setSessionId(sessionId);
        reply.addAllObservedMetrics((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)metricProtos).asJava());
        return reply.build();
    }

    /**
     * Result-handler body: stores a finished partition in its slot and notifies
     * the monitor, both while holding the monitor.
     */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$2(Object signal$1, Tuple2[][] partitions$1, int partitionId, Tuple2[] partition) {
        synchronized (signal$1) {
            partitions$1[partitionId] = partition;
            signal$1.notify();
        }
    }

    /**
     * Failure-callback body: records the throwable in the shared reference and
     * notifies the monitor, both while holding the monitor.
     */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$7(Object signal$1, ObjectRef error$1, Throwable throwable) {
        synchronized (signal$1) {
            error$1.elem = new Some((Object)throwable);
            signal$1.notify();
        }
    }

    /**
     * Completion-callback body: if the job result is a failure, hands its
     * throwable to the failure callback; a successful result does nothing here.
     */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$6(Object signal$1, ObjectRef error$1, Try result) {
        Try failedView = result.failed();
        failedView.foreach((Function1 & Serializable & scala.Serializable)cause -> {
            SparkConnectStreamHandler$.$anonfun$processAsArrowBatches$7(signal$1, error$1, cause);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Per-batch body: wraps one {@code (bytes, rowCount)} pair in an
     * {@code ArrowBatch} response, sends it and bumps the sent counter.
     *
     * @throws MatchError if the pair is {@code null}
     */
    public static final /* synthetic */ void $anonfun$processAsArrowBatches$9(String sessionId$1, StreamObserver responseObserver$1, IntRef numSent$1, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError((Object)x0$2);
        }
        byte[] payload = (byte[])x0$2._1();
        long rowCount = x0$2._2$mcJ$sp();
        ExecutePlanResponse.Builder reply = ExecutePlanResponse.newBuilder().setSessionId(sessionId$1);
        reply.setArrowBatch(ExecutePlanResponse.ArrowBatch.newBuilder().setRowCount(rowCount).setData(ByteString.copyFrom(payload)).build());
        responseObserver$1.onNext(reply.build());
        // Counted only after onNext returned.
        ++numSent$1.elem;
    }
}

