/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.Function1;
import scala.Predef$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001\u0005eb\u0001\u0002\u000b\u0016\u0001\u0001B\u0001b\f\u0001\u0003\u0006\u0004%\t\u0001\r\u0005\tq\u0001\u0011\t\u0011)A\u0005c!A\u0011\b\u0001BC\u0002\u0013\u0005!\b\u0003\u0005C\u0001\t\u0005\t\u0015!\u0003<\u0011!\u0019\u0005A!b\u0001\n\u0003!\u0005\u0002C&\u0001\u0005\u0003\u0005\u000b\u0011B#\t\u00111\u0003!Q1A\u0005\u00025C\u0001\"\u0017\u0001\u0003\u0002\u0003\u0006IA\u0014\u0005\t5\u0002\u0011)\u0019!C\u0001\t\"A1\f\u0001B\u0001B\u0003%Q\t\u0003\u0005]\u0001\t\u0005\t\u0015!\u0003F\u0011!i\u0006A!A!\u0002\u0013q\u0006\u0002\u00033\u0001\u0005\u0003\u0005\u000b\u0011B3\t\u000b!\u0004A\u0011A5\t\u000fQ\u0004!\u0019!C\u0005k\"1A\u0010\u0001Q\u0001\nYDQ! \u0001\u0005\u0002yDq!!\u0002\u0001\t\u0003\t9\u0001C\u0004\u0002,\u0001!\t!!\f\u0003?M\u0003\u0018M]6ECR\f7k\\;sG\u0016\u001cuN\u001c;j]V|Wo]%oO\u0016\u001cHO\u0003\u0002\u0017/\u0005IA/Z:ugVLG/\u001a\u0006\u00031e\tQ!\u001b8uK\u001eT!AG\u000e\u0002\t!,H-\u001b\u0006\u00039u\ta!\u00199bG\",'\"\u0001\u0010\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0007\u0001\ts\u0005\u0005\u0002#K5\t1EC\u0001%\u0003\u0015\u00198-\u00197b\u0013\t13E\u0001\u0004B]f\u0014VM\u001a\t\u0003Q5j\u0011!\u000b\u0006\u0003U-\n!![8\u000b\u00031\nAA[1wC&\u0011a&\u000b\u0002\r'\u0016\u0014\u0018.\u00197ju\u0006\u0014G.Z\u0001\u0006gB\f'o[\u000b\u0002cA\u0011!GN\u0007\u0002g)\u0011A'N\u0001\u0004gFd'BA\u0018\u001c\u0013\t94G\u0001\u0007Ta\u0006\u00148nU3tg&|g.\u0001\u0004ta\u0006\u00148\u000eI\u0001\u0005G>tg-F\u0001<!\ta\u0004)D\u0001>\u0015\tIdH\u0003\u0002@7\u00051\u0001.\u00193p_BL!!Q\u001f\u0003\u001b\r{gNZ5hkJ\fG/[8o\u0003\u0015\u0019wN\u001c4!\u0003)\u0019x.\u001e:dKB\u000bG\u000f[\u000b\u0002\u000bB\u0011a)S\u0007\u0002\u000f*\u0011\u0001JP\u0001\u0003MNL!AS$\u0003\tA\u000bG\u000f[\u0001\fg>,(oY3QCRD\u0007%\u0001\u0007t_V\u00148-\u001a$pe6\fG/F\u0001O!\tyeK\u0004\u0002Q)B\u0011\u0011kI\u0007\u0002%*\u00111kH\u0001\u0007yI|w\u000e\u001e 
\n\u0005U\u001b\u0013A\u0002)sK\u0012,g-\u0003\u0002X1\n11\u000b\u001e:j]\u001eT!!V\u0012\u0002\u001bM|WO]2f\r>\u0014X.\u0019;!\u00039\u0019\u0007.Z2la>Lg\u000e\u001e$jY\u0016\fqb\u00195fG.\u0004x.\u001b8u\r&dW\rI\u0001\rQV$\u0017NQ1tKB\u000bG\u000f[\u0001\fQV$\u0017n\u00149uS>t7\u000f\u0005\u0003`E:sU\"\u00011\u000b\u0005\u0005\\\u0013\u0001B;uS2L!a\u00191\u0003\u00075\u000b\u0007/\u0001\fnS:\u001c\u0016P\\2J]R,'O^1m'\u0016\u001cwN\u001c3t!\t\u0011c-\u0003\u0002hG\t!Aj\u001c8h\u0003\u0019a\u0014N\\5u}QI!\u000e\\7o_B\f(o\u001d\t\u0003W\u0002i\u0011!\u0006\u0005\u0006_9\u0001\r!\r\u0005\u0006s9\u0001\ra\u000f\u0005\u0006\u0007:\u0001\r!\u0012\u0005\u0006\u0019:\u0001\rA\u0014\u0005\u00065:\u0001\r!\u0012\u0005\u00069:\u0001\r!\u0012\u0005\u0006;:\u0001\rA\u0018\u0005\u0006I:\u0001\r!Z\u0001\u0004Y><W#\u0001<\u0011\u0005]TX\"\u0001=\u000b\u0005e\\\u0012!\u00027pORR\u0017BA>y\u0005\u0019aunZ4fe\u0006!An\\4!\u00039\u0019H/\u0019:u\u0013:<Wm\u001d;j_:$\u0012a \t\u0004E\u0005\u0005\u0011bAA\u0002G\t!QK\\5u\u0003e1W\r^2i\u0019&\u001cHo\u00144GS2,7\u000fV8D_:\u001cX/\\3\u0015\u0011\u0005%\u0011QCA\u000f\u0003C\u0001RAIA\u0006\u0003\u001fI1!!\u0004$\u0005\u0015\t%O]1z!\r1\u0015\u0011C\u0005\u0004\u0003'9%A\u0003$jY\u0016\u001cF/\u0019;vg\"1\u0001J\u0005a\u0001\u0003/\u00012ARA\r\u0013\r\tYb\u0012\u0002\u000b\r&dWmU=ti\u0016l\u0007BBA\u0010%\u0001\u0007Q)\u0001\u0005cCN,\u0007+\u0019;i\u0011\u001d\t\u0019C\u0005a\u0001\u0003K\t!\u0002]1uQ\u001aKG\u000e^3s!\r1\u0015qE\u0005\u0004\u0003S9%A\u0003)bi\"4\u0015\u000e\u001c;fe\u0006YqO]5uKR{g)\u001b7f)\u001dy\u0018qFA\u001a\u0003oAa!!\r\u0014\u0001\u0004)\u0015AE2iK\u000e\\\u0007o\\5oi\u001aKG.\u001a)bi\"Da!!\u000e\u0014\u0001\u0004q\u0015aA:ue\"1\u0001j\u0005a\u0001\u0003/\u0001")
public class SparkDataSourceContinuousIngest
implements Serializable {
    private final SparkSession spark;
    private final Configuration conf;
    private final Path sourcePath;
    private final String sourceFormat;
    private final Path checkpointFile;
    private final Path hudiBasePath;
    private final Map<String, String> hudiOptions;
    private final long minSyncIntervalSeconds;
    private final Logger log;

    public SparkSession spark() {
        return this.spark;
    }

    public Configuration conf() {
        return this.conf;
    }

    public Path sourcePath() {
        return this.sourcePath;
    }

    public String sourceFormat() {
        return this.sourceFormat;
    }

    public Path checkpointFile() {
        return this.checkpointFile;
    }

    private Logger log() {
        return this.log;
    }

    public void startIngestion() {
        FileSystem fs = this.sourcePath().getFileSystem(this.conf());
        ObjectRef checkPointFs = ObjectRef.create((Object)this.checkpointFile().getFileSystem(this.conf()));
        FileStatus[] orderedBatch = null;
        if (((FileSystem)checkPointFs.elem).exists(this.checkpointFile())) {
            this.log().info((Object)"Checkpoint file exists. ");
            String checkpoint = ((String[])this.spark().sparkContext().textFile(this.checkpointFile().toString(), this.spark().sparkContext().textFile$default$2()).collect())[0];
            this.log().warn((Object)new StringBuilder(26).append("Checkpoint to resume from ").append(checkpoint).toString());
            orderedBatch = this.fetchListOfFilesToConsume(fs, this.sourcePath(), new PathFilter(null, checkpoint){
                private final String checkpoint$1;

                public boolean accept(Path path) {
                    return new StringOps(Predef$.MODULE$.augmentString(path.getName())).toLong() > new StringOps(Predef$.MODULE$.augmentString(this.checkpoint$1)).toLong();
                }
                {
                    this.checkpoint$1 = checkpoint$1;
                }
            });
            if (this.log().isDebugEnabled()) {
                this.log().debug((Object)"List of batches to consume in order ");
                new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])orderedBatch)).foreach((Function1 & Serializable & scala.Serializable)entry -> {
                    SparkDataSourceContinuousIngest.$anonfun$startIngestion$1(this, entry);
                    return BoxedUnit.UNIT;
                });
            }
        } else {
            this.log().warn((Object)"No checkpoint file exists. Starting from scratch ");
            orderedBatch = this.fetchListOfFilesToConsume(fs, this.sourcePath(), new PathFilter(null){

                public boolean accept(Path path) {
                    return true;
                }
            });
            if (this.log().isDebugEnabled()) {
                this.log().debug((Object)"List of batches to consume in order ");
                new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])orderedBatch)).foreach((Function1 & Serializable & scala.Serializable)entry -> {
                    SparkDataSourceContinuousIngest.$anonfun$startIngestion$2(this, entry);
                    return BoxedUnit.UNIT;
                });
            }
        }
        if (new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])orderedBatch)).isEmpty()) {
            this.log().info((Object)"All batches have been consumed. Exiting.");
        } else {
            new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])orderedBatch)).foreach((Function1 & Serializable & scala.Serializable)entry -> {
                SparkDataSourceContinuousIngest.$anonfun$startIngestion$3(this, checkPointFs, entry);
                return BoxedUnit.UNIT;
            });
        }
    }

    public FileStatus[] fetchListOfFilesToConsume(FileSystem fs, Path basePath, PathFilter pathFilter) {
        FileStatus[] nextBatches = fs.listStatus(basePath, pathFilter);
        return (FileStatus[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])nextBatches)).sortBy((Function1 & Serializable & scala.Serializable)fileStatus -> BoxesRunTime.boxToLong((long)SparkDataSourceContinuousIngest.$anonfun$fetchListOfFilesToConsume$1(fileStatus)), (Ordering)Ordering.Long$.MODULE$);
    }

    public void writeToFile(Path checkpointFilePath, String str, FileSystem fs) {
        Object object = !fs.exists(checkpointFilePath) ? fs.create(checkpointFilePath) : BoxedUnit.UNIT;
        FSDataOutputStream fsOutStream = fs.create(checkpointFilePath, true);
        fsOutStream.writeBytes(str);
        fsOutStream.flush();
        fsOutStream.close();
    }

    public static final /* synthetic */ void $anonfun$startIngestion$1(SparkDataSourceContinuousIngest $this, FileStatus entry) {
        $this.log().warn((Object)new StringBuilder(1).append(" ").append(entry.getPath().getName()).toString());
    }

    public static final /* synthetic */ void $anonfun$startIngestion$2(SparkDataSourceContinuousIngest $this, FileStatus entry) {
        $this.log().warn((Object)new StringBuilder(1).append(" ").append(entry.getPath().getName()).toString());
    }

    public static final /* synthetic */ void $anonfun$startIngestion$3(SparkDataSourceContinuousIngest $this, ObjectRef checkPointFs$1, FileStatus entry) {
        $this.log().info((Object)new StringBuilder(21).append("Consuming from batch ").append(entry).toString());
        Path pathToConsume = new Path(new StringBuilder(1).append($this.sourcePath().toString()).append("/").append(entry.getPath().getName()).toString());
        Dataset df = $this.spark().read().format($this.sourceFormat()).load(pathToConsume.toString());
        df.write().format("hudi").options($this.hudiOptions).mode(SaveMode.Append).save($this.hudiBasePath.toString());
        $this.writeToFile($this.checkpointFile(), entry.getPath().getName(), (FileSystem)checkPointFs$1.elem);
        $this.log().info((Object)new StringBuilder(76).append("Completed batch ").append(entry).append(". Moving to next batch. Sleeping for ").append($this.minSyncIntervalSeconds).append(" secs before next batch").toString());
        Thread.sleep($this.minSyncIntervalSeconds * 1000L);
    }

    public static final /* synthetic */ long $anonfun$fetchListOfFilesToConsume$1(FileStatus fileStatus) {
        return new StringOps(Predef$.MODULE$.augmentString(fileStatus.getPath().getName())).toLong();
    }

    public SparkDataSourceContinuousIngest(SparkSession spark, Configuration conf, Path sourcePath, String sourceFormat, Path checkpointFile, Path hudiBasePath, Map<String, String> hudiOptions, long minSyncIntervalSeconds) {
        this.spark = spark;
        this.conf = conf;
        this.sourcePath = sourcePath;
        this.sourceFormat = sourceFormat;
        this.checkpointFile = checkpointFile;
        this.hudiBasePath = hudiBasePath;
        this.hudiOptions = hudiOptions;
        this.minSyncIntervalSeconds = minSyncIntervalSeconds;
        this.log = LogManager.getLogger(this.getClass());
    }
}

