/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.continuous;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.apache.spark.SparkEnv$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.CurrentDate;
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits$;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation$;
import org.apache.spark.sql.execution.streaming.ACTIVE$;
import org.apache.spark.sql.execution.streaming.CommitMetadata;
import org.apache.spark.sql.execution.streaming.CommitMetadata$;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.IncrementalExecution;
import org.apache.spark.sql.execution.streaming.OffsetSeq;
import org.apache.spark.sql.execution.streaming.OffsetSeq$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.execution.streaming.RECONFIGURING$;
import org.apache.spark.sql.execution.streaming.State;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamExecution$;
import org.apache.spark.sql.execution.streaming.StreamingRelationV2;
import org.apache.spark.sql.execution.streaming.TERMINATED$;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.IncrementAndGetEpoch$;
import org.apache.spark.sql.execution.streaming.continuous.StopContinuousExecutionWrites$;
import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSource;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Clock;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LambdaDeserialize;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\tMb\u0001\u0002\u0017.\u0001qB\u0011\"\u0011\u0001\u0003\u0002\u0003\u0006IA\u0011$\t\u0013\u001d\u0003!\u0011!Q\u0001\n!+\u0006\u0002\u0003,\u0001\u0005\u0003\u0005\u000b\u0011\u0002%\t\u0011]\u0003!\u0011!Q\u0001\naC\u0011B\u0019\u0001\u0003\u0002\u0003\u0006IaY6\t\u00131\u0004!\u0011!Q\u0001\n5\u0014\b\"C:\u0001\u0005\u0003\u0005\u000b\u0011\u0002;{\u0011%Y\bA!A!\u0002\u0013ax\u0010\u0003\u0006\u0002\u0002\u0001\u0011\t\u0011)A\u0005\u0003\u0007A!\"!\u0003\u0001\u0005\u0003\u0005\u000b\u0011BA\u0006\u0011\u001d\t\u0019\u0002\u0001C\u0001\u0003+A\u0011\"a\f\u0001\u0001\u0004%\t\"!\r\t\u0013\u0005M\u0003\u00011A\u0005\u0012\u0005U\u0003\u0002CA1\u0001\u0001\u0006K!a\r\t\u0019\u0005-\u0004\u00011AA\u0002\u0013\u00051'!\u001c\t\u0019\u0005=\u0004\u00011AA\u0002\u0013\u00051'!\u001d\t\u0015\u0005U\u0004\u00011A\u0001B\u0003&\u0001\nC\u0005\u0002x\u0001\u0011\r\u0011\"\u0003\u0002z!A\u0011q\u0013\u0001!\u0002\u0013\tY\bC\u0005\u0002\u001a\u0002\u0011\r\u0011\"\u0011\u0002\u001c\"A\u00111\u0015\u0001!\u0002\u0013\ti\nC\u0005\u0002&\u0002\u0011\r\u0011\"\u0003\u0002(\"A\u0011q\u0016\u0001!\u0002\u0013\tI\u000bC\u0004\u00022\u0002!\t&a-\t\u000f\u0005e\u0006\u0001\"\u0003\u0002<\"9\u0011q\u0019\u0001\u0005\n\u0005%\u0007bBAh\u0001\u0011\u0005\u0011\u0011\u001b\u0005\b\u0003[\u0004A\u0011AAx\u0011!\t\u0019\u0010\u0001C\u0001g\u0005U\bbBA}\u0001\u0011\u0005\u00111 
\u0005\b\u0003s\u0004A\u0011\u0002B\u0001\u0011\u001d\u0011\u0019\u0001\u0001C!\u0005\u0003AQB!\u0002\u0001!\u0003\r\t\u0011!C\u0005\u0005\u000fy\b\"\u0004B\u0005\u0001A\u0005\u0019\u0011!A\u0005\n\t-aiB\u0004\u0003\u000e5B\tAa\u0004\u0007\r1j\u0003\u0012\u0001B\t\u0011\u001d\t\u0019\u0002\nC\u0001\u00053A\u0011Ba\u0007%\u0005\u0004%\tA!\b\t\u0011\t%B\u0005)A\u0005\u0005?A\u0011Ba\u000b%\u0005\u0004%\tA!\b\t\u0011\t5B\u0005)A\u0005\u0005?A\u0011Ba\f%\u0005\u0004%\tA!\b\t\u0011\tEB\u0005)A\u0005\u0005?\u00111cQ8oi&tWo\\;t\u000bb,7-\u001e;j_:T!AL\u0018\u0002\u0015\r|g\u000e^5ok>,8O\u0003\u00021c\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003eM\n\u0011\"\u001a=fGV$\u0018n\u001c8\u000b\u0005Q*\u0014aA:rY*\u0011agN\u0001\u0006gB\f'o\u001b\u0006\u0003qe\na!\u00199bG\",'\"\u0001\u001e\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001i\u0004C\u0001 @\u001b\u0005y\u0013B\u0001!0\u0005=\u0019FO]3b[\u0016CXmY;uS>t\u0017\u0001D:qCJ\\7+Z:tS>t\u0007CA\"E\u001b\u0005\u0019\u0014BA#4\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0013\t\tu(\u0001\u0003oC6,\u0007CA%S\u001d\tQ\u0005\u000b\u0005\u0002L\u001d6\tAJ\u0003\u0002Nw\u00051AH]8pizR\u0011aT\u0001\u0006g\u000e\fG.Y\u0005\u0003#:\u000ba\u0001\u0015:fI\u00164\u0017BA*U\u0005\u0019\u0019FO]5oO*\u0011\u0011KT\u0005\u0003\u000f~\nab\u00195fG.\u0004x.\u001b8u%>|G/\u0001\u0007b]\u0006d\u0017P_3e!2\fg\u000e\u0005\u0002ZA6\t!L\u0003\u0002\\9\u00069An\\4jG\u0006d'BA/_\u0003\u0015\u0001H.\u00198t\u0015\ty6'\u0001\u0005dCR\fG._:u\u0013\t\t'LA\u0006M_\u001eL7-\u00197QY\u0006t\u0017\u0001B:j].\u0004\"\u0001Z5\u000e\u0003\u0015T!AZ4\u0002\u000f\r\fG/\u00197pO*\u0011\u0001nM\u0001\nG>tg.Z2u_JL!A[3\u0003\u001bM+\b\u000f]8siN<&/\u001b;f\u0013\t\u0011w(A\u0004ue&<w-\u001a:\u0011\u00059\u0004X\"A8\u000b\u0005A\u001a\u0014BA9p\u0005\u001d!&/[4hKJL!\u0001\\ 
\u0002\u0019Q\u0014\u0018nZ4fe\u000ecwnY6\u0011\u0005UDX\"\u0001<\u000b\u0005],\u0014\u0001B;uS2L!!\u001f<\u0003\u000b\rcwnY6\n\u0005M|\u0014AC8viB,H/T8eKB\u0011a.`\u0005\u0003}>\u0014!bT;uaV$Xj\u001c3f\u0013\tYx(\u0001\u0007fqR\u0014\u0018m\u00149uS>t7\u000fE\u0003J\u0003\u000bA\u0005*C\u0002\u0002\bQ\u00131!T1q\u0003Y!W\r\\3uK\u000eCWmY6q_&tGo\u00148Ti>\u0004\b\u0003BA\u0007\u0003\u001fi\u0011AT\u0005\u0004\u0003#q%a\u0002\"p_2,\u0017M\\\u0001\u0007y%t\u0017\u000e\u001e \u0015-\u0005]\u00111DA\u000f\u0003?\t\t#a\t\u0002&\u0005\u001d\u0012\u0011FA\u0016\u0003[\u00012!!\u0007\u0001\u001b\u0005i\u0003\"B!\f\u0001\u0004\u0011\u0005\"B$\f\u0001\u0004A\u0005\"\u0002,\f\u0001\u0004A\u0005\"B,\f\u0001\u0004A\u0006\"\u00022\f\u0001\u0004\u0019\u0007\"\u00027\f\u0001\u0004i\u0007\"B:\f\u0001\u0004!\b\"B>\f\u0001\u0004a\bbBA\u0001\u0017\u0001\u0007\u00111\u0001\u0005\b\u0003\u0013Y\u0001\u0019AA\u0006\u0003\u001d\u0019x.\u001e:dKN,\"!a\r\u0011\r\u0005U\u0012qHA#\u001d\u0011\t9$a\u000f\u000f\u0007-\u000bI$C\u0001P\u0013\r\tiDT\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t\t%a\u0011\u0003\u0007M+\u0017OC\u0002\u0002>9\u0003B!a\u0012\u0002P5\u0011\u0011\u0011\n\u0006\u0004a\u0005-#bAA'O\u0006!!/Z1e\u0013\u0011\t\t&!\u0013\u0003!\r{g\u000e^5ok>,8o\u0015;sK\u0006l\u0017aC:pkJ\u001cWm]0%KF$B!a\u0016\u0002^A!\u0011QBA-\u0013\r\tYF\u0014\u0002\u0005+:LG\u000fC\u0005\u0002`5\t\t\u00111\u0001\u00024\u0005\u0019\u0001\u0010J\u0019\u0002\u0011M|WO]2fg\u0002B3ADA3!\u0011\ti!a\u001a\n\u0007\u0005%dJ\u0001\u0005w_2\fG/\u001b7f\u0003e\u0019WO\u001d:f]R,\u0005o\\2i\u0007>|'\u000fZ5oCR|'/\u00133\u0016\u0003!\u000bQdY;se\u0016tG/\u00129pG\"\u001cun\u001c:eS:\fGo\u001c:JI~#S-\u001d\u000b\u0005\u0003/\n\u0019\b\u0003\u0005\u0002`A\t\t\u00111\u0001I\u0003i\u0019WO\u001d:f]R,\u0005o\\2i\u0007>|'\u000fZ5oCR|'/\u00133!\u0003\u001d1\u0017-\u001b7ve\u0016,\"!a\u001f\u0011\r\u0005u\u0014QRAI\u001b\t\tyH\u0003\u0003\u0002\u0002\u0006\r\u0015AB1u_6L7M\u0003\u0003\u0002\u0006\u0006\u001d\u0015AC2p]\u000e,(O]3oi
*\u0019q/!#\u000b\u0005\u0005-\u0015\u0001\u00026bm\u0006LA!a$\u0002\u0000\ty\u0011\t^8nS\u000e\u0014VMZ3sK:\u001cW\r\u0005\u0003\u00026\u0005M\u0015\u0002BAK\u0003\u0007\u0012\u0011\u0002\u00165s_^\f'\r\\3\u0002\u0011\u0019\f\u0017\u000e\\;sK\u0002\n1\u0002\\8hS\u000e\fG\u000e\u00157b]V\u0011\u0011Q\u0014\t\u0005\u00033\ty*C\u0002\u0002\"6\u00121d\u0016:ji\u0016$vnQ8oi&tWo\\;t\t\u0006$\u0018mU8ve\u000e,\u0017\u0001\u00047pO&\u001c\u0017\r\u001c)mC:\u0004\u0013a\u0004;sS\u001e<WM]#yK\u000e,Ho\u001c:\u0016\u0005\u0005%\u0006c\u0001 \u0002,&\u0019\u0011QV\u0018\u0003-A\u0013xnY3tg&tw\rV5nK\u0016CXmY;u_J\f\u0001\u0003\u001e:jO\u001e,'/\u0012=fGV$xN\u001d\u0011\u0002%I,h.Q2uSZ\fG/\u001a3TiJ,\u0017-\u001c\u000b\u0005\u0003/\n)\f\u0003\u0004\u00028b\u0001\rAQ\u0001\u0016gB\f'o[*fgNLwN\u001c$peN#(/Z1n\u0003=9W\r^*uCJ$xJ\u001a4tKR\u001cH\u0003BA_\u0003\u0007\u00042APA`\u0013\r\t\tm\f\u0002\n\u001f\u001a47/\u001a;TKFDa!!2\u001a\u0001\u0004\u0011\u0015\u0001G:qCJ\\7+Z:tS>tGk\u001c*v]\n\u000bGo\u00195fg\u0006i!/\u001e8D_:$\u0018N\\;pkN$B!a\u0016\u0002L\"1\u0011Q\u001a\u000eA\u0002\t\u000bAc\u001d9be.\u001cVm]:j_:4uN])vKJL\u0018!C1eI>3gm]3u)!\t9&a5\u0002^\u0006\u0005\bbBAk7\u0001\u0007\u0011q[\u0001\u0006KB|7\r\u001b\t\u0005\u0003\u001b\tI.C\u0002\u0002\\:\u0013A\u0001T8oO\"9\u0011q\\\u000eA\u0002\u0005\u0015\u0013AB:ue\u0016\fW\u000eC\u0004\u0002dn\u0001\r!!:\u0002!A\f'\u000f^5uS>twJ\u001a4tKR\u001c\bCBA\u001b\u0003\u007f\t9\u000f\u0005\u0003\u0002H\u0005%\u0018\u0002BAv\u0003\u0013\u0012q\u0002U1si&$\u0018n\u001c8PM\u001a\u001cX\r^\u0001\u0007G>lW.\u001b;\u0015\t\u0005]\u0013\u0011\u001f\u0005\b\u0003+d\u0002\u0019AAl\u0003)\tw/Y5u\u000bB|7\r\u001b\u000b\u0005\u0003/\n9\u0010C\u0004\u0002Vv\u0001\r!a6\u0002\u001fM$x\u000e]%o\u001d\u0016<H\u000b\u001b:fC\u0012$B!a\u0016\u0002~\"9\u0011q 
\u0010A\u0002\u0005E\u0015!B3se>\u0014HCAA,\u0003\u0011\u0019Ho\u001c9\u0002!M,\b/\u001a:%_V$\b/\u001e;N_\u0012,W#\u0001?\u0002%M,\b/\u001a:%gB\f'o[*fgNLwN\\\u000b\u0002\u0005\u0006\u00192i\u001c8uS:,x.^:Fq\u0016\u001cW\u000f^5p]B\u0019\u0011\u0011\u0004\u0013\u0014\u0007\u0011\u0012\u0019\u0002\u0005\u0003\u0002\u000e\tU\u0011b\u0001B\f\u001d\n1\u0011I\\=SK\u001a$\"Aa\u0004\u0002\u001fM#\u0016I\u0015+`\u000bB{5\tS0L\u000bf+\"Aa\b\u0011\t\t\u0005\"qE\u0007\u0003\u0005GQAA!\n\u0002\n\u0006!A.\u00198h\u0013\r\u0019&1E\u0001\u0011'R\u000b%\u000bV0F!>\u001b\u0005jX&F3\u0002\n\u0001$\u0012)P\u0007\"{6iT(S\t&s\u0015\tV(S?&#ulS#Z\u0003e)\u0005kT\"I?\u000e{uJ\u0015#J\u001d\u0006#vJU0J\t~[U)\u0017\u0011\u0002%\u0015\u0003vj\u0011%`\u0013:#VI\u0015,B\u0019~[U)W\u0001\u0014\u000bB{5\tS0J\u001dR+%KV!M?.+\u0015\f\t")
public class ContinuousExecution
extends StreamExecution {
    /** Continuous streams read by this query; volatile and initialised to an empty sequence. */
    private volatile Seq<ContinuousStream> sources = (Seq)Seq$.MODULE$.apply((Seq)Nil$.MODULE$);
    /** Identifier of the epoch coordinator for the current run; assigned by {@code runContinuous}. */
    private String currentEpochCoordinatorId;
    /**
     * First error handed to {@code stopInNewThread(Throwable)}; stays {@code null} until one is recorded.
     * The type argument matches the declared type (generics are invariant, so
     * {@code AtomicReference<Object>} is not assignable to {@code AtomicReference<Throwable>}).
     */
    private final AtomicReference<Throwable> failure = new AtomicReference<Throwable>(null);
    /** Logical plan exposed through {@link #logicalPlan()}. */
    private final WriteToContinuousDataSource logicalPlan;
    /** Trigger executor that drives the epoch update thread started in {@code runContinuous}. */
    private final ProcessingTimeExecutor org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor;

    /**
     * Static forwarder to the {@code ContinuousExecution$} singleton.
     *
     * @return the local-property name under which {@code runContinuous} publishes the trigger interval
     */
    public static String EPOCH_INTERVAL_KEY() {
        ContinuousExecution$ companion = ContinuousExecution$.MODULE$;
        return companion.EPOCH_INTERVAL_KEY();
    }

    /**
     * Static forwarder to the {@code ContinuousExecution$} singleton.
     *
     * @return the local-property name under which {@code runContinuous} publishes the epoch coordinator id
     */
    public static String EPOCH_COORDINATOR_ID_KEY() {
        ContinuousExecution$ companion = ContinuousExecution$.MODULE$;
        return companion.EPOCH_COORDINATOR_ID_KEY();
    }

    /**
     * Static forwarder to the {@code ContinuousExecution$} singleton.
     *
     * @return the local-property name under which {@code runContinuous} publishes the starting epoch
     */
    public static String START_EPOCH_KEY() {
        ContinuousExecution$ companion = ContinuousExecution$.MODULE$;
        return companion.START_EPOCH_KEY();
    }

    /**
     * Synthetic bridge that lets lambdas in this class reach the superclass
     * {@code outputMode()} implementation.
     *
     * @return the output mode as reported by the superclass
     */
    private /* synthetic */ OutputMode super$outputMode() {
        OutputMode inherited = super.outputMode();
        return inherited;
    }

    /**
     * Synthetic bridge that lets lambdas in this class reach the superclass
     * {@code sparkSession()} implementation.
     *
     * @return the session as reported by the superclass
     */
    private /* synthetic */ SparkSession super$sparkSession() {
        SparkSession inherited = super.sparkSession();
        return inherited;
    }

    /**
     * Reads the volatile {@code sources} field.
     *
     * @return the continuous streams currently registered with this execution
     */
    public Seq<ContinuousStream> sources() {
        return sources;
    }

    /**
     * Replaces the volatile {@code sources} field.
     *
     * @param x$1 the new sequence of continuous streams
     */
    public void sources_$eq(Seq<ContinuousStream> x$1) {
        sources = x$1;
    }

    /**
     * Reads the epoch coordinator id field.
     *
     * @return the id most recently stored via {@code currentEpochCoordinatorId_$eq}, or {@code null} if none was stored
     */
    public String currentEpochCoordinatorId() {
        return currentEpochCoordinatorId;
    }

    /**
     * Stores the epoch coordinator id.
     *
     * @param x$1 the new coordinator id
     */
    public void currentEpochCoordinatorId_$eq(String x$1) {
        currentEpochCoordinatorId = x$1;
    }

    /**
     * Accessor for the holder of the first recorded failure.
     *
     * @return the atomic reference backing the {@code failure} field
     */
    private AtomicReference<Throwable> failure() {
        return failure;
    }

    /**
     * Accessor for the logical plan of this execution.
     *
     * @return the {@code WriteToContinuousDataSource} plan held in the {@code logicalPlan} field
     */
    @Override
    public WriteToContinuousDataSource logicalPlan() {
        return logicalPlan;
    }

    /**
     * Name-mangled accessor for the trigger executor, used by the epoch update
     * thread created in {@code runContinuous}.
     *
     * @return the {@code ProcessingTimeExecutor} held by this execution
     */
    public ProcessingTimeExecutor org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor() {
        return org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor;
    }

    /**
     * Runs the continuous query, re-running it for as long as the query state
     * is ACTIVE after a run completes, then stops the sources.
     *
     * <p>After each {@code runContinuous} call the state is updated atomically:
     * RECONFIGURING is mapped back to ACTIVE and any other state is left as is.
     * The loop repeats only when the updated state equals ACTIVE.
     *
     * @param sparkSessionForStream the session passed through to {@code runContinuous}
     */
    @Override
    public void runActivatedStream(SparkSession sparkSessionForStream) {
        // An anonymous class implementing an interface cannot take constructor
        // arguments, so the operator is created with an empty argument list.
        UnaryOperator<State> stateUpdate = new UnaryOperator<State>() {
            @Override
            public State apply(State s) {
                return RECONFIGURING$.MODULE$.equals(s) ? ACTIVE$.MODULE$ : s;
            }
        };
        State afterRun;
        do {
            this.runContinuous(sparkSessionForStream);
            afterRun = this.state().updateAndGet(stateUpdate);
            // Null-safe equality, mirroring the original comparison against ACTIVE.
        } while (afterRun == null ? ACTIVE$.MODULE$ == null : afterRun.equals(ACTIVE$.MODULE$));
        this.stopSources();
    }

    /**
     * Determines the offsets the query should start from, based on the commit log.
     *
     * <p>If the commit log has a latest entry, the offsets logged for that epoch become
     * the committed offsets and the current batch id is set to the following epoch.
     * If the commit log is empty, the batch id is reset to 0 and an {@code OffsetSeq}
     * holding one {@code null} entry per source is returned.
     *
     * @param sparkSessionToRunBatches not used by this method
     * @return the offsets to start from
     * @throws IllegalStateException if the latest committed epoch has no entry in the offset log
     * @throws MatchError if the commit log result is neither a populated {@code Some} nor {@code None}
     */
    private OffsetSeq getStartOffsets(SparkSession sparkSessionToRunBatches) {
        Option latestCommit = this.commitLog().getLatest();
        if (latestCommit instanceof Some) {
            Tuple2 commitEntry = (Tuple2)((Some)latestCommit).value();
            if (commitEntry != null) {
                // Resume path: pick up right after the last committed epoch.
                long latestEpochId = commitEntry._1$mcJ$sp();
                String status = new StringBuilder(67).append("Starting new streaming query ").append("and getting offsets from latest epoch ").append(latestEpochId).toString();
                this.updateStatusMessage(status);
                OffsetSeq nextOffsets = (OffsetSeq)this.offsetLog().get(latestEpochId).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                    throw new IllegalStateException(new StringBuilder(47).append("Batch ").append(latestEpochId).append(" was committed without end epoch offsets!").toString());
                });
                this.committedOffsets_$eq(nextOffsets.toStreamProgress(this.sources()));
                this.currentBatchId_$eq(latestEpochId + 1L);
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("Resuming at epoch ").append(this.currentBatchId()).append(" with committed offsets ").append(this.committedOffsets()).toString());
                return nextOffsets;
            }
        }
        if (None$.MODULE$.equals(latestCommit)) {
            // Fresh start: nothing has been committed yet.
            this.updateStatusMessage("Starting new streaming query");
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting new streaming query.");
            this.currentBatchId_$eq(0L);
            Seq emptyOffsets = (Seq)this.sources().map((Function1 & Serializable & scala.Serializable)x$1 -> null, Seq$.MODULE$.canBuildFrom());
            return OffsetSeq$.MODULE$.fill((Seq<Offset>)emptyOffsets);
        }
        throw new MatchError(latestCommit);
    }

    /**
     * Performs one run of the continuous query.
     *
     * <p>Steps, in order: resolve the start offsets, rewrite the logical plan so each
     * {@code StreamingDataSourceV2Relation} carries its start offset, reject plans that
     * contain {@code CurrentTimestamp}/{@code CurrentDate}, plan the query, publish the
     * run parameters as SparkContext local properties, create the epoch coordinator
     * endpoint, start the epoch update thread and execute the plan. Clean-up of the
     * endpoint and of the epoch update thread always happens in the {@code finally} block.
     *
     * <p>An interruption exception raised while the state is RECONFIGURING is logged
     * and swallowed; every other throwable propagates to the caller.
     *
     * @param sparkSessionForQuery the session used for planning and for running the query
     */
    private void runContinuous(SparkSession sparkSessionForQuery) {
        block8: {
            OffsetSeq offsets = this.getStartOffsets(sparkSessionForQuery);
            // Give every streaming relation its start offset: the logged offset at index 0
            // (deserialized by the relation's stream) or, if absent, the stream's initial offset.
            LogicalPlan withNewSources = (LogicalPlan)this.logicalPlan().transform((PartialFunction)new scala.Serializable(null, offsets){
                public static final long serialVersionUID = 0L;
                private final OffsetSeq offsets$1;

                public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                    Object object;
                    A1 A1 = x1;
                    if (A1 instanceof StreamingDataSourceV2Relation) {
                        StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                        Option loggedOffset = (Option)this.offsets$1.offsets().apply(0);
                        Option realOffset = loggedOffset.map((Function1 & Serializable & scala.Serializable)off -> streamingDataSourceV2Relation.stream().deserializeOffset(off.json()));
                        Offset startOffset = (Offset)realOffset.getOrElse((Function0 & Serializable & scala.Serializable)() -> streamingDataSourceV2Relation.stream().initialOffset());
                        // Copy the relation, replacing only its fourth argument with the start offset.
                        Some x$1 = new Some((Object)startOffset);
                        Seq x$2 = streamingDataSourceV2Relation.copy$default$1();
                        Scan x$3 = streamingDataSourceV2Relation.copy$default$2();
                        SparkDataStream x$4 = streamingDataSourceV2Relation.copy$default$3();
                        Option x$5 = streamingDataSourceV2Relation.copy$default$5();
                        object = streamingDataSourceV2Relation.copy(x$2, x$3, x$4, (Option)x$1, x$5);
                    } else {
                        object = function1.apply(x1);
                    }
                    return (B1)object;
                }

                public final boolean isDefinedAt(LogicalPlan x1) {
                    LogicalPlan logicalPlan2 = x1;
                    boolean bl = logicalPlan2 instanceof StreamingDataSourceV2Relation;
                    return bl;
                }
                {
                    this.offsets$1 = offsets$1;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$3(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation org.apache.spark.sql.connector.read.streaming.Offset ), $anonfun$applyOrElse$4(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation )}, serializedLambda);
                }
            });
            // Validation pass only (the result is discarded): fail if the plan uses
            // CurrentTimestamp or CurrentDate.
            withNewSources.transformAllExpressions((PartialFunction)new scala.Serializable(null){
                public static final long serialVersionUID = 0L;

                public final <A1 extends Expression, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                    A1 A1 = x2;
                    boolean bl = A1 instanceof CurrentTimestamp ? true : A1 instanceof CurrentDate;
                    if (bl) {
                        throw new IllegalStateException("CurrentTimestamp and CurrentDate not yet supported for continuous processing");
                    }
                    Object object = function1.apply(x2);
                    return (B1)object;
                }

                public final boolean isDefinedAt(Expression x2) {
                    Expression expression = x2;
                    boolean bl = expression instanceof CurrentTimestamp ? true : expression instanceof CurrentDate;
                    boolean bl2 = bl;
                    return bl2;
                }
            });
            // Build the IncrementalExecution for this run and request its executed plan,
            // timed under the "queryPlanning" label.
            this.reportTimeTaken("queryPlanning", (Function0 & Serializable & scala.Serializable)() -> {
                this.lastExecution_$eq(new IncrementalExecution(sparkSessionForQuery, withNewSources, this.super$outputMode(), this.checkpointFile("state"), this.id(), this.runId(), this.currentBatchId(), this.offsetSeqMetadata()));
                return this.lastExecution().executedPlan();
            });
            // Take the stream of the first StreamingDataSourceV2Relation found in the plan.
            ContinuousStream stream = (ContinuousStream)withNewSources.collect((PartialFunction)new scala.Serializable(null){
                public static final long serialVersionUID = 0L;

                public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x3, Function1<A1, B1> function1) {
                    Object object;
                    A1 A1 = x3;
                    if (A1 instanceof StreamingDataSourceV2Relation) {
                        StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                        object = (ContinuousStream)streamingDataSourceV2Relation.stream();
                    } else {
                        object = function1.apply(x3);
                    }
                    return (B1)object;
                }

                public final boolean isDefinedAt(LogicalPlan x3) {
                    LogicalPlan logicalPlan2 = x3;
                    boolean bl = logicalPlan2 instanceof StreamingDataSourceV2Relation;
                    return bl;
                }
            }).head();
            // Publish run parameters as SparkContext local properties: continuous-processing
            // flag, start epoch, epoch coordinator id and epoch interval in milliseconds.
            sparkSessionForQuery.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), ((Object)BoxesRunTime.boxToBoolean((boolean)true)).toString());
            sparkSessionForQuery.sparkContext().setLocalProperty(ContinuousExecution$.MODULE$.START_EPOCH_KEY(), ((Object)BoxesRunTime.boxToLong((long)this.currentBatchId())).toString());
            // The coordinator id is "<runId>--<random UUID>", so each run gets a fresh id.
            String epochCoordinatorId = new StringBuilder(2).append(this.runId()).append("--").append(UUID.randomUUID()).toString();
            this.currentEpochCoordinatorId_$eq(epochCoordinatorId);
            sparkSessionForQuery.sparkContext().setLocalProperty(ContinuousExecution$.MODULE$.EPOCH_COORDINATOR_ID_KEY(), epochCoordinatorId);
            sparkSessionForQuery.sparkContext().setLocalProperty(ContinuousExecution$.MODULE$.EPOCH_INTERVAL_KEY(), ((Object)BoxesRunTime.boxToLong((long)((ContinuousTrigger)super.trigger()).intervalMs())).toString());
            RpcEndpointRef epochEndpoint = EpochCoordinatorRef$.MODULE$.create(this.logicalPlan().write(), stream, this, epochCoordinatorId, this.currentBatchId(), super.sparkSession(), SparkEnv$.MODULE$.get());
            // Background thread that, on every trigger, either starts a reconfiguration
            // or asks the coordinator to advance the epoch.
            Thread epochUpdateThread = new Thread(new Runnable(this, stream, epochEndpoint){
                private final /* synthetic */ ContinuousExecution $outer;
                private final ContinuousStream stream$1;
                private final RpcEndpointRef epochEndpoint$1;

                public void run() {
                    try {
                        // NOTE(review): the boolean returned to the trigger executor presumably
                        // tells it whether to keep triggering — confirm against ProcessingTimeExecutor.
                        this.$outer.org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor().execute((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                            boolean bl;
                            $this.$outer.startTrigger();
                            // Reconfiguration: only the caller that wins the ACTIVE -> RECONFIGURING
                            // compare-and-set interrupts the query execution thread.
                            if ($this.stream$1.needsReconfiguration() && $this.$outer.state().compareAndSet(ACTIVE$.MODULE$, RECONFIGURING$.MODULE$)) {
                                if ($this.$outer.queryExecutionThread().isAlive()) {
                                    $this.$outer.queryExecutionThread().interrupt();
                                }
                                bl = false;
                            } else if ($this.$outer.isActive()) {
                                // Still active: advance to the next epoch and record it as the current batch id.
                                $this.$outer.currentBatchId_$eq(BoxesRunTime.unboxToLong((Object)$this.epochEndpoint$1.askSync((Object)IncrementAndGetEpoch$.MODULE$, ClassTag$.MODULE$.Long())));
                                $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(23).append("New epoch ").append($this.$outer.currentBatchId()).append(" is starting.").toString());
                                bl = true;
                            } else {
                                bl = false;
                            }
                            return bl;
                        });
                    }
                    catch (InterruptedException interruptedException) {
                        // Interruption simply ends the thread (it is interrupted from the finally block below).
                        return;
                    }
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.stream$1 = stream$1;
                    this.epochEndpoint$1 = epochEndpoint$1;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2 ), $anonfun$run$2(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2 )}, serializedLambda);
                }
            }, new StringBuilder(24).append("epoch update thread for ").append(this.prettyIdString()).toString());
            try {
                try {
                    epochUpdateThread.setDaemon(true);
                    epochUpdateThread.start();
                    this.updateStatusMessage("Running");
                    // Execute the planned query under a new SQL execution id, timed as "runContinuous".
                    this.reportTimeTaken("runContinuous", (Function0 & Serializable & scala.Serializable)() -> (RDD)SQLExecution$.MODULE$.withNewExecutionId(sparkSessionForQuery, this.lastExecution(), SQLExecution$.MODULE$.withNewExecutionId$default$3(), (Function0 & Serializable & scala.Serializable)() -> this.lastExecution().executedPlan().execute()));
                    // Surface a failure recorded through stopInNewThread(Throwable), if any.
                    Throwable f = this.failure().get();
                    if (f != null) {
                        throw f;
                    }
                }
                catch (Throwable throwable) {
                    Throwable throwable2;
                    Throwable throwable3 = throwable;
                    if (throwable3 != null && StreamExecution$.MODULE$.isInterruptionException(throwable2 = throwable3, super.sparkSession().sparkContext())) {
                        State state = this.state().get();
                        RECONFIGURING$ rECONFIGURING$ = RECONFIGURING$.MODULE$;
                        // True exactly when the current state equals RECONFIGURING (null-safe comparison):
                        // the interruption was caused by reconfiguration, so log it and return normally.
                        if (!(state != null ? !state.equals(rECONFIGURING$) : rECONFIGURING$ != null)) {
                            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Query ").append(this.id()).append(" ignoring exception from reconfiguring: ").append(throwable2).toString());
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            break block8;
                        }
                    }
                    throw throwable;
                }
            }
            finally {
                // Clean-up runs via runUninterruptibly on the query execution thread.
                this.queryExecutionThread().runUninterruptibly((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                    try {
                        epochEndpoint.askSync((Object)StopContinuousExecutionWrites$.MODULE$, ClassTag$.MODULE$.Unit());
                    }
                    finally {
                        // Even if the stop request fails: stop the endpoint, end the epoch update
                        // thread and wait for it, then cancel the job group named after the run id.
                        SparkEnv$.MODULE$.get().rpcEnv().stop(epochEndpoint);
                        epochUpdateThread.interrupt();
                        epochUpdateThread.join();
                        this.super$sparkSession().sparkContext().cancelJobGroup(this.runId().toString());
                    }
                });
                // Clears the current thread's interrupted flag; the returned value is ignored.
                Thread.interrupted();
            }
        }
    }

    /**
     * Records the merged offset reported for an epoch and wakes up threads waiting
     * on the progress condition.
     *
     * <p>The partition offsets are merged into one global offset by the stream and
     * written to the offset log under {@code epoch}. If the offset log entry of the
     * previous epoch contains the same offsets, the "no new data" flag is set.
     *
     * @param epoch the epoch the offsets belong to
     * @param stream the stream used to merge the partition offsets
     * @param partitionOffsets the offsets reported by the individual partitions
     */
    public void addOffset(long epoch, ContinuousStream stream, Seq<PartitionOffset> partitionOffsets) {
        Predef$.MODULE$.assert(this.sources().length() == 1, (Function0 & Serializable & scala.Serializable)() -> "only one continuous source supported currently");
        Offset globalOffset = stream.mergeOffsets((PartitionOffset[])partitionOffsets.toArray(ClassTag$.MODULE$.apply(PartitionOffset.class)));
        Option previousEpochOffsets;
        // Write this epoch and read the previous one under the same monitor.
        synchronized (this) {
            this.offsetLog().add(epoch, OffsetSeq$.MODULE$.fill((Seq<Offset>)Predef$.MODULE$.wrapRefArray((Object[])new Offset[]{globalOffset})));
            previousEpochOffsets = this.offsetLog().get(epoch - 1L);
        }
        // Unchanged offsets since the previous epoch mean nothing new arrived.
        boolean offsetsUnchanged = previousEpochOffsets.contains((Object)OffsetSeq$.MODULE$.fill((Seq<Offset>)Predef$.MODULE$.wrapRefArray((Object[])new Offset[]{globalOffset})));
        if (offsetsUnchanged) {
            this.noNewData_$eq(true);
        }
        this.awaitProgressLock().lock();
        try {
            this.awaitProgressLockCondition().signalAll();
        }
        finally {
            this.awaitProgressLock().unlock();
        }
    }

    /**
     * Marks an epoch as committed.
     *
     * <p>Under this object's monitor the trigger offsets are recorded and, provided the
     * query execution thread is still alive, the epoch is added to the commit log, the
     * committed offsets are updated with the offset logged for the epoch, and that
     * offset is committed to the single source. If the thread is no longer alive the
     * method returns right after recording the trigger offsets. Afterwards old log
     * entries are purged and waiters on the progress condition are signalled.
     *
     * @param epoch the epoch to commit; its offsets must already be in the offset log
     */
    public void commit(long epoch) {
        this.updateStatusMessage(new StringBuilder(17).append("Committing epoch ").append(epoch).toString());
        Predef$.MODULE$.assert(this.sources().length() == 1, (Function0 & Serializable & scala.Serializable)() -> "only one continuous source supported currently");
        Predef$.MODULE$.assert(this.offsetLog().get(epoch).isDefined(), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("offset for epoch ").append(epoch).append(" not reported before commit").toString());
        synchronized (this) {
            this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets());
            if (!this.queryExecutionThread().isAlive()) {
                // Query thread is gone: skip the commit, the purge and the signal.
                return;
            }
            this.commitLog().add(epoch, new CommitMetadata(CommitMetadata$.MODULE$.apply$default$1()));
            // Re-hydrate the logged offset through the source before committing it.
            SparkDataStream offsetDecoder = (SparkDataStream)this.sources().apply(0);
            String loggedOffsetJson = ((Offset)((Option)((OffsetSeq)this.offsetLog().get(epoch).get()).offsets().apply(0)).get()).json();
            Offset offset = offsetDecoder.deserializeOffset(loggedOffsetJson);
            this.committedOffsets_$eq(this.committedOffsets().$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, Offset>>)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.sources().apply(0)), (Object)offset)}))));
            ((SparkDataStream)this.sources().apply(0)).commit(offset);
        }
        // Keep only the most recent minLogEntriesToMaintain entries.
        if (epoch >= (long)this.minLogEntriesToMaintain()) {
            this.purge(epoch + 1L - (long)this.minLogEntriesToMaintain());
        }
        this.awaitProgressLock().lock();
        try {
            this.awaitProgressLockCondition().signalAll();
        }
        finally {
            this.awaitProgressLock().unlock();
        }
    }

    /**
     * Blocks until {@code notDone$1(epoch)} reports {@code false}.
     *
     * <p>Each iteration waits on the progress condition for at most 100 milliseconds
     * and then rethrows the stream's death cause if one has been set.
     *
     * @param epoch the epoch to wait for
     */
    public void awaitEpoch(long epoch) {
        while (this.notDone$1(epoch)) {
            this.awaitProgressLock().lock();
            try {
                this.awaitProgressLockCondition().await(100L, TimeUnit.MILLISECONDS);
                if (this.streamDeathCause() != null) {
                    throw this.streamDeathCause();
                }
            }
            finally {
                this.awaitProgressLock().unlock();
            }
        }
    }

    /**
     * Records {@code error} as the query failure and stops the query from a new thread.
     *
     * <p>Only the first recorded error has any effect: if a failure is already stored,
     * this call does nothing.
     *
     * @param error the exception that should terminate the query
     */
    public void stopInNewThread(Throwable error) {
        boolean firstFailure = this.failure().compareAndSet(null, error);
        if (firstFailure) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(26).append("Query ").append(this.prettyIdString()).append(" received exception ").append(error).toString());
            this.stopInNewThread();
        }
    }

    /**
     * Starts a new daemon thread named "stop-continuous-execution" whose only
     * job is to call {@link #stop()} on this instance.
     *
     * <p>Any {@code Throwable} raised by {@code stop()} is logged through
     * {@code logError} (using the throwable's message) and then rethrown on the
     * spawned thread.
     *
     * <p>NOTE(review): the anonymous class below is decompiler output; the
     * {@code $outer} field, the instance initializer and
     * {@code $deserializeLambda$} look like compiler-synthetic members rather
     * than hand-written code, and as shown they are not valid Java source.
     */
    private void stopInNewThread() {
        new Thread(this){
            // Reference to the enclosing ContinuousExecution, captured at construction.
            private final /* synthetic */ ContinuousExecution $outer;

            public void run() {
                try {
                    this.$outer.stop();
                }
                catch (Throwable e) {
                    // Log first, then propagate so the failure is not swallowed.
                    this.$outer.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> e.getMessage(), e);
                    throw e;
                }
            }
            {
                // Decompiled null-check on the enclosing instance.
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super("stop-continuous-execution");
                // Daemon thread: set before start() is invoked on the new Thread.
                this.setDaemon(true);
            }

            // Presumably lambda-deserialization support emitted for the logging closure in run() — confirm against the bytecode.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$3(java.lang.Throwable )}, serializedLambda);
            }
        }.start();
    }

    /**
     * Stops this query.
     *
     * <p>The state is set to {@code TERMINATED} before anything else. If the
     * query execution thread is still alive it is interrupted and this method
     * waits for it to finish via {@code join()}. An informational message is
     * logged at the end in either case.
     */
    @Override
    public void stop() {
        // Mark the query terminated up front, before touching the execution thread.
        this.state().set(TERMINATED$.MODULE$);

        boolean executionThreadAlive = this.queryExecutionThread().isAlive();
        if (executionThreadAlive) {
            this.queryExecutionThread().interrupt();
            this.queryExecutionThread().join();
        }

        // The message is built lazily inside the Function0, as in the original.
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Query " + this.prettyIdString() + " was stopped");
    }

    /**
     * Tells whether the commit log has not yet reached the given epoch.
     *
     * @param epoch$2 the epoch being waited for
     * @return {@code true} when the commit log has no latest entry, or when the
     *         epoch of its latest entry is strictly less than {@code epoch$2};
     *         {@code false} otherwise
     * @throws MatchError if the value returned by {@code getLatest()} is neither
     *         a {@code Some} holding a non-null tuple nor {@code None}
     */
    private final boolean notDone$1(long epoch$2) {
        Option latestCommit = this.commitLog().getLatest();

        if (latestCommit instanceof Some) {
            Tuple2 entry = (Tuple2)((Some)latestCommit).value();
            if (entry != null) {
                // First tuple element is the epoch of the latest commit.
                return entry._1$mcJ$sp() < epoch$2;
            }
        }

        if (None$.MODULE$.equals(latestCommit)) {
            // Nothing committed yet, so the requested epoch cannot be done.
            return true;
        }

        throw new MatchError(latestCommit);
    }

    /**
     * Builds a continuous-processing execution for the given analyzed plan.
     *
     * <p>After delegating to the superclass constructor, this constructor:
     * <ol>
     *   <li>rewrites every {@code StreamingRelationV2} whose table is a
     *       {@code SupportsRead} into a {@code StreamingDataSourceV2Relation}
     *       backed by a {@code ContinuousStream}, reusing the relation already
     *       created for an equal {@code StreamingRelationV2} key;</li>
     *   <li>collects the streams of all {@code StreamingDataSourceV2Relation}
     *       nodes into {@code sources} and their distinct values into
     *       {@code uniqueSources};</li>
     *   <li>wraps the rewritten plan in a {@code WriteToContinuousDataSource}
     *       and stores it as {@code logicalPlan};</li>
     *   <li>creates the trigger executor from the continuous trigger's
     *       interval.</li>
     * </ol>
     *
     * @param sparkSession forwarded to the superclass constructor
     * @param name forwarded to the superclass constructor
     * @param checkpointRoot forwarded to the superclass constructor
     * @param analyzedPlan the plan that is transformed into the continuous logical plan
     * @param sink forwarded to the superclass constructor as a {@code Table}
     * @param trigger forwarded to the superclass constructor; must be a
     *        {@code ContinuousTrigger}
     * @param triggerClock forwarded to the superclass constructor
     * @param outputMode forwarded to the superclass constructor
     * @param extraOptions passed to {@code createStreamingWrite}
     * @param deleteCheckpointOnStop forwarded to the superclass constructor
     * @throws UnsupportedOperationException raised while rewriting the plan if a
     *         readable source table does not support {@code CONTINUOUS_READ}
     * @throws IllegalStateException if the trigger is not a {@code ContinuousTrigger}
     */
    public ContinuousExecution(SparkSession sparkSession, String name, String checkpointRoot, LogicalPlan analyzedPlan, SupportsWrite sink, Trigger trigger, Clock triggerClock, OutputMode outputMode, Map<String, String> extraOptions, boolean deleteCheckpointOnStop) {
        super(sparkSession, name, checkpointRoot, analyzedPlan, (Table)sink, trigger, triggerClock, outputMode, deleteCheckpointOnStop);
        // Cache of already-converted relations, keyed by the original StreamingRelationV2.
        scala.collection.mutable.Map v2ToRelationMap = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        // Counter used to build per-source metadata paths; starts at 0.
        IntRef nextSourceId = IntRef.create((int)0);
        // Decompiled partial function applied by transform(): rewrites readable StreamingRelationV2 nodes.
        LogicalPlan _logicalPlan = (LogicalPlan)analyzedPlan.transform((PartialFunction)new scala.Serializable(this, v2ToRelationMap, nextSourceId){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ContinuousExecution $outer;
            private final scala.collection.mutable.Map v2ToRelationMap$1;
            private final IntRef nextSourceId$1;

            /*
             * Enabled aggressive block sorting
             */
            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x1;
                if (A1 instanceof StreamingRelationV2) {
                    StreamingRelationV2 streamingRelationV2 = (StreamingRelationV2)A1;
                    TableProvider ds = streamingRelationV2.source();
                    String sourceName = streamingRelationV2.sourceName();
                    Table table = streamingRelationV2.table();
                    CaseInsensitiveStringMap options = streamingRelationV2.extraOptions();
                    Seq<Attribute> output = streamingRelationV2.output();
                    if (table instanceof SupportsRead) {
                        SupportsRead supportsRead = (SupportsRead)table;
                        // Reject tables that do not advertise the CONTINUOUS_READ capability.
                        if (!DataSourceV2Implicits$.MODULE$.TableHelper((Table)supportsRead).supports(TableCapability.CONTINUOUS_READ)) {
                            throw new UnsupportedOperationException(new StringBuilder(52).append("Data source ").append(sourceName).append(" does not support continuous processing.").toString());
                        }
                        // Build the relation only if this StreamingRelationV2 has not been converted yet.
                        object = this.v2ToRelationMap$1.getOrElseUpdate((Object)((Object)streamingRelationV2), (Function0 & Serializable & scala.Serializable)() -> {
                            // Metadata path is "<resolvedCheckpointRoot>/sources/<id>"; the id is bumped right after use.
                            String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                            ++$this.nextSourceId$1.elem;
                            $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Reading table [").append(supportsRead).append("] from DataSourceV2 named '").append(sourceName).append("' [").append(ds).append("]").toString());
                            Scan scan = supportsRead.newScanBuilder(options).build();
                            ContinuousStream stream = scan.toContinuousStream(metadataPath);
                            return new StreamingDataSourceV2Relation(output, scan, (SparkDataStream)stream, StreamingDataSourceV2Relation$.MODULE$.apply$default$4(), StreamingDataSourceV2Relation$.MODULE$.apply$default$5());
                        });
                        return (B1)object;
                    }
                }
                // Any other node is handed to the fallback function unchanged.
                object = function1.apply(x1);
                return (B1)object;
            }

            // Defined only for StreamingRelationV2 nodes whose table is a SupportsRead.
            public final boolean isDefinedAt(LogicalPlan x1) {
                StreamingRelationV2 streamingRelationV2;
                Table table;
                LogicalPlan logicalPlan2 = x1;
                boolean bl = logicalPlan2 instanceof StreamingRelationV2 && (table = (streamingRelationV2 = (StreamingRelationV2)logicalPlan2).table()) instanceof SupportsRead;
                return bl;
            }
            {
                // Decompiled capture of the enclosing instance and the two shared locals.
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.v2ToRelationMap$1 = v2ToRelationMap$1;
                this.nextSourceId$1 = nextSourceId$1;
            }

            // Presumably compiler-generated lambda-deserialization support; not valid Java as decompiled.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$1(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$1 org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String org.apache.spark.sql.connector.catalog.TableProvider org.apache.spark.sql.util.CaseInsensitiveStringMap scala.collection.Seq ), $anonfun$applyOrElse$2(org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String org.apache.spark.sql.connector.catalog.TableProvider )}, serializedLambda);
            }
        });
        // Gather the stream of every StreamingDataSourceV2Relation found in the rewritten plan.
        this.sources_$eq((Seq<ContinuousStream>)_logicalPlan.collect((PartialFunction)new scala.Serializable(null){
            public static final long serialVersionUID = 0L;

            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x2;
                if (A1 instanceof StreamingDataSourceV2Relation) {
                    StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                    object = (ContinuousStream)streamingDataSourceV2Relation.stream();
                } else {
                    object = function1.apply(x2);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(LogicalPlan x2) {
                LogicalPlan logicalPlan2 = x2;
                boolean bl = logicalPlan2 instanceof StreamingDataSourceV2Relation;
                return bl;
            }
        }));
        // De-duplicated view of the collected sources.
        this.uniqueSources_$eq((Seq<SparkDataStream>)((Seq)this.sources().distinct()));
        // The final plan writes the rewritten plan's output through the sink's streaming write.
        this.logicalPlan = new WriteToContinuousDataSource(this.createStreamingWrite((SupportsWrite)super.sink(), extraOptions, _logicalPlan), _logicalPlan);
        Trigger trigger2 = super.trigger();
        // Only ContinuousTrigger is accepted here; anything else is an illegal state.
        if (!(trigger2 instanceof ContinuousTrigger)) {
            throw new IllegalStateException(new StringBuilder(29).append("Unsupported type of trigger: ").append(super.trigger()).toString());
        }
        ContinuousTrigger continuousTrigger = (ContinuousTrigger)trigger2;
        long t = continuousTrigger.intervalMs();
        // The continuous trigger's interval (ms) drives a processing-time executor on the superclass clock.
        ProcessingTimeExecutor processingTimeExecutor = new ProcessingTimeExecutor(new ProcessingTimeTrigger(t), super.triggerClock());
        this.org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor = processingTimeExecutor;
    }
}

