/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.testutils.sources;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Shared scaffolding for tests that read from a DFS-backed {@link Source}.
 *
 * <p>Concrete subclasses provide the source under test via
 * {@link #prepareDFSSource(TypedProperties)} and a way to materialise generated
 * records as a file via {@link #writeNewDataToFile(List, Path)}. This class
 * contributes the schema provider setup and one end-to-end test,
 * {@link #testReadingFromSource()}.
 */
public abstract class AbstractDFSSourceTestBase
extends UtilitiesTestBase {
    /** Schema provider rebuilt before every test in {@link #setup()}. */
    protected FilebasedSchemaProvider schemaProvider;
    /** Directory under which test files are written; not assigned in this class (presumably set by subclasses). */
    protected String dfsRoot;
    /** Suffix appended to every generated file name; not assigned in this class (presumably set by subclasses). */
    protected String fileSuffix;
    /** Generator used to produce the insert records written to test files. */
    protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    /** Forwarded to the data generator as the third argument of {@code generateInserts}. */
    protected boolean useFlattenedSchema = false;

    /**
     * Starts the shared test services once for the whole test class.
     *
     * @throws Exception if the underlying service initialisation fails
     */
    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices(false, false, false);
    }

    /**
     * Runs the parent setup and then creates a fresh {@link FilebasedSchemaProvider}.
     *
     * @throws Exception if the parent setup or the schema setup fails
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), jsc);
    }

    /**
     * Convenience overload that builds the source with empty properties.
     *
     * @return the source under test
     */
    protected final Source prepareDFSSource() {
        TypedProperties emptyProps = new TypedProperties();
        return prepareDFSSource(emptyProps);
    }

    /**
     * Builds the source under test.
     *
     * @param var1 properties handed to the source
     * @return the source under test
     */
    protected abstract Source prepareDFSSource(TypedProperties var1);

    /**
     * Writes the given records to a file at the given location.
     *
     * @param var1 records to write
     * @param var2 destination file
     * @throws IOException if the file cannot be written
     */
    protected abstract void writeNewDataToFile(List<HoodieRecord> var1, Path var2) throws IOException;

    /**
     * Generates {@code n} insert records and writes them to a single file named
     * {@code filename + fileSuffix} under {@link #dfsRoot}.
     *
     * @param filename    file name (may contain sub-directories) without the suffix
     * @param instantTime instant time passed to the data generator
     * @param n           number of records to generate
     * @return the path of the file that was written
     * @throws IOException if writing the file fails
     */
    protected Path generateOneFile(String filename, String instantTime, int n) throws IOException {
        Path target = new Path(dfsRoot, filename + fileSuffix);
        List<HoodieRecord> inserts = dataGenerator.generateInserts(instantTime, n, useFlattenedSchema);
        writeNewDataToFile(inserts, target);
        return target;
    }

    /**
     * Exercises the source end to end through a {@link SourceFormatAdapter}:
     * empty reads, the source limit, Avro and Row fetches, checkpoint handling,
     * and files whose path contains a component starting with "." or "_".
     *
     * @throws IOException if preparing the test files fails
     */
    @Test
    public void testReadingFromSource() throws IOException {
        fs.mkdirs(new Path(dfsRoot));
        SourceFormatAdapter adapter = new SourceFormatAdapter(prepareDFSSource());

        // 1. Nothing has been written yet: a fetch without checkpoint returns no batch.
        InputBatch<?> beforeAnyData = adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), beforeAnyData.getBatch());

        // Write the first file and make sure it is larger than the source limit,
        // so that a fetch bounded by that limit is expected to return no batch.
        final long sourceLimit = 10L;
        Path firstFile = generateOneFile("1", "000", 100);
        RemoteIterator<? extends FileStatus> listing = fs.listFiles(firstFile, true);
        FileStatus firstFileStatus = listing.next();
        Assertions.assertTrue(firstFileStatus.getLen() > sourceLimit);
        InputBatch<?> limitedFetch = adapter.fetchNewDataInAvroFormat(Option.empty(), sourceLimit);
        Assertions.assertEquals(Option.empty(), limitedFetch.getBatch());

        // Unbounded fetch in Avro format.
        InputBatch<?> fetch1 = adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        assertAvroRecordCount(100L, fetch1);
        // Unbounded fetch in Row format.
        InputBatch<?> fetch1AsRows = adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        assertRowCount(100L, fetch1AsRows);
        // Convert the Avro batch to a DataFrame using the source schema.
        String sourceSchema = schemaProvider.getSourceSchema().toString();
        Dataset<?> fetch1Rows = AvroConversionUtils.createDataFrame(
            JavaRDD.toRDD((JavaRDD) fetch1.getBatch().get()), sourceSchema, sparkSession);
        Assertions.assertEquals(100L, fetch1Rows.count());

        // 2. Add a second file; fetching from the first checkpoint returns only the new records.
        generateOneFile("2", "001", 10000);
        InputBatch<?> fetch2 = adapter.fetchNewDataInAvroFormat(
            Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        assertAvroRecordCount(10000L, fetch2);
        InputBatch<?> fetch2AsRows = adapter.fetchNewDataInRowFormat(
            Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
        assertRowCount(10000L, fetch2AsRows);

        // 3. Fetching again from the same checkpoint returns the same amount of data
        //    and the same next checkpoint.
        InputBatch<?> fetch3AsRows = adapter.fetchNewDataInRowFormat(
            Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
        assertRowCount(10000L, fetch3AsRows);
        Assertions.assertEquals(
            fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
        // The fetched rows can be registered as a temp view and queried through SQL.
        Dataset<?> fetch3Rows = (Dataset<?>) fetch3AsRows.getBatch().get();
        fetch3Rows.createOrReplaceTempView("test_dfs_table");
        SparkSession session = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();
        Dataset<?> rowDataset = session.sql("select * from test_dfs_table");
        Assertions.assertEquals(10000L, rowDataset.count());

        // 4. Fetching from the latest checkpoint returns no batch.
        InputBatch<?> fetch4 = adapter.fetchNewDataInAvroFormat(
            Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch4.getBatch());

        // 5. Fetching without a checkpoint returns both files: 100 + 10000 records.
        InputBatch<?> fetch5 = adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        assertAvroRecordCount(10100L, fetch5);

        // 6. Add files whose name or parent directory starts with "." or "_", plus one
        //    single-record file under foo/bar. The expected total grows by exactly one,
        //    i.e. only foo/bar/3 is expected to be read.
        generateOneFile(".checkpoint/3", "002", 100);
        generateOneFile("_checkpoint/3", "002", 100);
        generateOneFile(".3", "002", 100);
        generateOneFile("_3", "002", 100);
        generateOneFile("foo/.bar/3", "002", 1);
        generateOneFile("foo/bar/3", "002", 1);
        InputBatch<?> fetch6 = adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        assertAvroRecordCount(10101L, fetch6);
    }

    /** Asserts that the batch holds a {@link JavaRDD} with exactly {@code expected} elements. */
    private static void assertAvroRecordCount(long expected, InputBatch<?> batch) {
        JavaRDD<?> records = (JavaRDD<?>) batch.getBatch().get();
        Assertions.assertEquals(expected, records.count());
    }

    /** Asserts that the batch holds a {@link Dataset} with exactly {@code expected} rows. */
    private static void assertRowCount(long expected, InputBatch<?> batch) {
        Dataset<?> rows = (Dataset<?>) batch.getBatch().get();
        Assertions.assertEquals(expected, rows.count());
    }
}

