/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.continuous;

import java.io.Serializable;
import org.apache.spark.Partition;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.continuous.CommitPartitionEpoch;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.EpochTracker$;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.util.Utils$;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001A4Aa\u0003\u0007\u00017!A\u0001\u0006\u0001BA\u0002\u0013\u0005\u0011\u0006\u0003\u00052\u0001\t\u0005\r\u0011\"\u00013\u0011!)\u0004A!A!B\u0013Q\u0003\u0002\u0003\u001c\u0001\u0005\u0003\u0005\u000b\u0011B\u001c\t\u000b\u0005\u0003A\u0011\u0001\"\t\u000f\u001d\u0003!\u0019!C!\u0011\"1\u0001\u000b\u0001Q\u0001\n%CQ!\u0015\u0001\u0005BICQ!\u0017\u0001\u0005BiCQA\u001c\u0001\u0005B=\u0014!cQ8oi&tWo\\;t/JLG/\u001a*E\t*\u0011QBD\u0001\u000bG>tG/\u001b8v_V\u001c(BA\b\u0011\u0003%\u0019HO]3b[&twM\u0003\u0002\u0012%\u0005IQ\r_3dkRLwN\u001c\u0006\u0003'Q\t1a]9m\u0015\t)b#A\u0003ta\u0006\u00148N\u0003\u0002\u00181\u00051\u0011\r]1dQ\u0016T\u0011!G\u0001\u0004_J<7\u0001A\n\u0003\u0001q\u00012!\b\u0011#\u001b\u0005q\"BA\u0010\u0015\u0003\r\u0011H\rZ\u0005\u0003Cy\u00111A\u0015#E!\t\u0019c%D\u0001%\u0015\u0005)\u0013!B:dC2\f\u0017BA\u0014%\u0005\u0011)f.\u001b;\u0002\tA\u0014XM^\u000b\u0002UA\u0019Q\u0004I\u0016\u0011\u00051zS\"A\u0017\u000b\u00059\u0012\u0012\u0001C2bi\u0006d\u0017p\u001d;\n\u0005Aj#aC%oi\u0016\u0014h.\u00197S_^\f\u0001\u0002\u001d:fm~#S-\u001d\u000b\u0003EMBq\u0001\u000e\u0002\u0002\u0002\u0003\u0007!&A\u0002yIE\nQ\u0001\u001d:fm\u0002\n\u0011b\u001e:ji\u0016$\u0016m]6\u0011\u0007az4&D\u0001:\u0015\tQ4(\u0001\u0004xe&$XM\u001d\u0006\u0003yu\n!A\u001e\u001a\u000b\u0005y\u0012\u0012aB:pkJ\u001cWm]\u0005\u0003\u0001f\u0012\u0011\u0003R1uC^\u0013\u0018\u000e^3s\r\u0006\u001cGo\u001c:z\u0003\u0019a\u0014N\\5u}Q\u00191)\u0012$\u0011\u0005\u0011\u0003Q\"\u0001\u0007\t\u000b!*\u0001\u0019\u0001\u0016\t\u000bY*\u0001\u0019A\u001c\u0002\u0017A\f'\u000f^5uS>tWM]\u000b\u0002\u0013B\u00191E\u0013'\n\u0005-##AB(qi&|g\u000e\u0005\u0002N\u001d6\tA#\u0003\u0002P)\tY\u0001+\u0019:uSRLwN\\3s\u00031\u0001\u0018M\u001d;ji&|g.\u001a:!\u000359W\r\u001e)beRLG/[8ogV\t1\u000bE\u0002$)ZK!!\u0016\u0013\u0003\u000b\u0005\u0013(/Y=\u0011\u00055;\u0016B\u0001-\u0015\u0005%\u0001\u0016M\u001d;ji&|g.A\u0004d_6\u0004X\u000f^3\u0015\u0007m;\u0017\u000eE\u0002]I\nr!!\u00182\u000f\u0005y\u000bW\"A0\u000b\u0005\u0001T\u0012A\u0002\u001fs_>$h(C\u0001&\u0013\t\u0019G%A\u0004qC\u000e\\\u0017mZ3\n\u0005\u00154'\u0001C%uKJ\fGo\u001c:\u000b\u0005\r$\u0003\"\u00025\n\u0001\u00041\u0016!B:qY&$\b\"\u00026\n\u0001\u0004Y\u0017aB2p]R,\u0007\u0010\u001e\t\u0003\u001b2L!!\u001c\u000b\u0003\u0017Q\u000b7o[\"p]R,\u0007\u0010^\u0001\u0012G2,\u0017M\u001d#fa\u0016tG-\u001a8dS\u0016\u001cH#\u0001\u0012")
public class ContinuousWriteRDD
extends RDD<BoxedUnit> {
    private RDD<InternalRow> prev;
    private final DataWriterFactory<InternalRow> writeTask;
    private final Option<Partitioner> partitioner;

    public RDD<InternalRow> prev() {
        return this.prev;
    }

    public void prev_$eq(RDD<InternalRow> x$1) {
        this.prev = x$1;
    }

    public Option<Partitioner> partitioner() {
        return this.partitioner;
    }

    public Partition[] getPartitions() {
        return this.prev().partitions();
    }

    public Iterator<BoxedUnit> compute(Partition split, TaskContext context) {
        RpcEndpointRef epochCoordinator = EpochCoordinatorRef$.MODULE$.get(context.getLocalProperty(ContinuousExecution$.MODULE$.EPOCH_COORDINATOR_ID_KEY()), SparkEnv$.MODULE$.get());
        EpochTracker$.MODULE$.initializeCurrentEpoch(new StringOps(Predef$.MODULE$.augmentString(context.getLocalProperty(ContinuousExecution$.MODULE$.START_EPOCH_KEY()))).toLong());
        while (!context.isInterrupted() && !context.isCompleted()) {
            ObjectRef dataWriter = ObjectRef.create(null);
            JFunction0.mcV.sp & Serializable & scala.Serializable x$1 = (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                try {
                    Iterator dataIterator = this.prev().compute(split, context);
                    dataWriter$1.elem = $this.writeTask.createDataWriter(context.partitionId(), context.taskAttemptId(), BoxesRunTime.unboxToLong((Object)EpochTracker$.MODULE$.getCurrentEpoch().get()));
                    while (dataIterator.hasNext()) {
                        ((DataWriter)dataWriter$1.elem).write(dataIterator.next());
                    }
                    this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Writer for partition ").append(context.partitionId()).append(" ").append("in epoch ").append(EpochTracker$.MODULE$.getCurrentEpoch().get()).append(" is committing.").toString());
                    WriterCommitMessage msg = ((DataWriter)dataWriter$1.elem).commit();
                    epochCoordinator.send((Object)new CommitPartitionEpoch(context.partitionId(), BoxesRunTime.unboxToLong((Object)EpochTracker$.MODULE$.getCurrentEpoch().get()), msg));
                    this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("Writer for partition ").append(context.partitionId()).append(" ").append("in epoch ").append(EpochTracker$.MODULE$.getCurrentEpoch().get()).append(" committed.").toString());
                    EpochTracker$.MODULE$.incrementCurrentEpoch();
                }
                catch (InterruptedException interruptedException) {}
            };
            JFunction0.mcV.sp & Serializable & scala.Serializable x$2 = (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                this.logError((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(34).append("Writer for partition ").append(context.partitionId()).append(" is aborting.").toString());
                if ((DataWriter)dataWriter$1.elem != null) {
                    ((DataWriter)dataWriter$1.elem).abort();
                }
                this.logError((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Writer for partition ").append(context.partitionId()).append(" aborted.").toString());
            };
            JFunction0.mcV.sp & Serializable & scala.Serializable x$3 = () -> ContinuousWriteRDD.$anonfun$compute$7((Function0)x$1);
            Utils$.MODULE$.tryWithSafeFinallyAndFailureCallbacks((Function0)x$1, (Function0)x$2, (Function0)x$3);
        }
        return package$.MODULE$.Iterator().apply((Seq)Nil$.MODULE$);
    }

    public void clearDependencies() {
        super.clearDependencies();
        this.prev_$eq(null);
    }

    public static final /* synthetic */ void $anonfun$compute$7(Function0 x$1$1) {
        Utils$.MODULE$.tryWithSafeFinallyAndFailureCallbacks$default$3(x$1$1);
    }

    public ContinuousWriteRDD(RDD<InternalRow> prev, DataWriterFactory<InternalRow> writeTask) {
        this.prev = prev;
        this.writeTask = writeTask;
        super(prev, ClassTag$.MODULE$.Unit());
        this.partitioner = this.prev().partitioner();
    }
}

