/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.continuous;

import java.io.Serializable;
import org.apache.spark.Partition;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKey;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.execution.metric.CustomMetrics$;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.execution.streaming.continuous.CommitPartitionEpoch;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.EpochTracker$;
import org.apache.spark.util.ArrayImplicits$;
import org.apache.spark.util.Utils$;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Iterator;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005\u001da\u0001\u0002\u0007\u000e\u0001qA\u0001\"\u000b\u0001\u0003\u0002\u0004%\tA\u000b\u0005\te\u0001\u0011\t\u0019!C\u0001g!Aa\u0007\u0001B\u0001B\u0003&1\u0006\u0003\u00058\u0001\t\u0005\t\u0015!\u00039\u0011!\t\u0005A!A!\u0002\u0013\u0011\u0005\"\u0002,\u0001\t\u00039\u0006bB/\u0001\u0005\u0004%\tE\u0018\u0005\u0007M\u0002\u0001\u000b\u0011B0\t\u000b\u001d\u0004A\u0011\t5\t\u000b=\u0004A\u0011\t9\t\u000f\u0005\r\u0001\u0001\"\u0011\u0002\u0006\t\u00112i\u001c8uS:,x.^:Xe&$XM\u0015#E\u0015\tqq\"\u0001\u0006d_:$\u0018N\\;pkNT!\u0001E\t\u0002\u0013M$(/Z1nS:<'B\u0001\n\u0014\u0003%)\u00070Z2vi&|gN\u0003\u0002\u0015+\u0005\u00191/\u001d7\u000b\u0005Y9\u0012!B:qCJ\\'B\u0001\r\u001a\u0003\u0019\t\u0007/Y2iK*\t!$A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001;A\u0019a$I\u0012\u000e\u0003}Q!\u0001I\u000b\u0002\u0007I$G-\u0003\u0002#?\t\u0019!\u000b\u0012#\u0011\u0005\u0011:S\"A\u0013\u000b\u0003\u0019\nQa]2bY\u0006L!\u0001K\u0013\u0003\tUs\u0017\u000e^\u0001\u0005aJ,g/F\u0001,!\rq\u0012\u0005\f\t\u0003[Aj\u0011A\f\u0006\u0003_M\t\u0001bY1uC2L8\u000f^\u0005\u0003c9\u00121\"\u00138uKJt\u0017\r\u001c*po\u0006A\u0001O]3w?\u0012*\u0017\u000f\u0006\u0002$i!9QGAA\u0001\u0002\u0004Y\u0013a\u0001=%c\u0005)\u0001O]3wA\u0005iqO]5uKJ4\u0015m\u0019;pef\u0004\"!O \u000e\u0003iR!\u0001E\u001e\u000b\u0005qj\u0014!B<sSR,'B\u0001 
\u0014\u0003%\u0019wN\u001c8fGR|'/\u0003\u0002Au\tQ2\u000b\u001e:fC6Lgn\u001a#bi\u0006<&/\u001b;fe\u001a\u000b7\r^8ss\u0006i1-^:u_6lU\r\u001e:jGN\u0004Ba\u0011&N!:\u0011A\t\u0013\t\u0003\u000b\u0016j\u0011A\u0012\u0006\u0003\u000fn\ta\u0001\u0010:p_Rt\u0014BA%&\u0003\u0019\u0001&/\u001a3fM&\u00111\n\u0014\u0002\u0004\u001b\u0006\u0004(BA%&!\t\u0019e*\u0003\u0002P\u0019\n11\u000b\u001e:j]\u001e\u0004\"!\u0015+\u000e\u0003IS!aU\t\u0002\r5,GO]5d\u0013\t)&KA\u0005T#2kU\r\u001e:jG\u00061A(\u001b8jiz\"B\u0001\u0017.\\9B\u0011\u0011\fA\u0007\u0002\u001b!)\u0011F\u0002a\u0001W!)qG\u0002a\u0001q!)\u0011I\u0002a\u0001\u0005\u0006Y\u0001/\u0019:uSRLwN\\3s+\u0005y\u0006c\u0001\u0013aE&\u0011\u0011-\n\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005\r$W\"A\u000b\n\u0005\u0015,\"a\u0003)beRLG/[8oKJ\fA\u0002]1si&$\u0018n\u001c8fe\u0002\nQbZ3u!\u0006\u0014H/\u001b;j_:\u001cX#A5\u0011\u0007\u0011RG.\u0003\u0002lK\t)\u0011I\u001d:bsB\u00111-\\\u0005\u0003]V\u0011\u0011\u0002U1si&$\u0018n\u001c8\u0002\u000f\r|W\u000e];uKR\u0019\u0011O\u001f?\u0011\u0007I<8E\u0004\u0002tk:\u0011Q\t^\u0005\u0002M%\u0011a/J\u0001\ba\u0006\u001c7.Y4f\u0013\tA\u0018P\u0001\u0005Ji\u0016\u0014\u0018\r^8s\u0015\t1X\u0005C\u0003|\u0015\u0001\u0007A.A\u0003ta2LG\u000fC\u0003~\u0015\u0001\u0007a0A\u0004d_:$X\r\u001f;\u0011\u0005\r|\u0018bAA\u0001+\tYA+Y:l\u0007>tG/\u001a=u\u0003E\u0019G.Z1s\t\u0016\u0004XM\u001c3f]\u000eLWm\u001d\u000b\u0002G\u0001")
/*
 * RDD of BoxedUnit layered over a parent RDD of InternalRow ("prev").
 *
 * Partitions and the partitioner are taken directly from the parent. compute()
 * does not yield any elements; instead it repeatedly drains the parent's rows
 * for the given partition into a DataWriter obtained from writerFactory,
 * commits the writer, reports the commit to an epoch coordinator endpoint and
 * advances the current epoch, until the task context is interrupted or
 * completed.
 *
 * NOTE(review): this text is decompiler output. Identifiers such as
 * dataWriter$1 and $this inside the lambdas appear to be the decompiler's names
 * for the captured local dataWriter and the enclosing instance - confirm
 * against the original Scala source before editing the logic.
 */
