/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.execution;

import java.io.Serializable;
import org.apache.spark.SparkEnv$;
import org.apache.spark.connect.proto.ExecutePlanRequest;
import org.apache.spark.connect.proto.ExecutePlanResponse;
import org.apache.spark.connect.proto.Plan;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Dataset$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.QueryPlanningTracker;
import org.apache.spark.sql.connect.common.DataTypeProtoConverter$;
import org.apache.spark.sql.connect.common.LiteralValueProtoConverter$;
import org.apache.spark.sql.connect.config.Connect$;
import org.apache.spark.sql.connect.execution.ExecuteResponseObserver;
import org.apache.spark.sql.connect.planner.SparkConnectPlanner;
import org.apache.spark.sql.connect.service.ExecuteHolder;
import org.apache.spark.sql.connect.service.SessionHolder;
import org.apache.spark.sql.connect.utils.MetricGenerator$;
import org.apache.spark.sql.execution.LocalTableScanExec;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.arrow.ArrowConverters;
import org.apache.spark.sql.execution.arrow.ArrowConverters$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.ThreadUtils$;
import org.sparkproject.connect.grpc.stub.StreamObserver;
import org.sparkproject.connect.protobuf.ByteString;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Iterable$;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.math.Integral;
import scala.math.Numeric;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

