/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.StreamSync;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestStreamSync
extends SparkClientFunctionalTestHarness {
    /**
     * Exercises {@code fetchNextBatchFromSource} on a spied {@link StreamSync} built from mocks.
     *
     * <p>For every parameter combination the test expects that the deduced schema provider is
     * requested exactly once, that the returned batch exposes the deduced target schema, and that
     * the error-table target-schema validation flag is read from the properties once when
     * {@code shouldTryWriteToErrorTable} is true and never otherwise.
     *
     * @param useRowWriter value returned by the stubbed {@code canUseRowWriter} / {@code isRowWriterEnabled}
     * @param hasTransformer whether a mocked {@link Transformer} is handed to the {@link StreamSync}
     * @param hasSchemaProvider whether a user schema provider is supplied (otherwise {@code null})
     * @param isNullTargetSchema whether the user schema provider returns {@code InputBatch.NULL_SCHEMA}
     * @param hasErrorTable whether a mocked error table writer is supplied and validation is enabled
     * @param shouldTryWriteToErrorTable whether the validation flag is expected to be consulted
     */
    @ParameterizedTest
    @MethodSource("testCasesFetchNextBatchFromSource")
    void testFetchNextBatchFromSource(Boolean useRowWriter, Boolean hasTransformer, Boolean hasSchemaProvider, Boolean isNullTargetSchema, Boolean hasErrorTable, Boolean shouldTryWriteToErrorTable) {
        // Streamer configuration pointing at a fake table.
        HoodieStreamer.Config streamerConfig = new HoodieStreamer.Config();
        streamerConfig.targetTableName = "testTableName";
        streamerConfig.targetBasePath = "/fake/table/name";
        streamerConfig.tableType = "MERGE_ON_READ";

        // Collaborators that are only handed to the StreamSync constructor.
        HoodieSparkEngineContext engineContext = Mockito.mock(HoodieSparkEngineContext.class);
        HoodieStorage hoodieStorage = new HoodieHadoopStorage(Mockito.mock(FileSystem.class));
        SparkSession spark = Mockito.mock(SparkSession.class);
        Configuration hadoopConf = Mockito.mock(Configuration.class);

        // Source adapter that hands back canned row / avro batches and no error events.
        SourceFormatAdapter adapter = Mockito.mock(SourceFormatAdapter.class);
        SchemaProvider batchSchemaProvider = getSchemaProvider("InputBatch", false);
        Option mockedRows = Option.of(Mockito.mock(Dataset.class));
        InputBatch rowBatch = new InputBatch(mockedRows, "chkpt", batchSchemaProvider);
        Mockito.when(adapter.fetchNewDataInRowFormat((Option) ArgumentMatchers.any(), ArgumentMatchers.anyLong()))
            .thenReturn(rowBatch);
        InputBatch avroBatch = new InputBatch(Option.empty(), "chkpt", batchSchemaProvider);
        Mockito.when(adapter.fetchNewDataInAvroFormat((Option) ArgumentMatchers.any(), ArgumentMatchers.anyLong()))
            .thenReturn(avroBatch);
        Mockito.when(adapter.processErrorEvents((Option) ArgumentMatchers.any(), (ErrorEvent.ErrorReason) ArgumentMatchers.any()))
            .thenReturn(Option.empty());

        // Optional pieces driven by the test parameters.
        Option transformer = hasTransformer ? Option.of(Mockito.mock(Transformer.class)) : Option.empty();
        SchemaProvider userSchemaProvider = hasSchemaProvider ? getSchemaProvider("UserProvided", isNullTargetSchema) : null;

        // The properties must be fully populated before spying on them.
        TypedProperties writeProps = new TypedProperties();
        writeProps.put(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), Boolean.FALSE);
        Option errorTableWriter = Option.empty();
        if (hasErrorTable) {
            errorTableWriter = Option.of(Mockito.mock(BaseErrorTableWriter.class));
            writeProps.put(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), Boolean.TRUE);
        }
        TypedProperties propsSpy = Mockito.spy(writeProps);

        StreamSync streamSync = new StreamSync(streamerConfig, spark, propsSpy, engineContext, hoodieStorage, hadoopConf,
            client -> true, userSchemaProvider, errorTableWriter, adapter, transformer, false);
        StreamSync streamSyncSpy = Mockito.spy(streamSync);
        Mockito.doReturn(useRowWriter).when(streamSyncSpy).canUseRowWriter((Schema) ArgumentMatchers.any());
        Mockito.doReturn(useRowWriter).when(streamSyncSpy).isRowWriterEnabled();
        SchemaProvider deducedProvider = getSchemaProvider("deduced", false);
        Mockito.doReturn(deducedProvider).when(streamSyncSpy).getDeducedSchemaProvider(
            (Schema) ArgumentMatchers.any(), (SchemaProvider) ArgumentMatchers.any(), (HoodieTableMetaClient) ArgumentMatchers.any());

        // Call under test.
        Pair fetched = streamSyncSpy.fetchNextBatchFromSource(Option.empty(), Mockito.mock(HoodieTableMetaClient.class));
        InputBatch fetchedBatch = (InputBatch) fetched.getLeft();

        // The deduced schema provider is requested once and its target schema ends up on the batch.
        Mockito.verify(streamSyncSpy, Mockito.times(1)).getDeducedSchemaProvider(
            (Schema) ArgumentMatchers.any(), (SchemaProvider) ArgumentMatchers.any(), (HoodieTableMetaClient) ArgumentMatchers.any());
        Assertions.assertEquals(deducedProvider.getTargetSchema(), fetchedBatch.getSchemaProvider().getTargetSchema());

        // The validation flag is read once or never, depending on the expectation for this case.
        String validateKey = HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key();
        boolean validateDefault = (Boolean) HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue();
        VerificationMode expectedLookups = shouldTryWriteToErrorTable ? Mockito.times(1) : Mockito.never();
        Mockito.verify(propsSpy, expectedLookups).getBoolean(validateKey, validateDefault);
    }

    /**
     * Builds a {@link HoodieMultiTableStreamer} from the supplied config and checks every table
     * execution context it exposes.
     *
     * <p>Each context's config must contain all of the {@code configs} entries of the multi-table
     * config, and its {@code propsFilePath} must differ from
     * {@code HoodieStreamer.Config.DEFAULT_DFS_SOURCE_PROPERTIES}. The mocked Spark context must
     * have been asked for its Hadoop configuration.
     *
     * @param cfg multi-table streamer configuration supplied by {@code getMultiTableStreamerCases}
     * @throws IOException declared because the streamer construction may throw it
     */
    @ParameterizedTest
    @MethodSource(value={"getMultiTableStreamerCases"})
    void testCloneConfigsFromMultiTableStreamer(HoodieMultiTableStreamer.Config cfg) throws IOException {
        Configuration configuration = new Configuration();
        JavaSparkContext jssc = Mockito.mock(JavaSparkContext.class);
        Mockito.when(jssc.hadoopConfiguration()).thenReturn(configuration);
        HoodieMultiTableStreamer multiTableStreamer = new HoodieMultiTableStreamer(cfg, jssc);
        // Iterate the returned list directly: storing it in a raw List would type the lambda
        // parameter as Object, and getConfig() could not be resolved on it.
        multiTableStreamer.getTableExecutionContexts().forEach(context -> {
            Assertions.assertTrue(context.getConfig().configs.containsAll(cfg.configs));
            Assertions.assertNotEquals(HoodieStreamer.Config.DEFAULT_DFS_SOURCE_PROPERTIES, context.getConfig().propsFilePath);
        });
        Mockito.verify(jssc).hadoopConfiguration();
    }

    /**
     * Supplies the configs for {@code testCloneConfigsFromMultiTableStreamer}: one without extra
     * {@code configs} entries and one carrying a single key generator date format override.
     * Both use the same properties file.
     */
    private static Stream<Arguments> getMultiTableStreamerCases() {
        String propFile = "src/test/resources/streamer-config/kafka-source-multi.properties";
        List<String> noOverrides = Collections.emptyList();
        List<String> dateFormatOverride = Collections.singletonList("hoodie.keygen.timebased.output.dateformat=yyyyMMdd");
        return Stream.of(
            Arguments.of(generateMultiTableStreamerConfig(propFile, noOverrides)),
            Arguments.of(generateMultiTableStreamerConfig(propFile, dateFormatOverride)));
    }

    /**
     * Creates a MERGE_ON_READ multi-table streamer config whose base path prefix and config folder
     * both point at the test resource directory.
     *
     * @param propsFilePath value stored in {@code propsFilePath}
     * @param configs value stored in {@code configs}
     * @return the populated config
     */
    private static HoodieMultiTableStreamer.Config generateMultiTableStreamerConfig(String propsFilePath, List<String> configs) {
        final String resourceDir = "src/test/resources/streamer-config";
        HoodieMultiTableStreamer.Config multiTableConfig = new HoodieMultiTableStreamer.Config();
        multiTableConfig.tableType = "MERGE_ON_READ";
        multiTableConfig.basePathPrefix = resourceDir;
        multiTableConfig.configFolder = resourceDir;
        multiTableConfig.propsFilePath = propsFilePath;
        multiTableConfig.configs = configs;
        return multiTableConfig;
    }

    /**
     * Builds five argument triples: a streamer config (checkpoint, ignoreCheckpoint), a commit
     * metadata (reset key, ignore key, checkpoint key values) and an {@link Option} of a string.
     *
     * <p>NOTE(review): the third element is presumably the expected checkpoint to resume from;
     * no test in the visible file consumes this provider, so confirm before relying on it.
     */
    private static Stream<Arguments> getCheckpointToResumeCases() {
        return Stream.of(
            Arguments.of(
                generateDeltaStreamerConfig("new-reset-checkpoint", null),
                generateCommitMetadata("old-reset-checkpoint", null, null),
                Option.of("new-reset-checkpoint")),
            Arguments.of(
                generateDeltaStreamerConfig("old-reset-checkpoint", null),
                generateCommitMetadata("old-reset-checkpoint", null, "checkpoint-prev-run"),
                Option.of("checkpoint-prev-run")),
            Arguments.of(
                generateDeltaStreamerConfig("old-reset-checkpoint", "123445"),
                generateCommitMetadata("old-reset-checkpoint", "123445", "checkpoint-prev-run"),
                Option.of("checkpoint-prev-run")),
            Arguments.of(
                generateDeltaStreamerConfig("old-reset-checkpoint", "123445"),
                generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"),
                Option.empty()),
            Arguments.of(
                generateDeltaStreamerConfig("new-reset-checkpoint", "123445"),
                generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"),
                Option.empty()));
    }

    /**
     * Creates a MERGE_ON_READ streamer config carrying the given checkpoint settings.
     *
     * @param checkpoint value stored in {@code checkpoint}; may be {@code null}
     * @param ignoreCheckpoint value stored in {@code ignoreCheckpoint}; may be {@code null}
     * @return the populated config
     */
    private static HoodieStreamer.Config generateDeltaStreamerConfig(String checkpoint, String ignoreCheckpoint) {
        HoodieStreamer.Config streamerConfig = new HoodieStreamer.Config();
        streamerConfig.tableType = "MERGE_ON_READ";
        streamerConfig.checkpoint = checkpoint;
        streamerConfig.ignoreCheckpoint = ignoreCheckpoint;
        return streamerConfig;
    }

    /**
     * Creates commit metadata holding the three checkpoint-related extra metadata entries.
     *
     * @param resetCheckpointValue value stored under {@code deltastreamer.checkpoint.reset_key}
     * @param ignoreCheckpointValue value stored under {@code deltastreamer.checkpoint.ignore_key}
     * @param checkpointValue value stored under {@code deltastreamer.checkpoint.key}
     * @return the populated commit metadata
     */
    private static HoodieCommitMetadata generateCommitMetadata(String resetCheckpointValue, String ignoreCheckpointValue, String checkpointValue) {
        HoodieCommitMetadata metadata = new HoodieCommitMetadata();
        // Entries are added in the same order as before: reset, ignore, checkpoint.
        metadata.addMetadata("deltastreamer.checkpoint.reset_key", resetCheckpointValue);
        metadata.addMetadata("deltastreamer.checkpoint.ignore_key", ignoreCheckpointValue);
        metadata.addMetadata("deltastreamer.checkpoint.key", checkpointValue);
        return metadata;
    }

    /**
     * Creates a mocked {@link SchemaProvider} returning a mocked source schema and either a mocked
     * target schema or {@code InputBatch.NULL_SCHEMA}.
     *
     * @param name prefix of the stubbed {@code toString()} values ({@code name + "SourceSchema"} and
     *             {@code name + "TargetSchema"})
     * @param isNullTargetSchema when true the target schema is {@code InputBatch.NULL_SCHEMA} and
     *                           its {@code toString()} is left unstubbed
     * @return the stubbed schema provider
     */
    private SchemaProvider getSchemaProvider(String name, boolean isNullTargetSchema) {
        Schema sourceSchema = Mockito.mock(Schema.class);
        Mockito.when(sourceSchema.toString()).thenReturn(name + "SourceSchema");

        Schema targetSchema;
        if (isNullTargetSchema) {
            targetSchema = InputBatch.NULL_SCHEMA;
        } else {
            targetSchema = Mockito.mock(Schema.class);
            Mockito.when(targetSchema.toString()).thenReturn(name + "TargetSchema");
        }

        SchemaProvider provider = Mockito.mock(SchemaProvider.class);
        Mockito.when(provider.getSourceSchema()).thenReturn(sourceSchema);
        Mockito.when(provider.getTargetSchema()).thenReturn(targetSchema);
        return provider;
    }

    /**
     * Generates the argument tuples for {@code testFetchNextBatchFromSource} in the order
     * (useRowWriter, hasTransformer, hasSchemaProvider, isNullTargetSchema, hasErrorTable,
     * shouldTryWriteToErrorTable).
     *
     * <p>The first group has no transformer and no schema provider; the second group has a
     * transformer and covers every combination of the remaining flags.
     */
    static Stream<Arguments> testCasesFetchNextBatchFromSource() {
        final boolean[] toggles = {false, true};
        Stream.Builder<Arguments> cases = Stream.builder();

        // Without a transformer the expectation equals "error table present and row writer off".
        for (boolean useRowWriter : toggles) {
            for (boolean hasErrorTable : toggles) {
                boolean errorTableEnabled = hasErrorTable && !useRowWriter;
                cases.add(Arguments.of(useRowWriter, false, false, false, hasErrorTable, errorTableEnabled));
            }
        }

        // With a transformer the expectation additionally needs a provider with a non-null target schema.
        for (boolean useRowWriter : toggles) {
            for (boolean hasSchemaProvider : toggles) {
                for (boolean isNullTargetSchema : toggles) {
                    for (boolean hasErrorTable : toggles) {
                        boolean errorTableEnabled = hasErrorTable && !useRowWriter;
                        boolean usableTargetSchema = hasSchemaProvider && !isNullTargetSchema;
                        boolean shouldTryWriteToErrorTable = errorTableEnabled && usableTargetSchema;
                        cases.add(Arguments.of(useRowWriter, true, hasSchemaProvider, isNullTargetSchema, hasErrorTable, shouldTryWriteToErrorTable));
                    }
                }
            }
        }
        return cases.build();
    }

    /**
     * Checks that {@code initializeEmptyTable} sets table version SIX on the table builder exactly
     * once when the write table version property holds the version code of SIX.
     *
     * <p>{@code initTable} on the spied builder is stubbed to return {@code null}.
     *
     * @throws IOException declared because the calls under test may throw it
     */
    @Test
    public void testInitializeEmptyTable() throws IOException {
        HoodieStreamer.Config streamerConfig = new HoodieStreamer.Config();
        streamerConfig.targetTableName = "testTableName";
        streamerConfig.targetBasePath = "/fake/table/name";
        streamerConfig.tableType = "MERGE_ON_READ";
        SchemaProvider schemaProvider = getSchemaProvider("InputBatch", false);

        // Populate the properties before spying on them.
        TypedProperties writeProps = new TypedProperties();
        writeProps.put(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), HoodieTableVersion.SIX.versionCode());

        HoodieSparkEngineContext engineContext = Mockito.mock(HoodieSparkEngineContext.class);
        HoodieStorage hoodieStorage = new HoodieHadoopStorage(Mockito.mock(FileSystem.class));
        SparkSession spark = Mockito.mock(SparkSession.class);
        Configuration hadoopConf = Mockito.mock(Configuration.class);
        SourceFormatAdapter adapter = Mockito.mock(SourceFormatAdapter.class);
        TypedProperties propsSpy = Mockito.spy(writeProps);

        // Spy on a real builder, but replace initTable with a stub returning null.
        HoodieTableMetaClient.TableBuilder tableBuilder =
            Mockito.spy(HoodieTableMetaClient.newTableBuilder().fromProperties((Properties) propsSpy));
        Mockito.doReturn(null).when(tableBuilder).initTable((StorageConfiguration) ArgumentMatchers.any(), ArgumentMatchers.anyString());

        StreamSync streamSync = new StreamSync(streamerConfig, spark, propsSpy, engineContext, hoodieStorage, hadoopConf,
            client -> true, schemaProvider, Option.empty(), adapter, Option.empty(), false);
        StreamSync streamSyncSpy = Mockito.spy(streamSync);

        streamSyncSpy.initializeEmptyTable(tableBuilder, "", null);

        Mockito.verify(tableBuilder, Mockito.times(1)).setTableVersion(HoodieTableVersion.SIX);
    }

    /**
     * Builds a {@link StreamSync} from mocked collaborators, empty properties and a config whose
     * checkpoint, ignoreCheckpoint and sourceClassName are set to fixed test values.
     *
     * @return a StreamSync with no error table writer and no transformer
     */
    private StreamSync setupStreamSync() {
        HoodieStreamer.Config streamerConfig = new HoodieStreamer.Config();
        streamerConfig.sourceClassName = "test-source";
        streamerConfig.ignoreCheckpoint = "test-ignore";
        streamerConfig.checkpoint = "test-checkpoint";

        SparkSession spark = Mockito.mock(SparkSession.class);
        HoodieSparkEngineContext engineContext = Mockito.mock(HoodieSparkEngineContext.class);
        HoodieStorage hoodieStorage = Mockito.mock(HoodieStorage.class);
        Configuration hadoopConf = Mockito.mock(Configuration.class);
        SchemaProvider schemaProvider = Mockito.mock(SchemaProvider.class);
        SourceFormatAdapter adapter = Mockito.mock(SourceFormatAdapter.class);

        return new StreamSync(streamerConfig, spark, new TypedProperties(), engineContext, hoodieStorage, hadoopConf,
            client -> true, schemaProvider, Option.empty(), adapter, Option.empty(), false);
    }

    /**
     * With {@code CHECKPOINT_FORCE_SKIP} set to {@code "true"} in the properties,
     * {@code extractCheckpointMetadata} is expected to return an empty map.
     */
    @Test
    public void testExtractCheckpointMetadata_WhenForceSkipIsTrue() {
        TypedProperties forceSkipProps = new TypedProperties();
        forceSkipProps.put(HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP.key(), "true");
        HoodieStreamer.Config emptyConfig = new HoodieStreamer.Config();
        StreamSync streamSync = setupStreamSync();

        Map result = streamSync.extractCheckpointMetadata(
            Mockito.mock(InputBatch.class), forceSkipProps, HoodieTableVersion.ZERO.versionCode(), emptyConfig);

        Assertions.assertTrue(result.isEmpty(), "Should return empty map when CHECKPOINT_FORCE_SKIP is true");
    }

    /**
     * When the input batch exposes a checkpoint, {@code extractCheckpointMetadata} is expected to
     * return the map produced by that checkpoint's {@code getCheckpointCommitMetadata}, called with
     * the config's checkpoint and ignoreCheckpoint values.
     */
    @Test
    public void testExtractCheckpointMetadata_WhenCheckpointExists() {
        HoodieStreamer.Config cfg = new HoodieStreamer.Config();
        TypedProperties props = new TypedProperties();
        props.put(HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP.key(), "false");

        // Checkpoint stub that answers with a known metadata map.
        HashMap<String, String> expectedMetadata = new HashMap<>();
        expectedMetadata.put("test", "value");
        Checkpoint checkpoint = Mockito.mock(Checkpoint.class);
        Mockito.when(checkpoint.getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)).thenReturn(expectedMetadata);

        InputBatch inputBatch = Mockito.mock(InputBatch.class);
        Mockito.when(inputBatch.getCheckpointForNextBatch()).thenReturn(checkpoint);

        StreamSync streamSync = setupStreamSync();
        Map result = streamSync.extractCheckpointMetadata(inputBatch, props, HoodieTableVersion.ZERO.versionCode(), cfg);

        Assertions.assertEquals(expectedMetadata, result, "Should return checkpoint metadata when checkpoint exists");
    }

    /**
     * With a null next-batch checkpoint and table version EIGHT, the returned metadata is expected
     * to hold the config's ignoreCheckpoint under {@code deltastreamer.checkpoint.ignore_key} and
     * its checkpoint under {@code streamer.checkpoint.reset.key.v2}.
     */
    @Test
    public void testExtractCheckpointMetadata_WhenCheckpointIsNullV2() {
        HoodieStreamer.Config cfg = new HoodieStreamer.Config();
        cfg.ignoreCheckpoint = "test-ignore";
        cfg.checkpoint = "test-checkpoint";

        InputBatch batchWithoutCheckpoint = Mockito.mock(InputBatch.class);
        Mockito.when(batchWithoutCheckpoint.getCheckpointForNextBatch()).thenReturn(null);

        Map<String, String> expected = new HashMap<>();
        expected.put("streamer.checkpoint.reset.key.v2", "test-checkpoint");
        expected.put("deltastreamer.checkpoint.ignore_key", "test-ignore");

        StreamSync streamSync = setupStreamSync();
        Map result = streamSync.extractCheckpointMetadata(
            batchWithoutCheckpoint, new TypedProperties(), HoodieTableVersion.EIGHT.versionCode(), cfg);

        Assertions.assertEquals(expected, result, "Should return default metadata when checkpoint is null");
    }

    /**
     * With a null next-batch checkpoint and table version SIX, the returned metadata is expected
     * to hold the config's ignoreCheckpoint under {@code deltastreamer.checkpoint.ignore_key} and
     * its checkpoint under {@code deltastreamer.checkpoint.reset_key}.
     */
    @Test
    public void testExtractCheckpointMetadata_WhenCheckpointIsNullV1() {
        HoodieStreamer.Config cfg = new HoodieStreamer.Config();
        cfg.ignoreCheckpoint = "test-ignore";
        cfg.checkpoint = "test-checkpoint";

        InputBatch batchWithoutCheckpoint = Mockito.mock(InputBatch.class);
        Mockito.when(batchWithoutCheckpoint.getCheckpointForNextBatch()).thenReturn(null);

        Map<String, String> expected = new HashMap<>();
        expected.put("deltastreamer.checkpoint.reset_key", "test-checkpoint");
        expected.put("deltastreamer.checkpoint.ignore_key", "test-ignore");

        StreamSync streamSync = setupStreamSync();
        Map result = streamSync.extractCheckpointMetadata(
            batchWithoutCheckpoint, new TypedProperties(), HoodieTableVersion.SIX.versionCode(), cfg);

        Assertions.assertEquals(expected, result, "Should return default metadata when checkpoint is null");
    }
}

