/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/**
 * Checks that a delta streamer run completes exactly one Spark stage for a given call site,
 * i.e. that the write, clustering and compaction paths do not re-trigger the same DAG.
 *
 * <p>Each test registers a {@link StageListener} on the shared Spark context, runs the delta
 * streamer, and asserts that exactly one completed stage had details containing the tracked
 * call-site string. The listener is always unregistered afterwards so it cannot accumulate on
 * the shared context across tests.
 *
 * <p>The whole class is currently disabled with reason {@code HUDI-6505}.
 */
@Disabled("HUDI-6505")
public class TestHoodieDeltaStreamerDAGExecution extends HoodieDeltaStreamerTestBase {

    /**
     * Runs a single sync with the given write operation and expects the finalize-write stage
     * to complete exactly once.
     *
     * @param operation write operation name understood by {@link WriteOperationType#fromValue}
     * @throws Exception if preparing the source data or running the delta streamer fails
     */
    @ParameterizedTest
    @CsvSource(value={"upsert", "insert", "bulk_insert"})
    public void testWriteOperationDoesNotTriggerRepeatedDAG(String operation) throws Exception {
        assertSingleTrigger(
                "org.apache.hudi.client.BaseHoodieClient.finalizeWrite",
                WriteOperationType.fromValue(operation),
                false,
                Option.empty());
    }

    /**
     * Runs an upsert with the table-service configs returned by {@code getTableServicesConfigs}
     * and expects the clustering stage to complete exactly once.
     *
     * @throws Exception if preparing the source data or running the delta streamer fails
     */
    @Test
    public void testClusteringDoesNotTriggerRepeatedDAG() throws Exception {
        List<String> configs = getTableServicesConfigs(100, "false", "true", "1", "", "");
        assertSingleTrigger(
                "org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering",
                WriteOperationType.UPSERT,
                false,
                Option.of(configs));
    }

    /**
     * Runs an upsert followed by an update batch with inline compaction after every delta
     * commit, and expects the compaction stage to complete exactly once.
     *
     * @throws Exception if preparing the source data or running the delta streamer fails
     */
    @Test
    public void testCompactionDoesNotTriggerRepeatedDAG() throws Exception {
        List<String> configs = Arrays.asList(
                "hoodie.compact.inline.max.delta.commits=1",
                "hoodie.compact.inline=true");
        assertSingleTrigger(
                "org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute",
                WriteOperationType.UPSERT,
                true,
                Option.of(configs));
    }

    /**
     * Registers a stage listener for {@code eventToTrack}, runs the delta streamer and asserts
     * that exactly one matching stage completed.
     *
     * @param eventToTrack          substring looked for in each completed stage's details
     * @param operationType         write operation to run
     * @param shouldGenerateUpdates whether a second sync with update records is executed
     * @param configsOpt            optional extra configs appended to the delta streamer config
     * @throws Exception if preparing the source data or running the delta streamer fails
     */
    private void assertSingleTrigger(String eventToTrack, WriteOperationType operationType,
            boolean shouldGenerateUpdates, Option<List<String>> configsOpt) throws Exception {
        StageListener stageListener = new StageListener(eventToTrack);
        sparkSession.sparkContext().addSparkListener(stageListener);
        try {
            runDeltaStreamer(operationType, shouldGenerateUpdates, configsOpt);
            // NOTE(review): stage events are presumably delivered asynchronously by Spark's
            // listener bus, so this check may race with late deliveries - confirm whether the
            // bus needs to be drained before asserting.
            Assertions.assertEquals(1, stageListener.triggerCount.get());
        } finally {
            // The Spark context is shared; without this the listener would stay registered
            // and keep printing/counting stages triggered by subsequent tests.
            sparkSession.sparkContext().removeSparkListener(stageListener);
        }
    }

    /**
     * Writes a small parquet source, syncs it into a fresh MERGE_ON_READ table and verifies the
     * record count; optionally writes an update batch and syncs a second time.
     *
     * @param operationType         write operation used for the sync(s)
     * @param shouldGenerateUpdates when {@code true}, an update batch is written and synced
     * @param configsOpt            optional extra configs appended to the delta streamer config
     * @throws Exception if file preparation or a sync fails
     */
    private void runDeltaStreamer(WriteOperationType operationType, boolean shouldGenerateUpdates, Option<List<String>> configsOpt) throws Exception {
        boolean useSchemaProvider = true;
        int parquetRecordsCount = 10;

        // Source and table locations are derived from testNum before it is bumped below, so
        // both stay stable for the remainder of this run.
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        HoodieTestDataGenerator dataGenerator = prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(useSchemaProvider, true, "source.avsc", "source.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "");

        String tableBasePath = basePath + "/runDeltaStreamer" + testNum;
        // Start from a clean table directory.
        FileIOUtils.deleteDirectory(new File(tableBasePath));

        HoodieDeltaStreamer.Config config = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, operationType, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, HoodieTableType.MERGE_ON_READ.name(), "timestamp", null);
        configsOpt.ifPresent(cfgs -> config.configs.addAll(cfgs));

        new HoodieDeltaStreamer(config, jsc).sync();
        assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
        ++testNum;

        if (shouldGenerateUpdates) {
            // Second batch reuses the same data generator and config, then syncs again.
            prepareParquetDFSUpdates(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null, dataGenerator, "001");
            new HoodieDeltaStreamer(config, jsc).sync();
        }
    }

    /**
     * Spark listener that counts completed stages whose details contain a given substring.
     */
    private static final class StageListener extends SparkListener {
        // Atomic because the callback that increments it and the test code that reads it are
        // not guaranteed to run on the same thread.
        final AtomicInteger triggerCount = new AtomicInteger();
        private final String eventToTrack;

        /**
         * @param eventToTrack substring to look for in each completed stage's details
         */
        private StageListener(String eventToTrack) {
            this.eventToTrack = eventToTrack;
        }

        /**
         * Prints the stage details and bumps the counter when they contain the tracked string.
         *
         * @param stageCompleted the stage-completed event supplied by Spark
         */
        @Override
        public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
            String details = stageCompleted.stageInfo().details();
            System.out.println("stage details: " + details);
            if (details.contains(eventToTrack)) {
                triggerCount.incrementAndGet();
            }
        }
    }
}

