/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.hudi.streaming;

import java.io.Serializable;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils$;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.IncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation$;
import org.apache.hudi.SparkAdapterSupport;
import org.apache.hudi.cdc.CDCRelation$;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.hudi.streaming.HoodieEarliestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieLatestOffsetRangeLimit$;
import org.apache.spark.sql.hudi.streaming.HoodieMetadataLog;
import org.apache.spark.sql.hudi.streaming.HoodieOffsetRangeLimit;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$;
import org.apache.spark.sql.hudi.streaming.HoodieSpecifiedOffsetRangeLimit;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005ue\u0001B\u000b\u0017\u0001\rB\u0001\u0002\u0012\u0001\u0003\u0002\u0003\u0006I!\u0012\u0005\t\u0013\u0002\u0011\t\u0011)A\u0005\u0015\"AQ\u000b\u0001B\u0001B\u0003%a\u000b\u0003\u0005`\u0001\t\u0005\t\u0015!\u0003a\u0011!\u0019\u0007A!A!\u0002\u0013!\u0007\"\u00025\u0001\t\u0003I\u0007b\u00029\u0001\u0005\u0004%I!\u001d\u0005\u0007u\u0002\u0001\u000b\u0011\u0002:\t\u0013}\u0004\u0001R1A\u0005\n\u0005\u0005\u0001BCA\b\u0001!\u0015\r\u0011\"\u0003\u0002\u0012!Q\u00111\u0005\u0001\t\u0006\u0004%I!!\n\t\u0013\u0005M\u0002A1A\u0005\n\u0005U\u0002\u0002CA\u001f\u0001\u0001\u0006I!a\u000e\t\u0015\u0005}\u0002\u0001#b\u0001\n\u0013\t\t\u0005C\u0004\u0002L\u0001!\t%!\u0014\t\u000f\u0005=\u0003\u0001\"\u0003\u0002R!9\u0011Q\u000b\u0001\u0005B\u0005]\u0003bBA1\u0001\u0011\u0005\u00131\r\u0005\b\u0003\u0017\u0003A\u0011BAG\u0011\u001d\t\u0019\n\u0001C!\u0003+\u0013!\u0003S8pI&,7\u000b\u001e:fC6\u001cv.\u001e:dK*\u0011q\u0003G\u0001\ngR\u0014X-Y7j]\u001eT!!\u0007\u000e\u0002\t!,H-\u001b\u0006\u00037q\t1a]9m\u0015\tib$A\u0003ta\u0006\u00148N\u0003\u0002 A\u00051\u0011\r]1dQ\u0016T\u0011!I\u0001\u0004_J<7\u0001A\n\u0007\u0001\u0011b3'O \u0011\u0005\u0015RS\"\u0001\u0014\u000b\u0005\u001dB\u0013\u0001\u00027b]\u001eT\u0011!K\u0001\u0005U\u00064\u0018-\u0003\u0002,M\t1qJ\u00196fGR\u0004\"!L\u0019\u000e\u00039R!aF\u0018\u000b\u0005AR\u0012!C3yK\u000e,H/[8o\u0013\t\u0011dF\u0001\u0004T_V\u00148-\u001a\t\u0003i]j\u0011!\u000e\u0006\u0003mq\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003qU\u0012q\u0001T8hO&tw\r\u0005\u0002;{5\t1HC\u0001=\u0003\u0015\u00198-\u00197b\u0013\tq4H\u0001\u0007TKJL\u0017\r\\5{C\ndW\r\u0005\u0002A\u00056\t\u0011I\u0003\u0002\u001a=%\u00111)\u0011\u0002\u0014'B\f'o[!eCB$XM]*vaB|'\u000f^\u0001\u000bgFd7i\u001c8uKb$\bC\u0001$H\u001b\u0005Q\u0012B\u0001%\u001b\u0005)\u0019\u0016\u000bT\"p]R,\u0007\u0010^\u0001\r[\u0016$\u0018\rZ1uCB\u000bG\u000f\u001b\t\u0003\u0017Js!\u0001\u0014)\u0011\u00055[T\"\u0001(\u000b\u0005=\u0013\u0013A\u0002\u001fs_>$h(\u0003\u0002Rw\u00051\u0001K]3eK\u001aL!a\u0015+\u0003\rM#(/\u001b8h\u0015\t\t6(\u0001\u0007tG\",W.Y(qi&|g\u000eE\u0002;/fK!\u0001W\u001e\u0003\r=\u0003H/[8o!\tQV,D\u0001\\\u0015\ta&$A\u0003usB,7/\u0003\u0002_7\nQ1\u000b\u001e:vGR$\u0016\u0010]3\u0002\u0015A\f'/Y7fi\u0016\u00148\u000f\u0005\u0003LC*S\u0015B\u00012U\u0005\ri\u0015\r]\u0001\u0011_\u001a47/\u001a;SC:<W\rT5nSR\u0004\"!\u001a4\u000e\u0003YI!a\u001a\f\u0003-!{w\u000eZ5f\u001f\u001a47/\u001a;SC:<W\rT5nSR\fa\u0001P5oSRtDC\u00026lY6tw\u000e\u0005\u0002f\u0001!)AI\u0002a\u0001\u000b\")\u0011J\u0002a\u0001\u0015\")QK\u0002a\u0001-\")qL\u0002a\u0001A\")1M\u0002a\u0001I\u0006Q\u0001.\u00193p_B\u001cuN\u001c4\u0016\u0003I\u0004\"a\u001d=\u000e\u0003QT!!\u001e<\u0002\t\r|gN\u001a\u0006\u0003oz\ta\u0001[1e_>\u0004\u0018BA=u\u00055\u0019uN\u001c4jOV\u0014\u0018\r^5p]\u0006Y\u0001.\u00193p_B\u001cuN\u001c4!Q\tAA\u0010\u0005\u0002;{&\u0011ap\u000f\u0002\niJ\fgn]5f]R\f\u0011\u0002^1cY\u0016\u0004\u0016\r\u001e5\u0016\u0005\u0005\r\u0001\u0003BA\u0003\u0003\u0017i!!a\u0002\u000b\u0007\u0005%a/\u0001\u0002gg&!\u0011QBA\u0004\u0005\u0011\u0001\u0016\r\u001e5\u0002\u00155,G/Y\"mS\u0016tG/\u0006\u0002\u0002\u0014A!\u0011QCA\u0010\u001b\t\t9B\u0003\u0003\u0002\u001a\u0005m\u0011!\u0002;bE2,'bAA\u000f\u0003\u000611m\\7n_:LA!!\t\u0002\u0018\t)\u0002j\\8eS\u0016$\u0016M\u00197f\u001b\u0016$\u0018m\u00117jK:$\u0018!\u0003;bE2,G+\u001f9f+\t\t9\u0003\u0005\u0003\u0002*\u0005=RBAA\u0016\u0015\u0011\ti#a\u0007\u0002\u000b5|G-\u001a7\n\t\u0005E\u00121\u0006\u0002\u0010\u0011>|G-[3UC\ndW\rV=qK\u0006Q\u0011n]\"E\u0007F+XM]=\u0016\u0005\u0005]\u0002c\u0001\u001e\u0002:%\u0019\u00111H\u001e\u0003\u000f\t{w\u000e\\3b]\u0006Y\u0011n]\"E\u0007F+XM]=!\u00039Ig.\u001b;jC2|eMZ:fiN,\"!a\u0011\u0011\u0007\u0015\f)%C\u0002\u0002HY\u0011!\u0003S8pI&,7k\\;sG\u0016|eMZ:fi\"\u0012a\u0002`\u0001\u0007g\u000eDW-\\1\u0016\u0003e\u000bqbZ3u\u0019\u0006$Xm\u001d;PM\u001a\u001cX\r^\u000b\u0003\u0003'\u0002BAO,\u0002D\u0005Iq-\u001a;PM\u001a\u001cX\r^\u000b\u0003\u00033\u0002BAO,\u0002\\A\u0019Q&!\u0018\n\u0007\u0005}cF\u0001\u0004PM\u001a\u001cX\r^\u0001\tO\u0016$()\u0019;dQR1\u0011QMAB\u0003\u000f\u0003B!a\u001a\u0002~9!\u0011\u0011NA=\u001d\u0011\tY'a\u001e\u000f\t\u00055\u0014Q\u000f\b\u0005\u0003_\n\u0019HD\u0002N\u0003cJ\u0011!I\u0005\u0003?\u0001J!!\b\u0010\n\u0005ma\u0012bAA>5\u00059\u0001/Y2lC\u001e,\u0017\u0002BA@\u0003\u0003\u0013\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0007\u0005m$\u0004C\u0004\u0002\u0006J\u0001\r!!\u0017\u0002\u000bM$\u0018M\u001d;\t\u000f\u0005%%\u00031\u0001\u0002\\\u0005\u0019QM\u001c3\u0002\u001fM$\u0018M\u001d;D_6l\u0017\u000e\u001e+j[\u0016$2ASAH\u0011\u001d\t\tj\u0005a\u0001\u0003\u0007\n1b\u001d;beR|eMZ:fi\u0006!1\u000f^8q)\t\t9\nE\u0002;\u00033K1!a'<\u0005\u0011)f.\u001b;")
public class HoodieStreamSource
implements Source,
Logging,
scala.Serializable,
SparkAdapterSupport {
    private Path tablePath;
    private HoodieTableMetaClient metaClient;
    private HoodieTableType tableType;
    private transient HoodieSourceOffset initialOffsets;
    private final SQLContext sqlContext;
    private final String metadataPath;
    private final Option<StructType> schemaOption;
    private final Map<String, String> parameters;
    private final HoodieOffsetRangeLimit offsetRangeLimit;
    private final transient Configuration hadoopConf;
    private final boolean isCDCQuery;
    private SparkAdapter sparkAdapter;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile transient boolean bitmap$trans$0;
    private volatile byte bitmap$0;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public void commit(org.apache.spark.sql.execution.streaming.Offset end) {
        Source.commit$((Source)this, (org.apache.spark.sql.execution.streaming.Offset)end);
    }

    public Offset initialOffset() {
        return Source.initialOffset$((Source)this);
    }

    public Offset deserializeOffset(String json) {
        return Source.deserializeOffset$((Source)this, (String)json);
    }

    public void commit(Offset end) {
        Source.commit$((Source)this, (Offset)end);
    }

    private SparkAdapter sparkAdapter$lzycompute() {
        HoodieStreamSource hoodieStreamSource = this;
        synchronized (hoodieStreamSource) {
            if ((byte)(this.bitmap$0 & 8) == 0) {
                this.sparkAdapter = SparkAdapterSupport.sparkAdapter$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 8);
            }
        }
        return this.sparkAdapter;
    }

    @Override
    public SparkAdapter sparkAdapter() {
        return (byte)(this.bitmap$0 & 8) == 0 ? this.sparkAdapter$lzycompute() : this.sparkAdapter;
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private Configuration hadoopConf() {
        return this.hadoopConf;
    }

    private Path tablePath$lzycompute() {
        HoodieStreamSource hoodieStreamSource = this;
        synchronized (hoodieStreamSource) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                Path path = new Path((String)this.parameters.getOrElse((Object)"path", (Function0 & Serializable & scala.Serializable)() -> "Missing 'path' option"));
                FileSystem fs = path.getFileSystem(this.hadoopConf());
                this.tablePath = TablePathUtils.getTablePath(fs, path).get();
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.tablePath;
    }

    private Path tablePath() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.tablePath$lzycompute() : this.tablePath;
    }

    private HoodieTableMetaClient metaClient$lzycompute() {
        HoodieStreamSource hoodieStreamSource = this;
        synchronized (hoodieStreamSource) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.metaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf()).setBasePath(this.tablePath().toString()).build();
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.metaClient;
    }

    private HoodieTableMetaClient metaClient() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.metaClient$lzycompute() : this.metaClient;
    }

    private HoodieTableType tableType$lzycompute() {
        HoodieStreamSource hoodieStreamSource = this;
        synchronized (hoodieStreamSource) {
            if ((byte)(this.bitmap$0 & 4) == 0) {
                this.tableType = this.metaClient().getTableType();
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
        }
        return this.tableType;
    }

    private HoodieTableType tableType() {
        return (byte)(this.bitmap$0 & 4) == 0 ? this.tableType$lzycompute() : this.tableType;
    }

    private boolean isCDCQuery() {
        return this.isCDCQuery;
    }

    private HoodieSourceOffset initialOffsets$lzycompute() {
        HoodieStreamSource hoodieStreamSource = this;
        synchronized (hoodieStreamSource) {
            if (!this.bitmap$trans$0) {
                HoodieMetadataLog metadataLog = new HoodieMetadataLog(this.sqlContext.sparkSession(), this.metadataPath);
                this.initialOffsets = (HoodieSourceOffset)((Object)metadataLog.get(0L).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                    void var3_6;
                    HoodieSourceOffset hoodieSourceOffset;
                    HoodieOffsetRangeLimit hoodieOffsetRangeLimit = $this.offsetRangeLimit;
                    if (HoodieEarliestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
                    } else if (HoodieLatestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = (HoodieSourceOffset)((Object)((Object)this.getLatestOffset().getOrElse((Function0 & Serializable & scala.Serializable)() -> HoodieSourceOffset$.MODULE$.INIT_OFFSET())));
                    } else if (hoodieOffsetRangeLimit instanceof HoodieSpecifiedOffsetRangeLimit) {
                        HoodieSpecifiedOffsetRangeLimit hoodieSpecifiedOffsetRangeLimit = (HoodieSpecifiedOffsetRangeLimit)hoodieOffsetRangeLimit;
                        String instantTime = hoodieSpecifiedOffsetRangeLimit.instantTime();
                        hoodieSourceOffset = new HoodieSourceOffset(instantTime);
                    } else {
                        throw new MatchError((Object)hoodieOffsetRangeLimit);
                    }
                    HoodieSourceOffset offset = hoodieSourceOffset;
                    metadataLog.add(0L, (Object)offset);
                    this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(22).append("The initial offset is ").append((Object)offset).toString());
                    return var3_6;
                }));
                this.bitmap$trans$0 = true;
            }
        }
        return this.initialOffsets;
    }

    private HoodieSourceOffset initialOffsets() {
        return !this.bitmap$trans$0 ? this.initialOffsets$lzycompute() : this.initialOffsets;
    }

    public StructType schema() {
        return this.isCDCQuery() ? CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA() : (StructType)this.schemaOption.getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            TableSchemaResolver schemaUtil = new TableSchemaResolver(this.metaClient());
            return AvroConversionUtils$.MODULE$.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema());
        });
    }

    private Option<HoodieSourceOffset> getLatestOffset() {
        this.metaClient().reloadActiveTimeline();
        HoodieTimeline hoodieTimeline = this.metaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        Object object = !hoodieTimeline.empty() ? new Some((Object)new HoodieSourceOffset(hoodieTimeline.lastInstant().get().getTimestamp())) : None$.MODULE$;
        return object;
    }

    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        return this.getLatestOffset();
    }

    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> start2, org.apache.spark.sql.execution.streaming.Offset end) {
        Dataset dataset;
        HoodieSourceOffset startOffset = (HoodieSourceOffset)((Object)start2.map((Function1 & Serializable & scala.Serializable)x$1 -> HoodieSourceOffset$.MODULE$.apply((org.apache.spark.sql.execution.streaming.Offset)x$1)).getOrElse((Function0 & Serializable & scala.Serializable)() -> this.initialOffsets()));
        HoodieSourceOffset endOffset = HoodieSourceOffset$.MODULE$.apply(end);
        HoodieSourceOffset hoodieSourceOffset = startOffset;
        HoodieSourceOffset hoodieSourceOffset2 = endOffset;
        if (!(hoodieSourceOffset != null ? !((Object)((Object)hoodieSourceOffset)).equals((Object)hoodieSourceOffset2) : hoodieSourceOffset2 != null)) {
            dataset = this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), this.schema(), true);
        } else if (this.isCDCQuery()) {
            Map cdcOptions = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key()), (Object)this.startCommitTime(startOffset)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key()), (Object)endOffset.commitTime())}));
            RDD<InternalRow> rdd = CDCRelation$.MODULE$.getCDCRelation(this.sqlContext, this.metaClient(), (Map<String, String>)cdcOptions).buildScan0(HoodieCDCUtils.CDC_COLUMNS, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
            dataset = this.sqlContext.sparkSession().internalCreateDataFrame(rdd, CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA(), true);
        } else {
            RDD<Row> rDD;
            Map incParams = this.parameters.$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()), (Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key()), (Object)this.startCommitTime(startOffset)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key()), (Object)endOffset.commitTime())})));
            HoodieTableType hoodieTableType = this.tableType();
            if (((Object)((Object)HoodieTableType.COPY_ON_WRITE)).equals((Object)hoodieTableType)) {
                SparkRowSerDe serDe = this.sparkAdapter().createSparkRowSerDe(this.schema());
                rDD = new IncrementalRelation(this.sqlContext, (Map<String, String>)incParams, (Option<StructType>)new Some((Object)this.schema()), this.metaClient()).buildScan().map((Function1 & Serializable & scala.Serializable)x$1 -> serDe.serializeRow((Row)x$1), ClassTag$.MODULE$.apply(InternalRow.class));
            } else if (((Object)((Object)HoodieTableType.MERGE_ON_READ)).equals((Object)hoodieTableType)) {
                String[] requiredColumns = (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.schema().fields())).map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.name(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
                rDD = new MergeOnReadIncrementalRelation(this.sqlContext, (Map<String, String>)incParams, this.metaClient(), (Option<StructType>)new Some((Object)this.schema()), MergeOnReadIncrementalRelation$.MODULE$.$lessinit$greater$default$5()).buildScan(requiredColumns, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
            } else {
                throw new IllegalArgumentException(new StringBuilder(21).append("UnSupport tableType: ").append((Object)this.tableType()).toString());
            }
            RDD<Row> rdd = rDD;
            dataset = this.sqlContext.internalCreateDataFrame((RDD)rdd, this.schema(), true);
        }
        return dataset;
    }

    private String startCommitTime(HoodieSourceOffset startOffset) {
        String string;
        HoodieSourceOffset hoodieSourceOffset = startOffset;
        HoodieSourceOffset hoodieSourceOffset2 = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
        HoodieSourceOffset hoodieSourceOffset3 = hoodieSourceOffset;
        if (!(hoodieSourceOffset2 != null ? !((Object)((Object)hoodieSourceOffset2)).equals((Object)hoodieSourceOffset3) : hoodieSourceOffset3 != null)) {
            string = startOffset.commitTime();
        } else if (hoodieSourceOffset != null) {
            String commitTime = hoodieSourceOffset.commitTime();
            long time = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime();
            string = HoodieActiveTimeline.formatDate(new Date(time + 1000L));
        } else {
            throw new IllegalStateException("UnKnow offset type.");
        }
        return string;
    }

    public void stop() {
    }

    public HoodieStreamSource(SQLContext sqlContext, String metadataPath, Option<StructType> schemaOption, Map<String, String> parameters, HoodieOffsetRangeLimit offsetRangeLimit) {
        this.sqlContext = sqlContext;
        this.metadataPath = metadataPath;
        this.schemaOption = schemaOption;
        this.parameters = parameters;
        this.offsetRangeLimit = offsetRangeLimit;
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        SparkAdapterSupport.$init$(this);
        this.hadoopConf = sqlContext.sparkSession().sessionState().newHadoopConf();
        this.isCDCQuery = CDCRelation$.MODULE$.isCDCEnabled(this.metaClient()) && parameters.get((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()).contains((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()) && parameters.get((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT().key()).contains((Object)DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT_CDC_VAL());
    }
}