@ScalaSignature(bytes="\u0006\u0001\u0005Md!B\u0007\u000f\u00019Q\u0002\u0002C\u0011\u0001\u0005\u0003\u0005\u000b\u0011B\u0012\t\u000b%\u0002A\u0011\u0001\u0016\t\u000f9\u0002!\u0019!C\u0005_!11\u0007\u0001Q\u0001\nABq\u0001\u000e\u0001C\u0002\u0013%Q\u0007\u0003\u0004;\u0001\u0001\u0006IA\u000e\u0005\u0006w\u0001!\t\u0001P\u0003\u0005\u0019\u0002\u0001Q\nC\u0003Z\u0001\u0011\u0005!\fC\u0004\u0002$\u0001!\t!!\n\t\u000f\u0005\u0005\u0004\u0001\"\u0003\u0002d!9\u00111\u000e\u0001\u0005\n\u00055$!G*qCJ\\7i\u001c8oK\u000e$\b\u000b\\1o\u000bb,7-\u001e;j_:T!a\u0004\t\u0002\u0013\u0015DXmY;uS>t'BA\t\u0013\u0003\u001d\u0019wN\u001c8fGRT!a\u0005\u000b\u0002\u0007M\fHN\u0003\u0002\u0016-\u0005)1\u000f]1sW*\u0011q\u0003G\u0001\u0007CB\f7\r[3\u000b\u0003e\t1a\u001c:h'\t\u00011\u0004\u0005\u0002\u001d?5\tQDC\u0001\u001f\u0003\u0015\u00198-\u00197b\u0013\t\u0001SD\u0001\u0004B]f\u0014VMZ\u0001\u000eKb,7-\u001e;f\u0011>dG-\u001a:\u0004\u0001A\u0011AeJ\u0007\u0002K)\u0011a\u0005E\u0001\bg\u0016\u0014h/[2f\u0013\tASEA\u0007Fq\u0016\u001cW\u000f^3I_2$WM]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005-j\u0003C\u0001\u0017\u0001\u001b\u0005q\u0001\"B\u0011\u0003\u0001\u0004\u0019\u0013!D:fgNLwN\u001c%pY\u0012,'/F\u00011!\t!\u0013'\u0003\u00023K\ti1+Z:tS>t\u0007j\u001c7eKJ\fab]3tg&|g\u000eS8mI\u0016\u0014\b%A\u0004tKN\u001c\u0018n\u001c8\u0016\u0003Y\u0002\"a\u000e\u001d\u000e\u0003II!!\u000f\n\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\u0002\u0011M,7o]5p]\u0002\n!\u0002[1oI2,\u0007\u000b\\1o)\ti\u0004\t\u0005\u0002\u001d}%\u0011q(\b\u0002\u0005+:LG\u000fC\u0003B\u000f\u0001\u0007!)\u0001\tsKN\u0004xN\\:f\u001f\n\u001cXM\u001d<feB\u0019AfQ#\n\u0005\u0011s!aF#yK\u000e,H/\u001a*fgB|gn]3PEN,'O^3s!\t1%*D\u0001H\u0015\tA\u0015*A\u0003qe>$xN\u0003\u0002\u0012)%\u00111j\u0012\u0002\u0014\u000bb,7-\u001e;f!2\fgNU3ta>t7/\u001a\u0002\u0006\u0005\u0006$8\r\u001b\t\u000599\u0003f+\u0003\u0002P;\t1A+\u001e9mKJ\u00022\u0001H)T\u0013\t\u0011VDA\u0003BeJ\f\u0017\u0010\u0005\u0002\u001d)&\u0011Q+\b\u0002\u0005\u0005f$X\r\u0005\u0002\u001d/&\u0011\u0001,\b\u0002\u0005\u0019>tw-A\ns_^$v.\u0011:s_^\u001cuN\u001c<feR,'\u000fF\u0005\\gn\f\t!!\u0002\u0002\u001aA!A\u0004\u00180q\u0013\tiVDA\u0005Gk:\u001cG/[8ocA\u0019ql\u001a6\u000f\u0005\u0001,gBA1e\u001b\u0005\u0011'BA2#\u0003\u0019a$o\\8u}%\ta$\u0003\u0002g;\u00059\u0001/Y2lC\u001e,\u0017B\u00015j\u0005!IE/\u001a:bi>\u0014(B\u00014\u001e!\tYg.D\u0001m\u0015\ti'#\u0001\u0005dCR\fG._:u\u0013\tyGNA\u0006J]R,'O\\1m%><\bcA0hcB\u0011!\u000fC\u0007\u0002\u0001!)A/\u0003a\u0001k\u000611o\u00195f[\u0006\u0004\"A^=\u000e\u0003]T!\u0001\u001f\n\u0002\u000bQL\b/Z:\n\u0005i<(AC*ueV\u001cG\u000fV=qK\")A0\u0003a\u0001{\u0006\u0011R.\u0019=SK\u000e|'\u000fZ:QKJ\u0014\u0015\r^2i!\tab0\u0003\u0002\u0000;\t\u0019\u0011J\u001c;\t\r\u0005\r\u0011\u00021\u0001W\u00031i\u0017\r\u001f\"bi\u000eD7+\u001b>f\u0011\u001d\t9!\u0003a\u0001\u0003\u0013\t!\u0002^5nKj{g.Z%e!\u0011\tY!a\u0005\u000f\t\u00055\u0011q\u0002\t\u0003CvI1!!\u0005\u001e\u0003\u0019\u0001&/\u001a3fM&!\u0011QCA\f\u0005\u0019\u0019FO]5oO*\u0019\u0011\u0011C\u000f\t\u000f\u0005m\u0011\u00021\u0001\u0002\u001e\u0005YRM\u001d:pe>sG)\u001e9mS\u000e\fG/\u001a3GS\u0016dGMT1nKN\u00042\u0001HA\u0010\u0013\r\t\t#\b\u0002\b\u0005>|G.Z1o\u0003U\u0001(o\\2fgN\f5/\u0011:s_^\u0014\u0015\r^2iKN$r!PA\u0014\u0003\u000f\ni\u0006C\u0004\u0002*)\u0001\r!a\u000b\u0002\u0013\u0011\fG/\u00194sC6,\u0007\u0003BA\u0017\u0003\u0003rA!a\f\u0002@9!\u0011\u0011GA\u001f\u001d\u0011\t\u0019$a\u000f\u000f\t\u0005U\u0012\u0011\b\b\u0004C\u0006]\u0012\"A\r\n\u0005]A\u0012BA\u000b\u0017\u0013\t\u0019B#\u0003\u0002g%%!\u00111IA#\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0002g%!1\u0011I\u0003a\u0001\u0003\u0013\u0002R!a\u0013\u0002Z\u0015k!!!\u0014\u000b\t\u0005=\u0013\u0011K\u0001\u0005gR,(M\u0003\u0003\u0002T\u0005U\u0013\u0001B4sa\u000eT!!a\u0016\u0002\u0005%|\u0017\u0002BA.\u0003\u001b\u0012ab\u0015;sK\u0006lwJY:feZ,'\u000f\u0003\u0004\u0002`)\u0001\raI\u0001\fKb,7-\u001e;f!2\fg.\u0001\u000bde\u0016\fG/Z*dQ\u0016l\u0017MU3ta>t7/\u001a\u000b\u0006\u000b\u0006\u0015\u0014\u0011\u000e\u0005\b\u0003OZ\u0001\u0019AA\u0005\u0003%\u0019Xm]:j_:LE\rC\u0003u\u0017\u0001\u0007Q/A\u000fde\u0016\fG/Z(cg\u0016\u0014h/\u001a3NKR\u0014\u0018nY:SKN\u0004xN\\:f)\u0015)\u0015qNA9\u0011\u001d\t9\u0007\u0004a\u0001\u0003\u0013Aq!!\u000b\r\u0001\u0004\tY\u0003")
public class SparkConnectPlanExecution {
    private final ExecuteHolder executeHolder;
    private final SessionHolder sessionHolder;
    private final SparkSession session;

