/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestJsonDFSSource
extends AbstractDFSSourceTestBase {
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        this.dfsRoot = basePath + "/jsonFiles";
        this.fileSuffix = ".json";
    }

    @Override
    public Source prepareDFSSource(TypedProperties props) {
        props.setProperty("hoodie.streamer.source.dfs.root", this.dfsRoot);
        return new JsonDFSSource(props, jsc, sparkSession, (SchemaProvider)this.schemaProvider);
    }

    @Override
    public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
        UtilitiesTestBase.Helpers.saveStringsToDFS(UtilitiesTestBase.Helpers.jsonifyRecords(records), storage, path.toString());
    }

    @Test
    public void testCorruptedSourceFile() throws IOException {
        Throwable t;
        fs.mkdirs(new Path(this.dfsRoot));
        TypedProperties props = new TypedProperties();
        props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true");
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(this.prepareDFSSource(props), Option.empty(), Option.of((Object)props));
        this.generateOneFile("1", "000", 10);
        this.generateOneFile("2", "000", 10);
        RemoteIterator files = fs.listFiles(this.generateOneFile("3", "000", 10), true);
        FileStatus file1Status = (FileStatus)files.next();
        InputBatch batch = sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        this.corruptFile(file1Status.getPath());
        Assertions.assertTrue((boolean)batch.getBatch().isPresent());
        for (t = Assertions.assertThrows(Exception.class, () -> ((Dataset)batch.getBatch().get()).show(30)); t != null; t = t.getCause()) {
            if (!(t instanceof SchemaCompatibilityException)) continue;
            return;
        }
        throw new AssertionError("Exception does not have SchemaCompatibility in its trace", t);
    }

    protected void corruptFile(Path path) throws IOException {
        PrintStream os = new PrintStream((OutputStream)fs.appendFile(path).build());
        os.println("\ud83e\udd37\u200d");
        os.flush();
        os.close();
    }
}

