/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.hudi.streaming;

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils$;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.IncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation$;
import org.apache.hudi.SparkAdapterSupport;
import org.apache.hudi.cdc.CDCRelation$;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.hudi.streaming.HoodieEarliestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieLatestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieMetadataLog;
import org.apache.spark.sql.hudi.streaming.HoodieOffsetRangeLimit;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$;
import org.apache.spark.sql.hudi.streaming.HoodieSpecifiedOffsetRangeLimit;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005%g\u0001B\f\u0019\u0001\u0015B\u0001B\u0012\u0001\u0003\u0002\u0003\u0006Ia\u0012\u0005\t\u0017\u0002\u0011\t\u0011)A\u0005\u0019\"Aq\u000b\u0001B\u0001B\u0003%\u0001\f\u0003\u0005b\u0001\t\u0005\t\u0015!\u0003c\u0011!)\u0007A!A!\u0002\u00131\u0007\"\u00026\u0001\t\u0003Y\u0007b\u0002:\u0001\u0005\u0004%Ia\u001d\u0005\u0007y\u0002\u0001\u000b\u0011\u0002;\t\u0015\u0005\r\u0001\u0001#b\u0001\n\u0013\t)\u0001\u0003\u0006\u0002\u0014\u0001A)\u0019!C\u0005\u0003+A!\"a\n\u0001\u0011\u000b\u0007I\u0011BA\u0015\u0011%\t9\u0004\u0001b\u0001\n\u0013\tI\u0004\u0003\u0005\u0002B\u0001\u0001\u000b\u0011BA\u001e\u0011%\t\u0019\u0005\u0001b\u0001\n\u0013\t)\u0005\u0003\u0005\u0002r\u0001\u0001\u000b\u0011BA$\u0011)\t\u0019\b\u0001EC\u0002\u0013%\u0011Q\u000f\u0005\b\u0003\u007f\u0002A\u0011IAA\u0011\u001d\t\u0019\t\u0001C\u0005\u0003\u000bCq!!#\u0001\t\u0003\nY\tC\u0004\u0002\u0016\u0002!\t%a&\t\u000f\u0005]\u0006\u0001\"\u0003\u0002:\"9\u0011q\u0018\u0001\u0005B\u0005\u0005'A\u0005%p_\u0012LWm\u0015;sK\u0006l7k\\;sG\u0016T!!\u0007\u000e\u0002\u0013M$(/Z1nS:<'BA\u000e\u001d\u0003\u0011AW\u000fZ5\u000b\u0005uq\u0012aA:rY*\u0011q\u0004I\u0001\u0006gB\f'o\u001b\u0006\u0003C\t\na!\u00199bG\",'\"A\u0012\u0002\u0007=\u0014xm\u0001\u0001\u0014\r\u00011c&N\u001eB!\t9C&D\u0001)\u0015\tI#&\u0001\u0003mC:<'\"A\u0016\u0002\t)\fg/Y\u0005\u0003[!\u0012aa\u00142kK\u000e$\bCA\u00184\u001b\u0005\u0001$BA\r2\u0015\t\u0011D$A\u0005fq\u0016\u001cW\u000f^5p]&\u0011A\u0007\r\u0002\u0007'>,(oY3\u0011\u0005YJT\"A\u001c\u000b\u0005ar\u0012\u0001C5oi\u0016\u0014h.\u00197\n\u0005i:$a\u0002'pO\u001eLgn\u001a\t\u0003y}j\u0011!\u0010\u0006\u0002}\u0005)1oY1mC&\u0011\u0001)\u0010\u0002\r'\u0016\u0014\u0018.\u00197ju\u0006\u0014G.\u001a\t\u0003\u0005\u0012k\u0011a\u0011\u0006\u00037\u0001J!!R\"\u0003'M\u0003\u0018M]6BI\u0006\u0004H/\u001a:TkB\u0004xN\u001d;\u0002\u0015M\fHnQ8oi\u0016DH\u000f\u0005\u0002I\u00136\tA$\u0003\u0002K9\tQ1+\u0015'D_:$X\r\u001f;\u0002\u00195,G/\
u00193bi\u0006\u0004\u0016\r\u001e5\u0011\u00055#fB\u0001(S!\tyU(D\u0001Q\u0015\t\tF%\u0001\u0004=e>|GOP\u0005\u0003'v\na\u0001\u0015:fI\u00164\u0017BA+W\u0005\u0019\u0019FO]5oO*\u00111+P\u0001\rg\u000eDW-\\1PaRLwN\u001c\t\u0004ye[\u0016B\u0001.>\u0005\u0019y\u0005\u000f^5p]B\u0011AlX\u0007\u0002;*\u0011a\fH\u0001\u0006if\u0004Xm]\u0005\u0003Av\u0013!b\u0015;sk\u000e$H+\u001f9f\u0003)\u0001\u0018M]1nKR,'o\u001d\t\u0005\u001b\u000edE*\u0003\u0002e-\n\u0019Q*\u00199\u0002!=4gm]3u%\u0006tw-\u001a'j[&$\bCA4i\u001b\u0005A\u0012BA5\u0019\u0005YAun\u001c3jK>3gm]3u%\u0006tw-\u001a'j[&$\u0018A\u0002\u001fj]&$h\b\u0006\u0004m[:|\u0007/\u001d\t\u0003O\u0002AQA\u0012\u0004A\u0002\u001dCQa\u0013\u0004A\u00021CQa\u0016\u0004A\u0002aCQ!\u0019\u0004A\u0002\tDQ!\u001a\u0004A\u0002\u0019\f!\u0002[1e_>\u00048i\u001c8g+\u0005!\bCA;{\u001b\u00051(BA<y\u0003\u0011\u0019wN\u001c4\u000b\u0005e\u0004\u0013A\u00025bI>|\u0007/\u0003\u0002|m\ni1i\u001c8gS\u001e,(/\u0019;j_:\f1\u0002[1e_>\u00048i\u001c8gA!\u0012\u0001B \t\u0003y}L1!!\u0001>\u0005%!(/\u00198tS\u0016tG/A\u0005uC\ndW\rU1uQV\u0011\u0011q\u0001\t\u0005\u0003\u0013\ty!\u0004\u0002\u0002\f)\u0019\u0011Q\u0002=\u0002\u0005\u0019\u001c\u0018\u0002BA\t\u0003\u0017\u0011A\u0001U1uQ\u0006QQ.\u001a;b\u00072LWM\u001c;\u0016\u0005\u0005]\u0001\u0003BA\r\u0003Gi!!a\u0007\u000b\t\u0005u\u0011qD\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0004\u0003C\u0019\u0015AB2p[6|g.\u0003\u0003\u0002&\u0005m!!\u0006%p_\u0012LW\rV1cY\u0016lU\r^1DY&,g\u000e^\u0001\ni\u0006\u0014G.\u001a+za\u0016,\"!a\u000b\u0011\t\u00055\u00121G\u0007\u0003\u0003_QA!!\r\u0002 \u0005)Qn\u001c3fY&!\u0011QGA\u0018\u0005=Aun\u001c3jKR\u000b'\r\\3UsB,\u0017AC5t\u0007\u0012\u001b\u0015+^3ssV\u0011\u00111\b\t\u0004y\u0005u\u0012bAA 
{\t9!i\\8mK\u0006t\u0017aC5t\u0007\u0012\u001b\u0015+^3ss\u0002\nA\u0003[8mY><8i\\7nSRD\u0015M\u001c3mS:<WCAA$!\u0011\tI%a\u001b\u000f\t\u0005-\u0013Q\r\b\u0005\u0003\u001b\n\tG\u0004\u0003\u0002P\u0005}c\u0002BA)\u0003;rA!a\u0015\u0002\\9!\u0011QKA-\u001d\ry\u0015qK\u0005\u0002G%\u0011\u0011EI\u0005\u00037\u0001J1!!\tD\u0013\u0011\ti\"a\b\n\t\u0005\r\u00141D\u0001\ti&lW\r\\5oK&!\u0011qMA5\u00035!\u0016.\\3mS:,W\u000b^5mg*!\u00111MA\u000e\u0013\u0011\ti'a\u001c\u0003)!{G\u000e\\8x\u0007>lW.\u001b;IC:$G.\u001b8h\u0015\u0011\t9'!\u001b\u0002+!|G\u000e\\8x\u0007>lW.\u001b;IC:$G.\u001b8hA\u0005q\u0011N\\5uS\u0006dwJ\u001a4tKR\u001cXCAA<!\r9\u0017\u0011P\u0005\u0004\u0003wB\"A\u0005%p_\u0012LWmU8ve\u000e,wJ\u001a4tKRD#\u0001\u0005@\u0002\rM\u001c\u0007.Z7b+\u0005Y\u0016aD4fi2\u000bG/Z:u\u001f\u001a47/\u001a;\u0016\u0005\u0005\u001d\u0005\u0003\u0002\u001fZ\u0003o\n\u0011bZ3u\u001f\u001a47/\u001a;\u0016\u0005\u00055\u0005\u0003\u0002\u001fZ\u0003\u001f\u00032aLAI\u0013\r\t\u0019\n\r\u0002\u0007\u001f\u001a47/\u001a;\u0002\u0011\u001d,GOQ1uG\"$b!!'\u00020\u0006M\u0006\u0003BAN\u0003SsA!!(\u0002&:!\u0011qTAR\u001d\u0011\t\u0019&!)\n\u0005}\u0001\u0013BA\u000f\u001f\u0013\r\t9\u000bH\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tY+!,\u0003\u0013\u0011\u000bG/\u0019$sC6,'bAAT9!9\u0011\u0011\u0017\u000bA\u0002\u00055\u0015!B:uCJ$\bbBA[)\u0001\u0007\u0011qR\u0001\u0004K:$\u0017aD:uCJ$8i\\7nSR$\u0016.\\3\u0015\u00071\u000bY\fC\u0004\u0002>V\u0001\r!a\u001e\u0002\u0017M$\u0018M\u001d;PM\u001a\u001cX\r^\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002DB\u0019A(!2\n\u0007\u0005\u001dWH\u0001\u0003V]&$\b")
public class HoodieStreamSource
implements Source,
Logging,
scala.Serializable,
SparkAdapterSupport {
    // Lazily computed table base path; initialised by tablePath$lzycompute() and guarded by bit 1 of bitmap$0.
    private Path tablePath;
    // Lazily built meta client for the table; initialised by metaClient$lzycompute() and guarded by bit 2 of bitmap$0.
    private HoodieTableMetaClient metaClient;
    // Lazily read table type (taken from metaClient); guarded by bit 4 of bitmap$0.
    private HoodieTableType tableType;
    // Lazily resolved starting offset; transient, with its own initialisation flag bitmap$trans$0.
    private transient HoodieSourceOffset initialOffsets;
    // Constructor arguments, stored as-is.
    private final SQLContext sqlContext;
    private final String metadataPath;
    private final Option<StructType> schemaOption;
    private final Map<String, String> parameters;
    private final HoodieOffsetRangeLimit offsetRangeLimit;
    // Hadoop configuration obtained from the session state in the constructor; transient.
    private final transient Configuration hadoopConf;
    // True when the table has CDC enabled and the options request an incremental query in CDC format (computed in the constructor).
    private final boolean isCDCQuery;
    // Hollow-commit policy parsed from the options in the constructor; falls back to BLOCK when the option is absent.
    private final TimelineUtils.HollowCommitHandling hollowCommitHandling;
    // Lazily obtained adapter from SparkAdapterSupport; guarded by bit 8 of bitmap$0.
    private SparkAdapter sparkAdapter;
    // Backing field for the Logging trait's logger accessor/mutator pair.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Set to true once initialOffsets has been computed.
    private volatile transient boolean bitmap$trans$0;
    // Initialisation flags for the non-transient lazy values: 1 = tablePath, 2 = metaClient, 4 = tableType, 8 = sparkAdapter.
    private volatile byte bitmap$0;

    /*
     * Forwarders for the Logging trait. Each method below hands the call straight to the
     * matching static helper on Logging, passing this instance as the receiver.
     */

    /** Returns the logger name supplied by {@code Logging.logName$}. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Returns the logger supplied by {@code Logging.log$}. */
    public Logger log() {
        return Logging.log$(this);
    }

    /** Forwards an info-level message supplier to the Logging trait. */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /** Forwards a debug-level message supplier to the Logging trait. */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /** Forwards a trace-level message supplier to the Logging trait. */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /** Forwards a warning-level message supplier to the Logging trait. */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /** Forwards an error-level message supplier to the Logging trait. */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /** Forwards an info-level message supplier and its throwable to the Logging trait. */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /** Forwards a debug-level message supplier and its throwable to the Logging trait. */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /** Forwards a trace-level message supplier and its throwable to the Logging trait. */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /** Forwards a warning-level message supplier and its throwable to the Logging trait. */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /** Forwards an error-level message supplier and its throwable to the Logging trait. */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /** Returns whatever {@code Logging.isTraceEnabled$} reports for this instance. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Forwards single-argument log initialisation to the Logging trait. */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /** Forwards two-argument log initialisation to the Logging trait and returns its result. */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    /** Returns the Logging trait's default for the second argument of {@code initializeLogIfNecessary}. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Forwards forced log initialisation to the Logging trait. */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /*
     * Forwarders for the Source trait. Each method below delegates to the matching static
     * helper on Source, passing this instance as the receiver.
     */

    /** Delegates the commit of a legacy (execution.streaming) offset to the Source trait. */
    public void commit(org.apache.spark.sql.execution.streaming.Offset end) {
        Source.commit$(this, end);
    }

    /** Returns the initial offset as provided by the Source trait. */
    public Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    /** Delegates offset deserialisation from JSON to the Source trait. */
    public Offset deserializeOffset(String json) {
        return Source.deserializeOffset$(this, json);
    }

    /** Delegates the commit of a connector (read.streaming) offset to the Source trait. */
    public void commit(Offset end) {
        Source.commit$(this, end);
    }

    /**
     * Slow path for the lazy {@code sparkAdapter} value. Under this instance's monitor it
     * re-checks bit 8 of {@code bitmap$0}; if the bit is still clear it obtains the adapter
     * from {@code SparkAdapterSupport} and then sets the bit.
     *
     * @return the stored adapter
     */
    private SparkAdapter sparkAdapter$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialised = (this.bitmap$0 & 8) != 0;
            if (!alreadyInitialised) {
                this.sparkAdapter = SparkAdapterSupport.sparkAdapter$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 8);
            }
        }
        return this.sparkAdapter;
    }

    /**
     * Returns the adapter, computing it through {@code sparkAdapter$lzycompute()} when
     * bit 8 of {@code bitmap$0} is not yet set.
     */
    @Override
    public SparkAdapter sparkAdapter() {
        if ((this.bitmap$0 & 8) != 0) {
            return this.sparkAdapter;
        }
        return this.sparkAdapter$lzycompute();
    }

    /** Returns the logger currently stored in the Logging trait's backing field. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores {@code x$1} in the Logging trait's backing field. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Returns the Hadoop configuration captured in the constructor. */
    private Configuration hadoopConf() {
        return this.hadoopConf;
    }

    /**
     * Slow path for the lazy {@code tablePath} value. Under this instance's monitor it
     * re-checks bit 1 of {@code bitmap$0}; if the bit is still clear it reads the
     * {@code "path"} option, asks {@code TablePathUtils} for the table path on that path's
     * file system, stores the result and sets the bit.
     *
     * <p>A missing {@code "path"} option now fails immediately. Previously the text
     * "Missing 'path' option" was used as the default value of the option and was therefore
     * turned into a {@code Path} and looked up on the file system.
     *
     * @return the stored table path
     * @throws IllegalArgumentException if the {@code "path"} option is not present
     */
    private Path tablePath$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 1) == 0) {
                Option<String> configuredPath = this.parameters.get("path");
                if (configuredPath.isEmpty()) {
                    // Fail fast instead of treating the error text as a path.
                    throw new IllegalArgumentException("Missing 'path' option");
                }
                Path path = new Path(configuredPath.get());
                FileSystem fs = path.getFileSystem(this.hadoopConf());
                this.tablePath = TablePathUtils.getTablePath(fs, path).get();
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.tablePath;
    }

    /**
     * Returns the table path, computing it through {@code tablePath$lzycompute()} when
     * bit 1 of {@code bitmap$0} is not yet set.
     */
    private Path tablePath() {
        if ((this.bitmap$0 & 1) != 0) {
            return this.tablePath;
        }
        return this.tablePath$lzycompute();
    }

    /**
     * Slow path for the lazy {@code metaClient} value. Under this instance's monitor it
     * re-checks bit 2 of {@code bitmap$0}; if the bit is still clear it builds a meta client
     * from the Hadoop configuration and the table path, stores it and sets the bit.
     *
     * @return the stored meta client
     */
    private HoodieTableMetaClient metaClient$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialised = (this.bitmap$0 & 2) != 0;
            if (!alreadyInitialised) {
                this.metaClient = HoodieTableMetaClient.builder()
                        .setConf(this.hadoopConf())
                        .setBasePath(this.tablePath().toString())
                        .build();
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.metaClient;
    }

    /**
     * Returns the meta client, computing it through {@code metaClient$lzycompute()} when
     * bit 2 of {@code bitmap$0} is not yet set.
     */
    private HoodieTableMetaClient metaClient() {
        if ((this.bitmap$0 & 2) != 0) {
            return this.metaClient;
        }
        return this.metaClient$lzycompute();
    }

    /**
     * Slow path for the lazy {@code tableType} value. Under this instance's monitor it
     * re-checks bit 4 of {@code bitmap$0}; if the bit is still clear it reads the table type
     * from the meta client, stores it and sets the bit.
     *
     * @return the stored table type
     */
    private HoodieTableType tableType$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialised = (this.bitmap$0 & 4) != 0;
            if (!alreadyInitialised) {
                this.tableType = this.metaClient().getTableType();
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
        }
        return this.tableType;
    }

    /**
     * Returns the table type, computing it through {@code tableType$lzycompute()} when
     * bit 4 of {@code bitmap$0} is not yet set.
     */
    private HoodieTableType tableType() {
        if ((this.bitmap$0 & 4) != 0) {
            return this.tableType;
        }
        return this.tableType$lzycompute();
    }

    /** Returns the CDC-query flag computed in the constructor. */
    private boolean isCDCQuery() {
        return this.isCDCQuery;
    }

    /** Returns the hollow-commit handling policy resolved in the constructor. */
    private TimelineUtils.HollowCommitHandling hollowCommitHandling() {
        return this.hollowCommitHandling;
    }

    private HoodieSourceOffset initialOffsets$lzycompute() {
        HoodieStreamSource hoodieStreamSource = this;
        synchronized (hoodieStreamSource) {
            if (!this.bitmap$trans$0) {
                HoodieMetadataLog metadataLog = new HoodieMetadataLog(this.sqlContext.sparkSession(), this.metadataPath);
                this.initialOffsets = (HoodieSourceOffset)((Object)metadataLog.get(0L).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                    void var3_6;
                    HoodieSourceOffset hoodieSourceOffset;
                    HoodieOffsetRangeLimit hoodieOffsetRangeLimit = $this.offsetRangeLimit;
                    if (HoodieEarliestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
                    } else if (HoodieLatestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = (HoodieSourceOffset)((Object)((Object)this.getLatestOffset().getOrElse((Function0 & Serializable & scala.Serializable)() -> HoodieSourceOffset$.MODULE$.INIT_OFFSET())));
                    } else if (hoodieOffsetRangeLimit instanceof HoodieSpecifiedOffsetRangeLimit) {
                        HoodieSpecifiedOffsetRangeLimit hoodieSpecifiedOffsetRangeLimit = (HoodieSpecifiedOffsetRangeLimit)hoodieOffsetRangeLimit;
                        String instantTime = hoodieSpecifiedOffsetRangeLimit.instantTime();
                        hoodieSourceOffset = new HoodieSourceOffset(instantTime);
                    } else {
                        throw new MatchError((Object)hoodieOffsetRangeLimit);
                    }
                    HoodieSourceOffset offset = hoodieSourceOffset;
                    metadataLog.add(0L, (Object)offset);
                    this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(22).append("The initial offset is ").append((Object)offset).toString());
                    return var3_6;
                }));
                this.bitmap$trans$0 = true;
            }
        }
        return this.initialOffsets;
    }

    /**
     * Returns the initial offset, computing it through {@code initialOffsets$lzycompute()}
     * when {@code bitmap$trans$0} is still false.
     */
    private HoodieSourceOffset initialOffsets() {
        if (this.bitmap$trans$0) {
            return this.initialOffsets;
        }
        return this.initialOffsets$lzycompute();
    }

    /**
     * Returns the schema of the rows this source produces.
     *
     * <p>For a CDC query this is {@code CDCRelation}'s full CDC schema. Otherwise it is the
     * schema passed to the constructor or, when none was given, the table's Avro schema
     * converted to a Spark {@code StructType}.
     */
    public StructType schema() {
        if (this.isCDCQuery()) {
            return CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA();
        }
        return (StructType)this.schemaOption.getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            TableSchemaResolver resolver = new TableSchemaResolver(this.metaClient());
            return AvroConversionUtils$.MODULE$.convertAvroSchemaToStructType(resolver.getTableAvroSchema());
        });
    }

    /**
     * Reloads the active timeline and returns the offset of its latest completed instant.
     *
     * <p>The completed instants are first passed through
     * {@code TimelineUtils.handleHollowCommitIfNeeded} with the configured policy. When the
     * policy is {@code USE_TRANSITION_TIME} the offset is the state-transition time of the
     * last instant in state-transition order; otherwise it is the timestamp of the
     * timeline's last instant.
     *
     * <p>The result local was previously declared as {@code None$} although it was also
     * assigned a {@code Some}; it is now handled through {@code Option} directly.
     *
     * @return the latest offset, or an empty option when the filtered timeline is empty
     */
    private Option<HoodieSourceOffset> getLatestOffset() {
        this.metaClient().reloadActiveTimeline();
        HoodieTimeline completedTimeline = TimelineUtils.handleHollowCommitIfNeeded(this.metaClient().getActiveTimeline().filterCompletedInstants(), this.metaClient(), this.hollowCommitHandling());
        if (completedTimeline.empty()) {
            return Option.empty();
        }
        String timestamp;
        // Enum constants are singletons, so an identity comparison is sufficient and null-safe.
        if (this.hollowCommitHandling() == TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME) {
            // Skip all but the final element to reach the instant with the latest transition time.
            timestamp = completedTimeline.getInstantsOrderedByStateTransitionTime().skip(completedTimeline.countInstants() - 1).findFirst().get().getStateTransitionTime();
        } else {
            timestamp = completedTimeline.lastInstant().get().getTimestamp();
        }
        return new Some<>(new HoodieSourceOffset(timestamp));
    }

    /**
     * Returns the most recent offset available from this source by delegating to
     * {@code getLatestOffset()}; the option is empty when that method finds no instants.
     */
    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        return this.getLatestOffset();
    }

    /**
     * Builds the DataFrame for one micro-batch between two offsets.
     *
     * <p>When {@code start2} is empty the lazily resolved initial offset is used as the
     * start. If start and end offsets are equal, an empty DataFrame with {@code schema()}
     * is returned. For a CDC query the rows come from a {@code CDCRelation} scan. Otherwise
     * an incremental relation matching the table type is scanned.
     *
     * @param start2 offset to start from, or empty to start from the initial offset
     * @param end    offset to read up to
     * @return a streaming DataFrame (the last argument to {@code internalCreateDataFrame} is {@code true})
     * @throws IllegalArgumentException if the table type is neither COPY_ON_WRITE nor MERGE_ON_READ
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> start2, org.apache.spark.sql.execution.streaming.Offset end) {
        Dataset dataset;
        // Convert the supplied start offset, falling back to the initial offset when none is given.
        HoodieSourceOffset startOffset = (HoodieSourceOffset)((Object)start2.map((Function1 & Serializable & scala.Serializable)x$1 -> HoodieSourceOffset$.MODULE$.apply((org.apache.spark.sql.execution.streaming.Offset)x$1)).getOrElse((Function0 & Serializable & scala.Serializable)() -> this.initialOffsets()));
        HoodieSourceOffset endOffset = HoodieSourceOffset$.MODULE$.apply(end);
        HoodieSourceOffset hoodieSourceOffset = startOffset;
        HoodieSourceOffset hoodieSourceOffset2 = endOffset;
        // Null-safe equality check: start == end means there is nothing new to read.
        if (!(hoodieSourceOffset != null ? !((Object)((Object)hoodieSourceOffset)).equals((Object)hoodieSourceOffset2) : hoodieSourceOffset2 != null)) {
            dataset = this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), this.schema(), true);
        } else if (this.isCDCQuery()) {
            // CDC path: the options passed to the relation contain only the begin/end instant times.
            Map cdcOptions = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key()), (Object)this.startCommitTime(startOffset)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key()), (Object)endOffset.commitTime())}));
            RDD<InternalRow> rdd = CDCRelation$.MODULE$.getCDCRelation(this.sqlContext, this.metaClient(), (Map<String, String>)cdcOptions).buildScan0(HoodieCDCUtils.CDC_COLUMNS, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
            dataset = this.sqlContext.sparkSession().internalCreateDataFrame(rdd, CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA(), true);
        } else {
            RDD<Row> rDD;
            // Incremental path: the user options are extended with (and overridden by) the
            // incremental query type, the begin/end instant times and the hollow-commit policy.
            Map incParams = this.parameters.$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()), (Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key()), (Object)this.startCommitTime(startOffset)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key()), (Object)endOffset.commitTime()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key()), (Object)this.hollowCommitHandling().name())})));
            HoodieTableType hoodieTableType = this.tableType();
            if (((Object)((Object)HoodieTableType.COPY_ON_WRITE)).equals((Object)hoodieTableType)) {
                // COPY_ON_WRITE: scan produces Rows, which are serialised to InternalRow via the adapter's SerDe.
                SparkRowSerDe serDe = this.sparkAdapter().createSparkRowSerDe(this.schema());
                rDD = new IncrementalRelation(this.sqlContext, (Map<String, String>)incParams, (Option<StructType>)new Some((Object)this.schema()), this.metaClient()).buildScan().map((Function1 & Serializable & scala.Serializable)x$1 -> serDe.serializeRow((Row)x$1), ClassTag$.MODULE$.apply(InternalRow.class));
            } else if (((Object)((Object)HoodieTableType.MERGE_ON_READ)).equals((Object)hoodieTableType)) {
                // MERGE_ON_READ: request every column of schema() and apply no filters.
                String[] requiredColumns = (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.schema().fields())).map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.name(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
                rDD = new MergeOnReadIncrementalRelation(this.sqlContext, (Map<String, String>)incParams, this.metaClient(), (Option<StructType>)new Some((Object)this.schema()), MergeOnReadIncrementalRelation$.MODULE$.$lessinit$greater$default$5()).buildScan(requiredColumns, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
            } else {
                throw new IllegalArgumentException(new StringBuilder(21).append("UnSupport tableType: ").append((Object)this.tableType()).toString());
            }
            RDD<Row> rdd = rDD;
            dataset = this.sqlContext.internalCreateDataFrame((RDD)rdd, this.schema(), true);
        }
        return dataset;
    }

    /**
     * Returns the commit time to use as the begin instant for a batch.
     *
     * <p>The earlier form compared the offset with {@code INIT_OFFSET} first, but both the
     * matching and the non-matching branch returned the offset's own commit time, so the
     * comparison had no effect on the result and has been removed.
     *
     * @param startOffset offset the batch starts from
     * @return {@code startOffset.commitTime()}
     * @throws IllegalStateException if {@code startOffset} is {@code null}
     */
    private String startCommitTime(HoodieSourceOffset startOffset) {
        if (startOffset == null) {
            throw new IllegalStateException("UnKnow offset type.");
        }
        return startOffset.commitTime();
    }

    /** Intentionally empty: the body performs no work when the source is stopped. */
    public void stop() {
    }

    /**
     * Creates the stream source and eagerly derives its configuration-dependent state.
     *
     * <p>Note that computing {@code isCDCQuery} calls {@code metaClient()}, so the lazy
     * meta client (and with it the table path lookup) is initialised during construction.
     *
     * @param sqlContext       context used for the Spark session, Hadoop configuration and DataFrame creation
     * @param metadataPath     location handed to {@code HoodieMetadataLog} when resolving the initial offset
     * @param schemaOption     optional schema; when empty, {@code schema()} resolves it from the table
     * @param parameters       read options; {@code "path"}, the query type, the incremental format and
     *                         the hollow-commit option are read from it
     * @param offsetRangeLimit policy used to choose the initial offset when none is recorded yet
     */
    public HoodieStreamSource(SQLContext sqlContext, String metadataPath, Option<StructType> schemaOption, Map<String, String> parameters, HoodieOffsetRangeLimit offsetRangeLimit) {
        this.sqlContext = sqlContext;
        this.metadataPath = metadataPath;
        this.schemaOption = schemaOption;
        this.parameters = parameters;
        this.offsetRangeLimit = offsetRangeLimit;
        // Trait initialisers emitted by the Scala compiler.
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        SparkAdapterSupport.$init$(this);
        // Must be assigned before metaClient() is first called below, because building the meta client reads hadoopConf().
        this.hadoopConf = sqlContext.sparkSession().sessionState().newHadoopConf();
        // CDC mode requires all three: CDC enabled on the table, incremental query type, and the CDC incremental format.
        this.isCDCQuery = CDCRelation$.MODULE$.isCDCEnabled(this.metaClient()) && parameters.get((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()).contains((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()) && parameters.get((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT().key()).contains((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT_CDC_VAL());
        // Parse the hollow-commit policy by name; BLOCK is used when the option is absent.
        // NOTE(review): valueOf presumably throws for an unrecognised name — confirm.
        this.hollowCommitHandling = (TimelineUtils.HollowCommitHandling)((Object)parameters.get((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key()).map((Function1 & Serializable & scala.Serializable)x$1 -> TimelineUtils.HollowCommitHandling.valueOf(x$1)).getOrElse((Function0 & Serializable & scala.Serializable)() -> TimelineUtils.HollowCommitHandling.BLOCK));
    }
}