public class ContinuousWriteRDD
extends RDD<BoxedUnit> {
    // Parent RDD supplying the rows to write; mutable so clearDependencies() can null it out.
    private RDD<InternalRow> prev;
    // Creates one DataWriter per (partition id, task attempt id, epoch) in compute().
    private final StreamingDataWriterFactory writerFactory;
    // Metric map handed to CustomMetrics$.updateMetrics together with the writer's current metric values.
    private final Map<String, SQLMetric> customMetrics;
    // Copied from the parent RDD's partitioner in the constructor.
    private final Option<Partitioner> partitioner;

    /** Returns the parent RDD (null after {@link #clearDependencies()} has run). */
    public RDD<InternalRow> prev() {
        return this.prev;
    }

    /** Scala-style setter for the parent RDD reference. */
    public void prev_$eq(RDD<InternalRow> x$1) {
        this.prev = x$1;
    }

    /** Returns the partitioner captured from the parent RDD at construction time. */
    public Option<Partitioner> partitioner() {
        return this.partitioner;
    }

    /** Returns the parent RDD's partitions unchanged. */
    public Partition[] getPartitions() {
        return this.prev().partitions();
    }

    /**
     * Writes the parent's rows for {@code split}, one writer per loop iteration, until the
     * task context reports interrupted or completed.
     *
     * <p>Before the loop, the epoch coordinator endpoint is looked up from the
     * EPOCH_COORDINATOR_ID_KEY local property and the current epoch is initialized from the
     * START_EPOCH_KEY local property parsed as a long.
     *
     * <p>Each iteration passes three callbacks to
     * {@code Utils$.tryWithSafeFinallyAndFailureCallbacks}: the write/commit body, a callback
     * that aborts the writer, and a callback that closes the writer. Judging by the helper's
     * name, the second presumably runs on failure and the third always runs - confirm against
     * the helper's definition.
     *
     * @param split   partition to compute; forwarded to the parent's compute
     * @param context task context supplying local properties, partition id and task attempt id
     * @return an iterator built from an empty sequence (no elements are produced)
     */
    public Iterator<BoxedUnit> compute(Partition split, TaskContext context) {
        RpcEndpointRef epochCoordinator = EpochCoordinatorRef$.MODULE$.get(context.getLocalProperty(ContinuousExecution$.MODULE$.EPOCH_COORDINATOR_ID_KEY()), SparkEnv$.MODULE$.get());
        EpochTracker$.MODULE$.initializeCurrentEpoch(StringOps$.MODULE$.toLong$extension(Predef$.MODULE$.augmentString(context.getLocalProperty(ContinuousExecution$.MODULE$.START_EPOCH_KEY()))));
        while (!context.isInterrupted() && !context.isCompleted()) {
            // Holder starts as null so the abort/close callbacks can tell whether a writer was ever created.
            ObjectRef dataWriter = ObjectRef.create(null);
            Utils$.MODULE$.tryWithSafeFinallyAndFailureCallbacks((Function0)(JFunction0.mcV.sp & Serializable)() -> {
                try {
                    // The parent iterator is obtained before the writer is created.
                    Iterator dataIterator = this.prev().compute(split, context);
                    dataWriter$1.elem = $this.writerFactory.createWriter(context.partitionId(), context.taskAttemptId(), BoxesRunTime.unboxToLong((Object)EpochTracker$.MODULE$.getCurrentEpoch().get()));
                    long count = 0L;
                    while (dataIterator.hasNext()) {
                        // Refresh custom metrics once every NUM_ROWS_PER_UPDATE rows (including before the first row).
                        if (count % (long)CustomMetrics$.MODULE$.NUM_ROWS_PER_UPDATE() == 0L) {
                            CustomMetrics$.MODULE$.updateMetrics((Seq<CustomTaskMetric>)ArrayImplicits$.MODULE$.SparkArrayOps((Object)((DataWriter)dataWriter$1.elem).currentMetricsValues()).toImmutableArraySeq(), $this.customMetrics);
                        }
                        ++count;
                        ((DataWriter)dataWriter$1.elem).write(dataIterator.next());
                    }
                    // Final metrics update after the iterator is exhausted, before committing.
                    CustomMetrics$.MODULE$.updateMetrics((Seq<CustomTaskMetric>)ArrayImplicits$.MODULE$.SparkArrayOps((Object)((DataWriter)dataWriter$1.elem).currentMetricsValues()).toImmutableArraySeq(), $this.customMetrics);
                    this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Writer for partition ", " "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.PARTITION_ID$.MODULE$, (Object)BoxesRunTime.boxToInteger((int)context.partitionId()))})).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"in epoch ", " is committing."}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.EPOCH$.MODULE$, EpochTracker$.MODULE$.getCurrentEpoch().get())})))));
                    // Commit locally, then report (partition id, epoch, commit message) to the epoch coordinator.
                    WriterCommitMessage msg = ((DataWriter)dataWriter$1.elem).commit();
                    epochCoordinator.send((Object)new CommitPartitionEpoch(context.partitionId(), BoxesRunTime.unboxToLong((Object)EpochTracker$.MODULE$.getCurrentEpoch().get()), msg));
                    this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Writer for partition ", " "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.PARTITION_ID$.MODULE$, (Object)BoxesRunTime.boxToInteger((int)context.partitionId()))})).$plus(this.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"in epoch ", " committed."}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.EPOCH$.MODULE$, EpochTracker$.MODULE$.getCurrentEpoch().get())})))));
                    // The epoch only advances after commit and the coordinator notification succeed.
                    EpochTracker$.MODULE$.incrementCurrentEpoch();
                }
                // InterruptedException is swallowed here, so it does not propagate out of this callback;
                // the enclosing while loop then re-checks the context flags.
                // NOTE(review): presumably intentional (interrupt used as the shutdown signal) - confirm.
                catch (InterruptedException interruptedException) {}
            }, (Function0)(JFunction0.mcV.sp & Serializable)() -> {
                // Second callback: abort the writer if one was created, logging before and after.
                this.logError((Function0 & Serializable)() -> "Writer for partition " + context.partitionId() + " is aborting.");
                if ((DataWriter)dataWriter$1.elem != null) {
                    ((DataWriter)dataWriter$1.elem).abort();
                }
                this.logError((Function0 & Serializable)() -> "Writer for partition " + context.partitionId() + " aborted.");
            }, (Function0)(JFunction0.mcV.sp & Serializable)() -> {
                // Third callback: close the writer if one was created.
                if ((DataWriter)dataWriter$1.elem != null) {
                    ((DataWriter)dataWriter$1.elem).close();
                    return;
                }
            });
        }
        // No elements are returned to the caller; the iterator is built from the empty list.
        return package$.MODULE$.Iterator().apply((Seq)Nil$.MODULE$);
    }

    /** Delegates to the superclass and then drops the reference to the parent RDD. */
    public void clearDependencies() {
        super.clearDependencies();
        this.prev_$eq(null);
    }

    /**
     * Stores the parent RDD, writer factory and metric map, initializes the RDD superclass with
     * the parent and a Unit class tag, and copies the parent's partitioner.
     *
     * <p>NOTE(review): the field assignments appear before {@code super(...)} exactly as the
     * decompiler emitted them; Java compilers that require the superclass call to be the first
     * statement will reject this ordering.
     *
     * @param prev          parent RDD whose rows are written
     * @param writerFactory factory used by compute() to create writers
     * @param customMetrics metric map passed to the custom-metrics updates in compute()
     */
    public ContinuousWriteRDD(RDD<InternalRow> prev, StreamingDataWriterFactory writerFactory, Map<String, SQLMetric> customMetrics) {
        this.prev = prev;
        this.writerFactory = writerFactory;
        this.customMetrics = customMetrics;
        super(prev, (ClassTag)ClassTag$.MODULE$.Unit());
        this.partitioner = this.prev().partitioner();
    }
}