    private SessionHolder sessionHolder() {
        return this.sessionHolder;
    }

    private SparkSession session() {
        return this.session;
    }

    public void handlePlan(ExecuteResponseObserver<ExecutePlanResponse> responseObserver) {
        ExecutePlanRequest request = this.executeHolder.request();
        Plan.OpTypeCase opTypeCase = request.getPlan().getOpTypeCase();
        Plan.OpTypeCase opTypeCase2 = Plan.OpTypeCase.ROOT;
        if (opTypeCase == null ? opTypeCase2 != null : !opTypeCase.equals(opTypeCase2)) {
            throw new IllegalStateException(new StringBuilder(43).append("Illegal operation type ").append(request.getPlan().getOpTypeCase()).append(" to be handled here.").toString());
        }
        SparkConnectPlanner planner = new SparkConnectPlanner(this.sessionHolder());
        QueryPlanningTracker tracker = this.executeHolder.eventsManager().createQueryPlanningTracker();
        Dataset dataframe = Dataset$.MODULE$.ofRows(this.sessionHolder().session(), planner.transformRelation(request.getPlan().getRoot()), tracker);
        responseObserver.onNext(this.createSchemaResponse(request.getSessionId(), dataframe.schema()));
        this.processAsArrowBatches((Dataset<Row>)dataframe, responseObserver, this.executeHolder);
        responseObserver.onNext(MetricGenerator$.MODULE$.createMetricsResponse(request.getSessionId(), (Dataset<Row>)dataframe));
        if (dataframe.queryExecution().observedMetrics().nonEmpty()) {
            responseObserver.onNext(this.createObservedMetricsResponse(request.getSessionId(), (Dataset<Row>)dataframe));
            return;
        }
    }

