/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.delta.sources;

import java.io.Serializable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaOptions$;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.Action;
import org.apache.spark.sql.delta.actions.FileAction;
import org.apache.spark.sql.delta.actions.Metadata;
import org.apache.spark.sql.delta.sources.DeltaSQLConf$;
import org.apache.spark.sql.delta.sources.DeltaSource$;
import org.apache.spark.sql.delta.sources.DeltaSourceSchemaEvolutionSupport$;
import org.apache.spark.sql.delta.sources.DeltaSourceSchemaTrackingLog;
import org.apache.spark.sql.delta.sources.PersistedSchema;
import org.apache.spark.sql.delta.sources.PersistedSchema$;
import org.apache.spark.sql.delta.storage.ClosableIterator$;
import org.apache.spark.sql.delta.storage.ClosableIterator$ClosableWrapper$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

public final class DeltaSourceSchemaTrackingLog$
implements Logging {
    public static DeltaSourceSchemaTrackingLog$ MODULE$;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new DeltaSourceSchemaTrackingLog$();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private Option<String> $lessinit$greater$default$4() {
        return None$.MODULE$;
    }

    private boolean $lessinit$greater$default$5() {
        return true;
    }

    public String fullSchemaTrackingLocation(String rootSchemaTrackingLocation, String tableId, Option<String> sourceTrackingId) {
        String subdir = new StringBuilder(12).append("_schema_log_").append(tableId).append(sourceTrackingId.map((Function1 & Serializable & scala.Serializable)n -> new StringBuilder(1).append("_").append((String)n).toString()).getOrElse((Function0 & Serializable & scala.Serializable)() -> "")).toString();
        return new Path(rootSchemaTrackingLocation, subdir).toString();
    }

    public Option<String> fullSchemaTrackingLocation$default$3() {
        return None$.MODULE$;
    }

    /*
     * Enabled aggressive block sorting
     */
    public DeltaSourceSchemaTrackingLog create(SparkSession sparkSession, String rootSchemaLocation, Snapshot sourceSnapshot, Option<String> sourceTrackingId, Option<String> sourceMetadataPathOpt, boolean mergeConsecutiveSchemaChanges, boolean initSchemaLogEagerly) {
        Tuple3 tuple3;
        String schemaTrackingLocation = this.fullSchemaTrackingLocation(rootSchemaLocation, sourceSnapshot.deltaLog().tableId(), sourceTrackingId);
        DeltaSourceSchemaTrackingLog log = new DeltaSourceSchemaTrackingLog(sparkSession, schemaTrackingLocation, sourceSnapshot, sourceMetadataPathOpt, initSchemaLogEagerly);
        log.getCurrentTrackedSchema().foreach((Function1 & Serializable & scala.Serializable)schema -> {
            DeltaSourceSchemaTrackingLog$.$anonfun$create$1(sourceSnapshot, sparkSession, sourceMetadataPathOpt, schema);
            return BoxedUnit.UNIT;
        });
        if (mergeConsecutiveSchemaChanges && log.getCurrentTrackedSchema().isDefined()) {
            this.getMergedConsecutiveSchemaChange(sparkSession, sourceSnapshot.deltaLog(), (PersistedSchema)log.getCurrentTrackedSchema().get()).foreach((Function1 & Serializable & scala.Serializable)newSchema -> {
                log.replaceCurrentSchema(newSchema);
                return BoxedUnit.UNIT;
            });
        }
        if ((tuple3 = new Tuple3(log.getPreviousTrackedSchema(), log.getCurrentTrackedSchema(), sourceMetadataPathOpt)) != null) {
            Option option = (Option)tuple3._1();
            Option option2 = (Option)tuple3._2();
            Option option3 = (Option)tuple3._3();
            if (option instanceof Some) {
                Some some = (Some)option;
                PersistedSchema prev = (PersistedSchema)some.value();
                if (option2 instanceof Some) {
                    Some some2 = (Some)option2;
                    PersistedSchema curr = (PersistedSchema)some2.value();
                    if (option3 instanceof Some) {
                        Some some3 = (Some)option3;
                        String metadataPath = (String)some3.value();
                        DeltaSourceSchemaEvolutionSupport$.MODULE$.validateIfSchemaChangeCanBeUnblockedWithSQLConf(sparkSession, metadataPath, curr, prev);
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        return log;
                    }
                }
            }
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        return log;
    }

    public Option<String> create$default$4() {
        return None$.MODULE$;
    }

    public Option<String> create$default$5() {
        return None$.MODULE$;
    }

    public boolean create$default$6() {
        return false;
    }

    public boolean create$default$7() {
        return true;
    }

    private Option<PersistedSchema> getMergedConsecutiveSchemaChange(SparkSession spark, DeltaLog deltaLog, PersistedSchema currentSchema) {
        long currentSchemaVersion = currentSchema.deltaCommitVersion();
        Iterator untilSchemaChange = deltaLog.getChangeLogFiles(currentSchemaVersion, deltaLog.getChangeLogFiles$default$2()).map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            long version = tuple2._1$mcJ$sp();
            FileStatus fileStatus = (FileStatus)tuple2._2();
            Option schemaChange = (Option)DeltaSource$.MODULE$.createRewindableActionIterator(spark, deltaLog, fileStatus).processAndClose((Function1 & Serializable & scala.Serializable)actionsIter -> {
                ObjectRef metadataAction = ObjectRef.create((Object)None$.MODULE$);
                BooleanRef hasFileAction = BooleanRef.create((boolean)false);
                actionsIter.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                    DeltaSourceSchemaTrackingLog$.$anonfun$getMergedConsecutiveSchemaChange$3(metadataAction, hasFileAction, x0$2);
                    return BoxedUnit.UNIT;
                });
                return hasFileAction.elem ? None$.MODULE$ : (Option)metadataAction.elem;
            });
            Option option = schemaChange.map((Function1 & Serializable & scala.Serializable)m -> new Tuple2((Object)BoxesRunTime.boxToLong((long)version), m));
            return option;
        }).takeWhile((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)x$6.isDefined()));
        return DeltaSource$.MODULE$.iteratorLast(ClosableIterator$ClosableWrapper$.MODULE$.toClosable$extension(ClosableIterator$.MODULE$.ClosableWrapper(untilSchemaChange))).flatten(Predef$.MODULE$.$conforms()).flatMap((Function1 & Serializable & scala.Serializable)x0$3 -> {
            None$ none$;
            Tuple2 tuple2 = x0$3;
            if (tuple2 != null) {
                long version = tuple2._1$mcJ$sp();
                Metadata metadata = (Metadata)tuple2._2();
                if (version == currentSchemaVersion) {
                    none$ = None$.MODULE$;
                } else {
                    MODULE$.log().info(new StringBuilder(80).append("Looked ahead from version ").append(currentSchemaVersion).append(" and ").append("will use schema at version ").append(version).append(" to read Delta stream.").toString());
                    none$ = new Some((Object)PersistedSchema$.MODULE$.apply(deltaLog.tableId(), version, metadata, currentSchema.sourceMetadataPath()));
                }
            } else {
                throw new MatchError((Object)tuple2);
            }
            None$ none$2 = none$;
            return none$2;
        });
    }

    public static final /* synthetic */ void $anonfun$create$2(PersistedSchema schema$1, String metadataPath) {
        String string = metadataPath;
        String string2 = schema$1.sourceMetadataPath();
        Predef$.MODULE$.require(!(string != null ? !string.equals(string2) : string2 != null), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(270).append("The Delta source metadata path used for execution '").append(metadataPath).append("' is different ").append("from the one persisted for previous processing '").append(schema$1.sourceMetadataPath()).append("'. ").append("Please check if the schema location has been reused across different streaming ").append("sources. Pick a new `").append(DeltaOptions$.MODULE$.SCHEMA_TRACKING_LOCATION()).append("` or use ").append("`").append(DeltaOptions$.MODULE$.STREAMING_SOURCE_TRACKING_ID()).append("` to ").append("distinguish between streaming sources.").toString());
    }

    public static final /* synthetic */ void $anonfun$create$1(Snapshot sourceSnapshot$1, SparkSession sparkSession$1, Option sourceMetadataPathOpt$1, PersistedSchema schema) {
        block0: {
            schema.validateAgainstSnapshot(sourceSnapshot$1);
            if (!BoxesRunTime.unboxToBoolean((Object)sparkSession$1.sessionState().conf().getConf(DeltaSQLConf$.MODULE$.DELTA_STREAMING_SCHEMA_TRACKING_METADATA_PATH_CHECK_ENABLED()))) break block0;
            sourceMetadataPathOpt$1.foreach((Function1 & Serializable & scala.Serializable)metadataPath -> {
                DeltaSourceSchemaTrackingLog$.$anonfun$create$2(schema, metadataPath);
                return BoxedUnit.UNIT;
            });
        }
    }

    public static final /* synthetic */ void $anonfun$getMergedConsecutiveSchemaChange$3(ObjectRef metadataAction$1, BooleanRef hasFileAction$1, Action x0$2) {
        Action action = x0$2;
        if (action instanceof Metadata) {
            Metadata metadata = (Metadata)action;
            metadataAction$1.elem = new Some((Object)metadata);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (action instanceof FileAction) {
            hasFileAction$1.elem = true;
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    private DeltaSourceSchemaTrackingLog$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
    }
}

