/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.StreamSync;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestStreamSyncUnitTests {
    @ParameterizedTest
    @MethodSource(value={"testCasesFetchNextBatchFromSource"})
    void testFetchNextBatchFromSource(Boolean useRowWriter, Boolean hasTransformer, Boolean hasSchemaProvider, Boolean isNullTargetSchema, Boolean hasErrorTable, Boolean shouldTryWriteToErrorTable) {
        HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext)Mockito.mock(HoodieSparkEngineContext.class);
        HoodieHadoopStorage storage = new HoodieHadoopStorage((FileSystem)Mockito.mock(FileSystem.class));
        SparkSession sparkSession = (SparkSession)Mockito.mock(SparkSession.class);
        Configuration configuration = (Configuration)Mockito.mock(Configuration.class);
        HoodieStreamer.Config cfg = new HoodieStreamer.Config();
        cfg.targetTableName = "testTableName";
        cfg.targetBasePath = "/fake/table/name";
        cfg.tableType = "MERGE_ON_READ";
        SourceFormatAdapter sourceFormatAdapter = (SourceFormatAdapter)Mockito.mock(SourceFormatAdapter.class);
        SchemaProvider inputBatchSchemaProvider = this.getSchemaProvider("InputBatch", false);
        Option fakeDataFrame = Option.of((Object)Mockito.mock(Dataset.class));
        InputBatch fakeRowInputBatch = new InputBatch(fakeDataFrame, "chkpt", inputBatchSchemaProvider);
        Mockito.when((Object)sourceFormatAdapter.fetchNewDataInRowFormat((Option)ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenReturn((Object)fakeRowInputBatch);
        InputBatch fakeAvroInputBatch = new InputBatch(Option.empty(), "chkpt", inputBatchSchemaProvider);
        Mockito.when((Object)sourceFormatAdapter.fetchNewDataInAvroFormat((Option)ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenReturn((Object)fakeAvroInputBatch);
        Mockito.when((Object)sourceFormatAdapter.processErrorEvents((Option)ArgumentMatchers.any(), (ErrorEvent.ErrorReason)ArgumentMatchers.any())).thenReturn((Object)Option.empty());
        Option transformerOption = Option.empty();
        if (hasTransformer.booleanValue()) {
            transformerOption = Option.of((Object)Mockito.mock(Transformer.class));
        }
        SchemaProvider schemaProvider = null;
        if (hasSchemaProvider.booleanValue()) {
            schemaProvider = this.getSchemaProvider("UserProvided", isNullTargetSchema);
        }
        TypedProperties props = new TypedProperties();
        props.put((Object)DataSourceWriteOptions.RECONCILE_SCHEMA().key(), (Object)false);
        Option errorTableWriterOption = Option.empty();
        if (hasErrorTable.booleanValue()) {
            errorTableWriterOption = Option.of((Object)Mockito.mock(BaseErrorTableWriter.class));
            props.put((Object)HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), (Object)true);
        }
        TypedProperties propsSpy = (TypedProperties)Mockito.spy((Object)props);
        StreamSync streamSync = new StreamSync(cfg, sparkSession, propsSpy, hoodieSparkEngineContext, (HoodieStorage)storage, configuration, client -> true, schemaProvider, errorTableWriterOption, sourceFormatAdapter, transformerOption, useRowWriter.booleanValue(), false);
        StreamSync spy = (StreamSync)Mockito.spy((Object)streamSync);
        SchemaProvider deducedSchemaProvider = this.getSchemaProvider("deduced", false);
        ((StreamSync)Mockito.doReturn((Object)deducedSchemaProvider).when((Object)spy)).getDeducedSchemaProvider((Schema)ArgumentMatchers.any(), (SchemaProvider)ArgumentMatchers.any(), (HoodieTableMetaClient)ArgumentMatchers.any());
        InputBatch batch = spy.fetchNextBatchFromSource(Option.empty(), (HoodieTableMetaClient)Mockito.mock(HoodieTableMetaClient.class));
        ((StreamSync)Mockito.verify((Object)spy, (VerificationMode)Mockito.times((int)1))).getDeducedSchemaProvider((Schema)ArgumentMatchers.any(), (SchemaProvider)ArgumentMatchers.any(), (HoodieTableMetaClient)ArgumentMatchers.any());
        Assertions.assertEquals((Object)deducedSchemaProvider.getTargetSchema(), (Object)batch.getSchemaProvider().getTargetSchema());
        ((TypedProperties)Mockito.verify((Object)propsSpy, (VerificationMode)(shouldTryWriteToErrorTable != false ? Mockito.times((int)1) : Mockito.never()))).getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), ((Boolean)HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()).booleanValue());
    }

    @ParameterizedTest
    @MethodSource(value={"getCheckpointToResumeCases"})
    void testGetCheckpointToResume(HoodieStreamer.Config cfg, HoodieCommitMetadata commitMetadata, Option<String> expectedResumeCheckpoint) throws IOException {
        HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext)Mockito.mock(HoodieSparkEngineContext.class);
        HoodieHadoopStorage storage = new HoodieHadoopStorage((FileSystem)Mockito.mock(FileSystem.class));
        TypedProperties props = new TypedProperties();
        SparkSession sparkSession = (SparkSession)Mockito.mock(SparkSession.class);
        Configuration configuration = (Configuration)Mockito.mock(Configuration.class);
        HoodieTimeline commitsTimeline = (HoodieTimeline)Mockito.mock(HoodieTimeline.class);
        HoodieInstant hoodieInstant = (HoodieInstant)Mockito.mock(HoodieInstant.class);
        Mockito.when((Object)commitsTimeline.filter((Predicate)ArgumentMatchers.any())).thenReturn((Object)commitsTimeline);
        Mockito.when((Object)commitsTimeline.lastInstant()).thenReturn((Object)Option.of((Object)hoodieInstant));
        StreamSync streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkEngineContext, (HoodieStorage)storage, configuration, client -> true, null, Option.empty(), null, Option.empty(), true, true);
        StreamSync spy = (StreamSync)Mockito.spy((Object)streamSync);
        ((StreamSync)Mockito.doReturn((Object)Option.of((Object)commitMetadata)).when((Object)spy)).getLatestCommitMetadataWithValidCheckpointInfo((HoodieTimeline)ArgumentMatchers.any());
        Option resumeCheckpoint = spy.getCheckpointToResume(Option.of((Object)commitsTimeline));
        Assertions.assertEquals(expectedResumeCheckpoint, (Object)resumeCheckpoint);
    }

    private static Stream<Arguments> getCheckpointToResumeCases() {
        return Stream.of(Arguments.of((Object[])new Object[]{TestStreamSyncUnitTests.generateDeltaStreamerConfig("new-reset-checkpoint", null), TestStreamSyncUnitTests.generateCommitMetadata("old-reset-checkpoint", null, null), Option.of((Object)"new-reset-checkpoint")}), Arguments.of((Object[])new Object[]{TestStreamSyncUnitTests.generateDeltaStreamerConfig("old-reset-checkpoint", null), TestStreamSyncUnitTests.generateCommitMetadata("old-reset-checkpoint", null, "checkpoint-prev-run"), Option.of((Object)"checkpoint-prev-run")}), Arguments.of((Object[])new Object[]{TestStreamSyncUnitTests.generateDeltaStreamerConfig("old-reset-checkpoint", "123445"), TestStreamSyncUnitTests.generateCommitMetadata("old-reset-checkpoint", "123445", "checkpoint-prev-run"), Option.of((Object)"checkpoint-prev-run")}), Arguments.of((Object[])new Object[]{TestStreamSyncUnitTests.generateDeltaStreamerConfig("old-reset-checkpoint", "123445"), TestStreamSyncUnitTests.generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"), Option.empty()}), Arguments.of((Object[])new Object[]{TestStreamSyncUnitTests.generateDeltaStreamerConfig("new-reset-checkpoint", "123445"), TestStreamSyncUnitTests.generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"), Option.empty()}));
    }

    private static HoodieStreamer.Config generateDeltaStreamerConfig(String checkpoint, String ignoreCheckpoint) {
        HoodieStreamer.Config cfg = new HoodieStreamer.Config();
        cfg.checkpoint = checkpoint;
        cfg.ignoreCheckpoint = ignoreCheckpoint;
        cfg.tableType = "MERGE_ON_READ";
        return cfg;
    }

    private static HoodieCommitMetadata generateCommitMetadata(String resetCheckpointValue, String ignoreCheckpointValue, String checkpointValue) {
        HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
        commitMetadata.addMetadata("deltastreamer.checkpoint.reset_key", resetCheckpointValue);
        commitMetadata.addMetadata("deltastreamer.checkpoint.ignore_key", ignoreCheckpointValue);
        commitMetadata.addMetadata("deltastreamer.checkpoint.key", checkpointValue);
        return commitMetadata;
    }

    private SchemaProvider getSchemaProvider(String name, boolean isNullTargetSchema) {
        SchemaProvider schemaProvider = (SchemaProvider)Mockito.mock(SchemaProvider.class);
        Schema sourceSchema = (Schema)Mockito.mock(Schema.class);
        Schema targetSchema = isNullTargetSchema ? InputBatch.NULL_SCHEMA : (Schema)Mockito.mock(Schema.class);
        Mockito.when((Object)schemaProvider.getSourceSchema()).thenReturn((Object)sourceSchema);
        Mockito.when((Object)schemaProvider.getTargetSchema()).thenReturn((Object)targetSchema);
        Mockito.when((Object)sourceSchema.toString()).thenReturn((Object)(name + "SourceSchema"));
        if (!isNullTargetSchema) {
            Mockito.when((Object)targetSchema.toString()).thenReturn((Object)(name + "TargetSchema"));
        }
        return schemaProvider;
    }

    static Stream<Arguments> testCasesFetchNextBatchFromSource() {
        Stream.Builder<Arguments> b = Stream.builder();
        for (Boolean useRowWriter : new Boolean[]{false, true}) {
            for (Boolean hasErrorTable : new Boolean[]{false, true}) {
                boolean errorTableEnabled = hasErrorTable != false && useRowWriter == false;
                b.add(Arguments.of((Object[])new Object[]{useRowWriter, false, false, false, hasErrorTable, errorTableEnabled}));
            }
        }
        for (Boolean useRowWriter : new Boolean[]{false, true}) {
            for (Boolean hasSchemaProvider : new Boolean[]{false, true}) {
                for (Boolean isNullTargetSchema : new Boolean[]{false, true}) {
                    for (Boolean hasErrorTable : new Boolean[]{false, true}) {
                        boolean errorTableEnabled = hasErrorTable != false && useRowWriter == false;
                        boolean schemaProviderNullOrMissing = isNullTargetSchema != false || hasSchemaProvider == false;
                        boolean shouldTryWriteToErrorTable = errorTableEnabled && !schemaProviderNullOrMissing;
                        b.add(Arguments.of((Object[])new Object[]{useRowWriter, true, hasSchemaProvider, isNullTargetSchema, hasErrorTable, shouldTryWriteToErrorTable}));
                    }
                }
            }
        }
        return b.build();
    }
}