    public Function1<Iterator<InternalRow>, Iterator<Tuple2<byte[], Object>>> rowToArrowConverter(StructType schema, int maxRecordsPerBatch, long maxBatchSize, String timeZoneId, boolean errorOnDuplicatedFieldNames) {
        return (Function1 & Serializable & scala.Serializable)rows -> {
            ArrowConverters.ArrowBatchWithSchemaIterator batches = ArrowConverters$.MODULE$.toBatchWithSchemaIterator(rows, schema, (long)maxRecordsPerBatch, maxBatchSize, timeZoneId, errorOnDuplicatedFieldNames);
            return batches.map((Function1 & Serializable & scala.Serializable)b -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(b), (Object)BoxesRunTime.boxToLong((long)batches.rowCountInLastBatch())));
        };
    }

    public void processAsArrowBatches(Dataset<Row> dataframe, StreamObserver<ExecutePlanResponse> responseObserver, ExecuteHolder executePlan) {
        String sessionId = executePlan.sessionHolder().sessionId();
        SparkSession spark = dataframe.sparkSession();
        StructType schema = dataframe.schema();
        int maxRecordsPerBatch = spark.sessionState().conf().arrowMaxRecordsPerBatch();
        String timeZoneId = spark.sessionState().conf().sessionLocalTimeZone();
        long maxBatchSize = (long)((double)BoxesRunTime.unboxToLong((Object)SparkEnv$.MODULE$.get().conf().get(Connect$.MODULE$.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE())) * 0.7);
        Function1<Iterator<InternalRow>, Iterator<Tuple2<byte[], Object>>> converter = this.rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId, false);
        IntRef numSent = IntRef.create((int)0);
        LongRef totalNumRows = LongRef.create((long)0L);
        SparkPlan sparkPlan = dataframe.queryExecution().executedPlan();
        if (sparkPlan instanceof LocalTableScanExec) {
            LocalTableScanExec localTableScanExec = (LocalTableScanExec)sparkPlan;
            Seq rows = localTableScanExec.rows();
            ((Iterator)converter.apply((Object)rows.iterator())).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                SparkConnectPlanExecution.$anonfun$processAsArrowBatches$1(sessionId, responseObserver, numSent, totalNumRows, x0$1);
                return BoxedUnit.UNIT;
            });
            executePlan.eventsManager().postFinished((Option<Object>)new Some((Object)BoxesRunTime.boxToLong((long)totalNumRows.elem)));
        } else {
            BoxedUnit cfr_ignored_0 = (BoxedUnit)SQLExecution$.MODULE$.withNewExecutionId(dataframe.queryExecution(), (Option)new Some((Object)"collectArrow"), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                RDD rows = dataframe.queryExecution().executedPlan().execute();
                int numPartitions = rows.getNumPartitions();
                if (numPartitions > 0) {
                    RDD batches = rows.mapPartitionsInternal(converter, rows.mapPartitionsInternal$default$2(), ClassTag$.MODULE$.apply(Tuple2.class));
                    Object signal = new Object();
                    Tuple2[][] partitions = new Tuple2[numPartitions][];
                    ObjectRef error = ObjectRef.create((Object)None$.MODULE$);
                    Function2 & Serializable & scala.Serializable resultHandler = (Function2 & Serializable & scala.Serializable)(partitionId, partition) -> {
                        SparkConnectPlanExecution.$anonfun$processAsArrowBatches$3(signal, partitions, BoxesRunTime.unboxToInt((Object)partitionId), partition);
                        return BoxedUnit.UNIT;
                    };
                    Future future = spark.sparkContext().submitJob(batches, (Function1 & Serializable & scala.Serializable)iter -> (Tuple2[])iter.toArray(ClassTag$.MODULE$.apply(Tuple2.class)), (Seq)Seq$.MODULE$.range((Object)BoxesRunTime.boxToInteger((int)0), (Object)BoxesRunTime.boxToInteger((int)numPartitions), (Integral)Numeric.IntIsIntegral$.MODULE$), (Function2)resultHandler, (Function0 & Serializable & scala.Serializable)() -> (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {}).andThen((PartialFunction)new scala.Serializable(null, signal, error){
                        public static final long serialVersionUID = 0L;
                        private final Object signal$1;
                        private final ObjectRef error$1;

                        public final <A1 extends Try<Function0<BoxedUnit>>, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                            A1 A1 = x1;
                            if (A1 instanceof Success) {
                                return (B1)BoxedUnit.UNIT;
                            }
                            if (A1 instanceof Failure) {
                                Failure failure = (Failure)A1;
                                Throwable throwable = failure.exception();
                                Object object = this.signal$1;
                                synchronized (object) {
                                    this.error$1.elem = new Some((Object)throwable);
                                    this.signal$1.notify();
                                }
                                return (B1)BoxedUnit.UNIT;
                            }
                            return (B1)function1.apply(x1);
                        }

                        public final boolean isDefinedAt(Try<Function0<BoxedUnit>> x1) {
                            Try<Function0<BoxedUnit>> try_ = x1;
                            if (try_ instanceof Success) {
                                return true;
                            }
                            return try_ instanceof Failure;
                        }
                        {
                            this.signal$1 = signal$1;
                            this.error$1 = error$1;
                        }
                    }, (ExecutionContext)ThreadUtils$.MODULE$.sameThread());
                    for (int currentPartitionId = 0; currentPartitionId < numPartitions; ++currentPartitionId) {
                        Tuple2[] tuple2Array;
                        Object object = signal;
                        synchronized (object) {
                            Tuple2[] part = partitions[currentPartitionId];
                            while (part == null && ((Option)error.elem).isEmpty()) {
                                signal.wait();
                                part = partitions[currentPartitionId];
                            }
                            partitions[currentPartitionId] = null;
                            ((Option)error.elem).foreach((Function1 & Serializable & scala.Serializable)other -> {
                                throw other;
                            });
                            tuple2Array = part;
                        }
                        Tuple2[] partition2 = tuple2Array;
                        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])partition2)).foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                            SparkConnectPlanExecution.$anonfun$processAsArrowBatches$8(sessionId, responseObserver, numSent, totalNumRows, x0$2);
                            return BoxedUnit.UNIT;
                        });
                    }
                    ThreadUtils$.MODULE$.awaitReady((Awaitable)future, (Duration)Duration$.MODULE$.Inf());
                    executePlan.eventsManager().postFinished((Option<Object>)new Some((Object)BoxesRunTime.boxToLong((long)totalNumRows$1.elem)));
                    return;
                }
                executePlan.eventsManager().postFinished((Option<Object>)new Some((Object)BoxesRunTime.boxToLong((long)totalNumRows$1.elem)));
            });
        }
        if (numSent.elem == 0) {
            SparkConnectPlanExecution.sendBatch$1(ArrowConverters$.MODULE$.createEmptyArrowBatch(schema, timeZoneId, false), 0L, sessionId, responseObserver, numSent, totalNumRows);
            return;
        }
    }

    private ExecutePlanResponse createSchemaResponse(String sessionId, StructType schema) {
        return ExecutePlanResponse.newBuilder().setSessionId(sessionId).setSchema(DataTypeProtoConverter$.MODULE$.toConnectProtoType((DataType)schema)).build();
    }

    private ExecutePlanResponse createObservedMetricsResponse(String sessionId, Dataset<Row> dataframe) {
        scala.collection.immutable.Iterable observedMetrics = (scala.collection.immutable.Iterable)dataframe.queryExecution().observedMetrics().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                String name = (String)tuple2._1();
                Row row = (Row)tuple2._2();
                IndexedSeq cols = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), row.length()).map((Function1 & Serializable & scala.Serializable)i -> LiteralValueProtoConverter$.MODULE$.toLiteralProto(row.apply(BoxesRunTime.unboxToInt((Object)i))), IndexedSeq$.MODULE$.canBuildFrom());
                return ExecutePlanResponse.ObservedMetrics.newBuilder().setName(name).addAllValues((java.lang.Iterable)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)cols).asJava()).build();
            }
            throw new MatchError((Object)tuple2);
        }, Iterable$.MODULE$.canBuildFrom());
        return ExecutePlanResponse.newBuilder().setSessionId(sessionId).addAllObservedMetrics((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)observedMetrics).asJava()).build();
    }

    private static final void sendBatch$1(byte[] bytes, long count, String sessionId$1, StreamObserver responseObserver$1, IntRef numSent$1, LongRef totalNumRows$1) {
        ExecutePlanResponse.Builder response = ExecutePlanResponse.newBuilder().setSessionId(sessionId$1);
        ExecutePlanResponse.ArrowBatch batch = ExecutePlanResponse.ArrowBatch.newBuilder().setRowCount(count).setData(ByteString.copyFrom(bytes)).build();
        response.setArrowBatch(batch);
        responseObserver$1.onNext(response.build());
        ++numSent$1.elem;
        totalNumRows$1.elem += count;
    }

    public static final /* synthetic */ void $anonfun$processAsArrowBatches$1(String sessionId$1, StreamObserver responseObserver$1, IntRef numSent$1, LongRef totalNumRows$1, Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 != null) {
            byte[] bytes = (byte[])tuple2._1();
            long count = tuple2._2$mcJ$sp();
            SparkConnectPlanExecution.sendBatch$1(bytes, count, sessionId$1, responseObserver$1, numSent$1, totalNumRows$1);
            return;
        }
        throw new MatchError((Object)tuple2);
    }

    public static final /* synthetic */ void $anonfun$processAsArrowBatches$3(Object signal$1, Tuple2[][] partitions$1, int partitionId, Tuple2[] partition) {
        Object object = signal$1;
        synchronized (object) {
            partitions$1[partitionId] = partition;
            signal$1.notify();
        }
    }

    public static final /* synthetic */ void $anonfun$processAsArrowBatches$8(String sessionId$1, StreamObserver responseObserver$1, IntRef numSent$1, LongRef totalNumRows$1, Tuple2 x0$2) {
        Tuple2 tuple2 = x0$2;
        if (tuple2 != null) {
            byte[] bytes = (byte[])tuple2._1();
            long count = tuple2._2$mcJ$sp();
            SparkConnectPlanExecution.sendBatch$1(bytes, count, sessionId$1, responseObserver$1, numSent$1, totalNumRows$1);
            return;
        }
        throw new MatchError((Object)tuple2);
    }

    public SparkConnectPlanExecution(ExecuteHolder executeHolder) {
        this.executeHolder = executeHolder;
        this.sessionHolder = executeHolder.sessionHolder();
        this.session = executeHolder.session();
    }
}

