/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.hudi.streaming;

import java.io.Serializable;
import org.apache.hudi.AvroConversionUtils$;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.IncrementalRelationV1;
import org.apache.hudi.MergeOnReadIncrementalRelationV1;
import org.apache.hudi.MergeOnReadIncrementalRelationV1$;
import org.apache.hudi.SparkAdapterSupport;
import org.apache.hudi.cdc.CDCRelation$;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.hudi.streaming.HoodieEarliestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieLatestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieMetadataLog;
import org.apache.spark.sql.hudi.streaming.HoodieOffsetRangeLimit;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$;
import org.apache.spark.sql.hudi.streaming.HoodieSpecifiedOffsetRangeLimit;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005Ef\u0001\u0002\f\u0018\u0001\u0011B\u0001\"\u0012\u0001\u0003\u0002\u0003\u0006IA\u0012\u0005\t\u0015\u0002\u0011\t\u0011)A\u0005\u0017\"A1\u000b\u0001B\u0001B\u0003%A\u000b\u0003\u0005`\u0001\t\u0005\t\u0015!\u0003a\u0011!I\u0007A!A!\u0002\u0013Q\u0007\u0002C7\u0001\u0005\u0003\u0005\u000b\u0011\u00028\t\u0011I\u0004!\u0011!Q\u0001\nMDQA\u001e\u0001\u0005\u0002]D!\"!\u0001\u0001\u0011\u000b\u0007I\u0011BA\u0002\u0011%\t\t\u0002\u0001b\u0001\n\u0013\t\u0019\u0002\u0003\u0005\u0002\u001c\u0001\u0001\u000b\u0011BA\u000b\u0011%\ti\u0002\u0001b\u0001\n\u0013\ty\u0002\u0003\u0005\u0002L\u0001\u0001\u000b\u0011BA\u0011\u0011)\ti\u0005\u0001EC\u0002\u0013%\u0011q\n\u0005\b\u0003?\u0002A\u0011IA1\u0011\u001d\t\u0019\u0007\u0001C\u0005\u0003KBq!!\u001b\u0001\t\u0003\nY\u0007C\u0004\u0002v\u0001!\t%a\u001e\t\u000f\u0005]\u0005\u0001\"\u0003\u0002\u001a\"9\u0011q\u0014\u0001\u0005\n\u0005\u0005\u0006bBAT\u0001\u0011\u0005\u0013\u0011\u0016\u0002\u0015\u0011>|G-[3TiJ,\u0017-\\*pkJ\u001cWMV\u0019\u000b\u0005aI\u0012!C:ue\u0016\fW.\u001b8h\u0015\tQ2$\u0001\u0003ik\u0012L'B\u0001\u000f\u001e\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003=}\tQa\u001d9be.T!\u0001I\u0011\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0011\u0013aA8sO\u000e\u00011C\u0002\u0001&[QR\u0004\t\u0005\u0002'W5\tqE\u0003\u0002)S\u0005!A.\u00198h\u0015\u0005Q\u0013\u0001\u00026bm\u0006L!\u0001L\u0014\u0003\r=\u0013'.Z2u!\tq#'D\u00010\u0015\tA\u0002G\u0003\u000227\u0005IQ\r_3dkRLwN\\\u0005\u0003g=\u0012aaU8ve\u000e,\u0007CA\u001b9\u001b\u00051$BA\u001c\u001e\u0003!Ig\u000e^3s]\u0006d\u0017BA\u001d7\u0005\u001daunZ4j]\u001e\u0004\"a\u000f \u000e\u0003qR\u0011!P\u0001\u0006g\u000e\fG.Y\u0005\u0003\u007fq\u0012AbU3sS\u0006d\u0017N_1cY\u0016\u0004\"!Q\"\u000e\u0003\tS!AG\u0010\n\u0005\u0011\u0013%aE*qCJ\\\u0017\tZ1qi\u0016\u00148+\u001e9q_J$\u0018AC:rY\u000e{g\u000e^3yiB\u0011q\tS\u0007\u00027%\u0011\u0011j\u0007\u0002\u000b'Fc5i\u001c8uKb$\u0018AC7fi\u0006\u001cE.[3oiB\u0011A*U\u0007\u0002\u001b*\u0011ajT\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003!\n\u000baaY8n[>t\u0017B\u0001*N\u0005UAun\u001c3jKR\u000b'\r\\3NKR\f7\t\\5f]R\fA\"\\3uC\u0012\fG/\u0019)bi\"\u0004\"!\u0016/\u000f\u0005YS\u0006CA,=\u001b\u0005A&BA-$\u0003\u0019a$o\\8u}%\u00111\fP\u0001\u0007!J,G-\u001a4\n\u0005us&AB*ue&twM\u0003\u0002\\y\u0005a1o\u00195f[\u0006|\u0005\u000f^5p]B\u00191(Y2\n\u0005\td$AB(qi&|g\u000e\u0005\u0002eO6\tQM\u0003\u0002g7\u0005)A/\u001f9fg&\u0011\u0001.\u001a\u0002\u000b'R\u0014Xo\u0019;UsB,\u0017A\u00039be\u0006lW\r^3sgB!Qk\u001b+U\u0013\tagLA\u0002NCB\f\u0001c\u001c4gg\u0016$(+\u00198hK2KW.\u001b;\u0011\u0005=\u0004X\"A\f\n\u0005E<\"A\u0006%p_\u0012LWm\u00144gg\u0016$(+\u00198hK2KW.\u001b;\u0002#]\u0014\u0018\u000e^3UC\ndWMV3sg&|g\u000e\u0005\u0002Mi&\u0011Q/\u0014\u0002\u0013\u0011>|G-[3UC\ndWMV3sg&|g.\u0001\u0004=S:LGO\u0010\u000b\tqfT8\u0010`?\u007f\u007fB\u0011q\u000e\u0001\u0005\u0006\u000b\"\u0001\rA\u0012\u0005\u0006\u0015\"\u0001\ra\u0013\u0005\u0006'\"\u0001\r\u0001\u0016\u0005\u0006?\"\u0001\r\u0001\u0019\u0005\u0006S\"\u0001\rA\u001b\u0005\u0006[\"\u0001\rA\u001c\u0005\u0006e\"\u0001\ra]\u0001\ni\u0006\u0014G.\u001a+za\u0016,\"!!\u0002\u0011\t\u0005\u001d\u0011QB\u0007\u0003\u0003\u0013Q1!a\u0003P\u0003\u0015iw\u000eZ3m\u0013\u0011\ty!!\u0003\u0003\u001f!{w\u000eZ5f)\u0006\u0014G.\u001a+za\u0016\f!\"[:D\t\u000e\u000bV/\u001a:z+\t\t)\u0002E\u0002<\u0003/I1!!\u0007=\u0005\u001d\u0011un\u001c7fC:\f1\"[:D\t\u000e\u000bV/\u001a:zA\u0005A\u0002n\u001c7m_^\u001cu.\\7ji\"\u000bg\u000e\u001a7j]\u001elu\u000eZ3\u0016\u0005\u0005\u0005\u0002\u0003BA\u0012\u0003\u000brA!!\n\u0002@9!\u0011qEA\u001e\u001d\u0011\tI#!\u000f\u000f\t\u0005-\u0012q\u0007\b\u0005\u0003[\t)D\u0004\u0003\u00020\u0005MbbA,\u00022%\t!%\u0003\u0002!C%\u0011!dH\u0005\u0003!\nK!AT(\n\u0007\u0005uR*\u0001\u0005uS6,G.\u001b8f\u0013\u0011\t\t%a\u0011\u0002\u001bQKW.\u001a7j]\u0016,F/\u001b7t\u0015\r\ti$T\u0005\u0005\u0003\u000f\nIE\u0001\u000bI_2dwn^\"p[6LG\u000fS1oI2Lgn\u001a\u0006\u0005\u0003\u0003\n\u0019%A\ri_2dwn^\"p[6LG\u000fS1oI2LgnZ'pI\u0016\u0004\u0013AD5oSRL\u0017\r\\(gMN,Go]\u000b\u0003\u0003#\u00022a\\A*\u0013\r\t)f\u0006\u0002\u0013\u0011>|G-[3T_V\u00148-Z(gMN,G\u000fK\u0002\u000f\u00033\u00022aOA.\u0013\r\ti\u0006\u0010\u0002\niJ\fgn]5f]R\faa]2iK6\fW#A2\u0002\u001f\u001d,G\u000fT1uKN$xJ\u001a4tKR,\"!a\u001a\u0011\tm\n\u0017\u0011K\u0001\nO\u0016$xJ\u001a4tKR,\"!!\u001c\u0011\tm\n\u0017q\u000e\t\u0004]\u0005E\u0014bAA:_\t1qJ\u001a4tKR\f\u0001bZ3u\u0005\u0006$8\r\u001b\u000b\u0007\u0003s\ny)a%\u0011\t\u0005m\u0014\u0011\u0012\b\u0005\u0003{\n)I\u0004\u0003\u0002\u0000\u0005\re\u0002BA\u0017\u0003\u0003K!AH\u0010\n\u0005qi\u0012bAAD7\u00059\u0001/Y2lC\u001e,\u0017\u0002BAF\u0003\u001b\u0013\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0007\u0005\u001d5\u0004C\u0004\u0002\u0012J\u0001\r!!\u001c\u0002\u000bM$\u0018M\u001d;\t\u000f\u0005U%\u00031\u0001\u0002p\u0005\u0019QM\u001c3\u0002\u001fM$\u0018M\u001d;D_6l\u0017\u000e\u001e+j[\u0016$2\u0001VAN\u0011\u001d\tij\u0005a\u0001\u0003#\n1b\u001d;beR|eMZ:fi\u0006\u0019BO]1og2\fG/Z\"iK\u000e\\\u0007o\\5oiR\u0019A+a)\t\r\u0005\u0015F\u00031\u0001U\u0003)\u0019w.\\7jiRKW.Z\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002,B\u00191(!,\n\u0007\u0005=FH\u0001\u0003V]&$\b")
public class HoodieStreamSourceV1
implements Source,
Logging,
scala.Serializable,
SparkAdapterSupport {
    private HoodieTableType tableType;
    private transient HoodieSourceOffset initialOffsets;
    private final SQLContext sqlContext;
    private final HoodieTableMetaClient metaClient;
    private final String metadataPath;
    private final Option<StructType> schemaOption;
    private final Map<String, String> parameters;
    private final HoodieOffsetRangeLimit offsetRangeLimit;
    private final HoodieTableVersion writeTableVersion;
    private final boolean isCDCQuery;
    private final TimelineUtils.HollowCommitHandling hollowCommitHandlingMode;
    private SparkAdapter sparkAdapter;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile byte bitmap$0;
    private volatile transient boolean bitmap$trans$0;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public void commit(org.apache.spark.sql.execution.streaming.Offset end) {
        Source.commit$((Source)this, (org.apache.spark.sql.execution.streaming.Offset)end);
    }

    public Offset initialOffset() {
        return Source.initialOffset$((Source)this);
    }

    public Offset deserializeOffset(String json) {
        return Source.deserializeOffset$((Source)this, (String)json);
    }

    public void commit(Offset end) {
        Source.commit$((Source)this, (Offset)end);
    }

    private SparkAdapter sparkAdapter$lzycompute() {
        HoodieStreamSourceV1 hoodieStreamSourceV1 = this;
        synchronized (hoodieStreamSourceV1) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.sparkAdapter = SparkAdapterSupport.sparkAdapter$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.sparkAdapter;
    }

    @Override
    public SparkAdapter sparkAdapter() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.sparkAdapter$lzycompute();
        }
        return this.sparkAdapter;
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private HoodieTableType tableType$lzycompute() {
        HoodieStreamSourceV1 hoodieStreamSourceV1 = this;
        synchronized (hoodieStreamSourceV1) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.tableType = this.metaClient.getTableType();
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.tableType;
    }

    private HoodieTableType tableType() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.tableType$lzycompute();
        }
        return this.tableType;
    }

    private boolean isCDCQuery() {
        return this.isCDCQuery;
    }

    private TimelineUtils.HollowCommitHandling hollowCommitHandlingMode() {
        return this.hollowCommitHandlingMode;
    }

    private HoodieSourceOffset initialOffsets$lzycompute() {
        HoodieStreamSourceV1 hoodieStreamSourceV1 = this;
        synchronized (hoodieStreamSourceV1) {
            if (!this.bitmap$trans$0) {
                HoodieMetadataLog metadataLog = new HoodieMetadataLog(this.sqlContext.sparkSession(), this.metadataPath);
                this.initialOffsets = (HoodieSourceOffset)((Object)metadataLog.get(0L).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                    HoodieSourceOffset hoodieSourceOffset;
                    HoodieOffsetRangeLimit hoodieOffsetRangeLimit = $this.offsetRangeLimit;
                    if (HoodieEarliestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
                    } else if (HoodieLatestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = (HoodieSourceOffset)((Object)((Object)this.getLatestOffset().getOrElse((Function0 & Serializable & scala.Serializable)() -> HoodieSourceOffset$.MODULE$.INIT_OFFSET())));
                    } else if (hoodieOffsetRangeLimit instanceof HoodieSpecifiedOffsetRangeLimit) {
                        HoodieSpecifiedOffsetRangeLimit hoodieSpecifiedOffsetRangeLimit = (HoodieSpecifiedOffsetRangeLimit)hoodieOffsetRangeLimit;
                        String instantTime = hoodieSpecifiedOffsetRangeLimit.instantTime();
                        hoodieSourceOffset = new HoodieSourceOffset(instantTime);
                    } else {
                        throw new MatchError((Object)hoodieOffsetRangeLimit);
                    }
                    HoodieSourceOffset offset = hoodieSourceOffset;
                    metadataLog.add(0L, (Object)offset);
                    this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(22).append("The initial offset is ").append((Object)offset).toString());
                    return offset;
                }));
                this.bitmap$trans$0 = true;
            }
        }
        return this.initialOffsets;
    }

    private HoodieSourceOffset initialOffsets() {
        if (!this.bitmap$trans$0) {
            return this.initialOffsets$lzycompute();
        }
        return this.initialOffsets;
    }

    public StructType schema() {
        if (this.isCDCQuery()) {
            return CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA();
        }
        return (StructType)this.schemaOption.getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            TableSchemaResolver schemaUtil = new TableSchemaResolver($this.metaClient);
            return AvroConversionUtils$.MODULE$.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema());
        });
    }

    private Option<HoodieSourceOffset> getLatestOffset() {
        HoodieTimeline filteredTimeline;
        this.metaClient.reloadActiveTimeline();
        HoodieTimeline hoodieTimeline = filteredTimeline = TimelineUtils.handleHollowCommitIfNeeded(this.metaClient.getActiveTimeline().filterCompletedInstants(), this.metaClient, this.hollowCommitHandlingMode());
        if (!hoodieTimeline.empty()) {
            TimelineUtils.HollowCommitHandling hollowCommitHandling = this.hollowCommitHandlingMode();
            TimelineUtils.HollowCommitHandling hollowCommitHandling2 = TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
            String timestamp = !(hollowCommitHandling != null ? !((Object)((Object)hollowCommitHandling)).equals((Object)hollowCommitHandling2) : hollowCommitHandling2 != null) ? hoodieTimeline.getLatestCompletionTime().get() : hoodieTimeline.lastInstant().get().requestedTime();
            return new Some((Object)new HoodieSourceOffset(timestamp));
        }
        return None$.MODULE$;
    }

    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        return this.getLatestOffset();
    }

    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> start2, org.apache.spark.sql.execution.streaming.Offset end) {
        RDD<Row> rDD;
        HoodieSourceOffset startOffset = (HoodieSourceOffset)((Object)start2.map((Function1 & Serializable & scala.Serializable)x$1 -> HoodieSourceOffset$.MODULE$.apply((org.apache.spark.sql.execution.streaming.Offset)x$1)).getOrElse((Function0 & Serializable & scala.Serializable)() -> this.initialOffsets()));
        HoodieSourceOffset endOffset = HoodieSourceOffset$.MODULE$.apply(end);
        startOffset = new HoodieSourceOffset(this.translateCheckpoint(startOffset.offsetCommitTime()));
        endOffset = new HoodieSourceOffset(this.translateCheckpoint(endOffset.offsetCommitTime()));
        HoodieSourceOffset hoodieSourceOffset = startOffset;
        HoodieSourceOffset hoodieSourceOffset2 = endOffset;
        if (!(hoodieSourceOffset != null ? !((Object)((Object)hoodieSourceOffset)).equals((Object)hoodieSourceOffset2) : hoodieSourceOffset2 != null)) {
            return this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), this.schema(), true);
        }
        if (this.isCDCQuery()) {
            Map cdcOptions = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.START_COMMIT().key()), (Object)this.startCommitTime(startOffset)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.END_COMMIT().key()), (Object)endOffset.offsetCommitTime())}));
            RDD<InternalRow> rdd = CDCRelation$.MODULE$.getCDCRelation(this.sqlContext, this.metaClient, (Map<String, String>)cdcOptions, CDCRelation$.MODULE$.getCDCRelation$default$4()).buildScan0(HoodieCDCUtils.CDC_COLUMNS, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
            return this.sqlContext.sparkSession().internalCreateDataFrame(rdd, CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA(), true);
        }
        Map incParams = this.parameters.$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()), (Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.START_COMMIT().key()), (Object)this.startCommitTime(startOffset)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.END_COMMIT().key()), (Object)endOffset.offsetCommitTime()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key()), (Object)this.hollowCommitHandlingMode().name())})));
        HoodieTableType hoodieTableType = this.tableType();
        if (((Object)((Object)HoodieTableType.COPY_ON_WRITE)).equals((Object)hoodieTableType)) {
            SparkRowSerDe serDe = this.sparkAdapter().createSparkRowSerDe(this.schema());
            rDD = new IncrementalRelationV1(this.sqlContext, (Map<String, String>)incParams, (Option<StructType>)new Some((Object)this.schema()), this.metaClient).buildScan().map((Function1 & Serializable & scala.Serializable)x$1 -> serDe.serializeRow((Row)x$1), ClassTag$.MODULE$.apply(InternalRow.class));
        } else if (((Object)((Object)HoodieTableType.MERGE_ON_READ)).equals((Object)hoodieTableType)) {
            String[] requiredColumns = (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.schema().fields())).map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.name(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
            rDD = new MergeOnReadIncrementalRelationV1(this.sqlContext, (Map<String, String>)incParams, this.metaClient, (Option<StructType>)new Some((Object)this.schema()), MergeOnReadIncrementalRelationV1$.MODULE$.$lessinit$greater$default$5()).buildScan(requiredColumns, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
        } else {
            throw new IllegalArgumentException(new StringBuilder(21).append("UnSupport tableType: ").append((Object)this.tableType()).toString());
        }
        RDD<Row> rdd = rDD;
        return this.sqlContext.internalCreateDataFrame((RDD)rdd, this.schema(), true);
    }

    private String startCommitTime(HoodieSourceOffset startOffset) {
        HoodieSourceOffset hoodieSourceOffset = startOffset;
        HoodieSourceOffset hoodieSourceOffset2 = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
        HoodieSourceOffset hoodieSourceOffset3 = hoodieSourceOffset;
        if (!(hoodieSourceOffset2 != null ? !((Object)((Object)hoodieSourceOffset2)).equals((Object)hoodieSourceOffset3) : hoodieSourceOffset3 != null)) {
            return startOffset.offsetCommitTime();
        }
        if (hoodieSourceOffset != null) {
            String commitTime = hoodieSourceOffset.offsetCommitTime();
            return commitTime;
        }
        throw new IllegalStateException("UnKnow offset type.");
    }

    private String translateCheckpoint(String commitTime) {
        if (this.writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
            return CheckpointUtils.convertToCheckpointV2ForCommitTime(new StreamerCheckpointV1(commitTime), this.metaClient, this.hollowCommitHandlingMode()).getCheckpointKey();
        }
        return commitTime;
    }

    public void stop() {
    }

    public HoodieStreamSourceV1(SQLContext sqlContext, HoodieTableMetaClient metaClient, String metadataPath, Option<StructType> schemaOption, Map<String, String> parameters2, HoodieOffsetRangeLimit offsetRangeLimit, HoodieTableVersion writeTableVersion) {
        this.sqlContext = sqlContext;
        this.metaClient = metaClient;
        this.metadataPath = metadataPath;
        this.schemaOption = schemaOption;
        this.parameters = parameters2;
        this.offsetRangeLimit = offsetRangeLimit;
        this.writeTableVersion = writeTableVersion;
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        SparkAdapterSupport.$init$(this);
        this.isCDCQuery = CDCRelation$.MODULE$.isCDCEnabled(metaClient) && parameters2.get((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()).contains((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()) && parameters2.get((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT().key()).contains((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT_CDC_VAL());
        this.hollowCommitHandlingMode = (TimelineUtils.HollowCommitHandling)((Object)parameters2.get((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key()).map((Function1 & Serializable & scala.Serializable)x$1 -> TimelineUtils.HollowCommitHandling.valueOf(x$1)).getOrElse((Function0 & Serializable & scala.Serializable)() -> TimelineUtils.HollowCommitHandling.BLOCK));
    }
}

