/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javolution.testing.TestContext;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(value={MockitoExtension.class})
public class TestHoodieIncrSourceE2E
extends S3EventsHoodieIncrSourceHarness {
    private String toggleVersion(String version) {
        return "8".equals(version) ? "6" : "8";
    }

    private HoodieDeltaStreamer.Config createConfig(String basePath, String sourceCheckpoint) {
        return this.createConfig(basePath, sourceCheckpoint, MockS3EventsHoodieIncrSource.class.getName());
    }

    private HoodieDeltaStreamer.Config createConfig(String basePath, String sourceCheckpoint, String sourceClass) {
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(basePath, WriteOperationType.INSERT, sourceClass, Collections.emptyList(), sourceCheckpoint != null ? DFSPropertiesConfiguration.DEFAULT_PATH.toString() : null, false, false, 100000, false, null, null, "timestamp", sourceCheckpoint);
        cfg.propsFilePath = DFSPropertiesConfiguration.DEFAULT_PATH.toString();
        return cfg;
    }

    private TypedProperties setupBaseProperties(String tableVersion) {
        TypedProperties props = this.setProps(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT);
        props.put((Object)HoodieWriteConfig.WRITE_TABLE_VERSION.key(), (Object)tableVersion);
        return props;
    }

    public void verifyLastInstantCommitMetadata(Map<String, String> expectedMetadata) {
        this.metaClient.reloadActiveTimeline();
        Option metadata = HoodieClientTestUtils.getCommitMetadataForInstant((HoodieTableMetaClient)this.metaClient, (HoodieInstant)((HoodieInstant)this.metaClient.getActiveTimeline().lastInstant().get()));
        Assertions.assertFalse((boolean)metadata.isEmpty());
        Assertions.assertEquals((Object)((HoodieCommitMetadata)metadata.get()).getExtraMetadata(), expectedMetadata);
    }

    @Disabled(value="HUDI-8952")
    public void testSyncE2EWrongCheckpointVersionErrorOut(String tableVersion, String sourceClass) throws Exception {
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), tableVersion);
        TypedProperties props = this.setupBaseProperties(tableVersion);
        props.put((Object)"mockTestFetchNextBatchOp", (Object)(tableVersion.equals("6") ? "OP_EMPTY_ROW_SET_NONE_NULL_CKP_V2_KEY" : "OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY"));
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"10");
        props.put((Object)"valInputCkp", (Object)"VAL_EMPTY_CKP_KEY");
        props.put((Object)"hoodie.metadata.enable", (Object)"false");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"false");
        props.put((Object)"hoodie.write.table.version", (Object)tableVersion);
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null, sourceClass), this.jsc, Option.of((Object)props));
        Exception ex = (Exception)Assertions.assertThrows(IllegalStateException.class, () -> ((HoodieDeltaStreamer)ds).sync());
        if (tableVersion.equals("8")) {
            TestContext.assertTrue((boolean)ex.getMessage().contains("Data source should return checkpoint version V2."));
        } else {
            TestContext.assertTrue((boolean)ex.getMessage().contains("Data source should return checkpoint version V1."));
        }
    }

    @ParameterizedTest
    @CsvSource(value={"6, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource", "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource", "6, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource", "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource"})
    public void testSyncE2ENoPrevCkpThenSyncMultipleTimes(String tableVersion, String sourceClass) throws Exception {
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), tableVersion);
        TypedProperties props = this.setupBaseProperties(tableVersion);
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"10");
        props.put((Object)"valInputCkp", (Object)"VAL_EMPTY_CKP_KEY");
        props.put((Object)"hoodie.metadata.enable", (Object)"false");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"false");
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null, sourceClass), this.jsc, Option.of((Object)props));
        ds.sync();
        HashMap<String, String> expectedMetadata = new HashMap<String, String>();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "10");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        props = this.setupBaseProperties(this.toggleVersion(tableVersion));
        props.put((Object)"hoodie.metadata.enable", (Object)"false");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"20");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"false");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"10");
        props.put((Object)"VAL_CKP_RESET_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"VAL_CKP_IGNORE_KEY_IS_NULL", (Object)"IGNORED");
        ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null, sourceClass), this.jsc, Option.of((Object)props));
        ds.sync();
        expectedMetadata = new HashMap();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "20");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        props = this.setupBaseProperties("8");
        props.put((Object)"hoodie.metadata.enable", (Object)"false");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"30");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"false");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"20");
        props.put((Object)"VAL_CKP_RESET_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"VAL_CKP_IGNORE_KEY_IS_NULL", (Object)"IGNORED");
        ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null, sourceClass), this.jsc, Option.of((Object)props));
        ds.sync();
        if (tableVersion.equals("6")) {
            this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), "6");
            Assertions.assertEquals((Object)HoodieTableVersion.SIX, (Object)this.metaClient.getTableConfig().getTableVersion());
            Assertions.assertEquals((Object)TimelineLayoutVersion.LAYOUT_VERSION_1, (Object)this.metaClient.getTableConfig().getTimelineLayoutVersion().get());
        }
        expectedMetadata = new HashMap();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "30");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        props = this.setupBaseProperties("8");
        props.put((Object)"hoodie.metadata.enable", (Object)"false");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"40");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"true");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"30");
        props.put((Object)"VAL_CKP_RESET_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"VAL_CKP_IGNORE_KEY_IS_NULL", (Object)"IGNORED");
        ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null, sourceClass), this.jsc, Option.of((Object)props));
        ds.sync();
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), "8");
        Assertions.assertEquals((Object)HoodieTableVersion.EIGHT, (Object)this.metaClient.getTableConfig().getTableVersion());
        Assertions.assertEquals((Object)TimelineLayoutVersion.LAYOUT_VERSION_2, (Object)this.metaClient.getTableConfig().getTimelineLayoutVersion().get());
        expectedMetadata = new HashMap();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "40");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
    }

    @ParameterizedTest
    @ValueSource(strings={"6", "8"})
    public void testSyncE2ENoPrevCkpWithCkpOverride(String tableVersion) throws Exception {
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), tableVersion);
        TypedProperties props = this.setupBaseProperties(tableVersion);
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"30");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"10");
        HoodieDeltaStreamer.Config cfg = this.createConfig(this.basePath(), "10");
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        HashMap<String, String> expectedMetadata = new HashMap<String, String>();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "30");
        expectedMetadata.put("deltastreamer.checkpoint.reset_key", "10");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"40");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"30");
        ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        expectedMetadata.clear();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "40");
        expectedMetadata.put("deltastreamer.checkpoint.reset_key", "10");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"40");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"50");
        ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null), this.jsc, Option.of((Object)props));
        ds.sync();
        expectedMetadata.clear();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "50");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        props.put((Object)"valInputCkp", (Object)"VAL_EMPTY_CKP_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"60");
        cfg = this.createConfig(this.basePath(), null);
        cfg.ignoreCheckpoint = "50";
        ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        expectedMetadata.clear();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "60");
        expectedMetadata.put("deltastreamer.checkpoint.ignore_key", "50");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        props = this.setupBaseProperties(tableVersion);
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"70");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"60");
        ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        expectedMetadata.clear();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "70");
        expectedMetadata.put("deltastreamer.checkpoint.ignore_key", "50");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        cfg.ignoreCheckpoint = null;
        props = this.setupBaseProperties(tableVersion);
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"80");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"70");
        ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        expectedMetadata.clear();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "80");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
    }

    @ParameterizedTest
    @ValueSource(strings={"6", "8"})
    public void testSyncE2ENoNextCkpNoPrevCkp(String tableVersion) throws Exception {
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), tableVersion);
        TypedProperties props = this.setupBaseProperties(tableVersion);
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NULL_CKP");
        props.put((Object)"valInputCkp", (Object)"VAL_EMPTY_CKP_KEY");
        HoodieDeltaStreamer.Config cfg = this.createConfig(this.basePath(), null);
        cfg.allowCommitOnNoCheckpointChange = true;
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        HashMap<String, String> expectedMetadata = new HashMap<String, String>();
        expectedMetadata.put("schema", "");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
    }

    @ParameterizedTest
    @ValueSource(strings={"6", "8"})
    public void testSyncE2ENoNextCkpHasPrevCkp(String tableVersion) throws Exception {
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), tableVersion);
        String previousCkp = "previousCkp";
        TypedProperties props = this.setupBaseProperties(tableVersion);
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)previousCkp);
        props.put((Object)"VAL_CKP_RESET_KEY_IS_NULL", (Object)"");
        props.put((Object)"VAL_CKP_IGNORE_KEY_IS_NULL", (Object)"");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NULL_CKP");
        HoodieDeltaStreamer.Config cfg = this.createConfig(this.basePath(), previousCkp);
        cfg.allowCommitOnNoCheckpointChange = true;
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        HashMap<String, String> expectedMetadata = new HashMap<String, String>();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.reset_key", previousCkp);
        this.verifyLastInstantCommitMetadata(expectedMetadata);
    }

    @ParameterizedTest
    @ValueSource(strings={"6", "8"})
    public void testSyncE2EForceSkip(String tableVersion) throws Exception {
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), tableVersion);
        TypedProperties props = this.setupBaseProperties(tableVersion);
        props.put((Object)HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP.key(), (Object)"true");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"valInputCkp", (Object)"VAL_EMPTY_CKP_KEY");
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null), this.jsc, Option.of((Object)props));
        ds.sync();
        HashMap<String, String> expectedMetadata = new HashMap<String, String>();
        expectedMetadata.put("schema", "");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
    }

    @Test
    public void testTargetCheckpointV2ForS3Gcs() {
        Assertions.assertFalse((boolean)CheckpointUtils.shouldTargetCheckpointV2((int)8, (String)S3EventsHoodieIncrSource.class.getName()));
        Assertions.assertFalse((boolean)CheckpointUtils.shouldTargetCheckpointV2((int)6, (String)S3EventsHoodieIncrSource.class.getName()));
        Assertions.assertFalse((boolean)CheckpointUtils.shouldTargetCheckpointV2((int)8, (String)GcsEventsHoodieIncrSource.class.getName()));
        Assertions.assertFalse((boolean)CheckpointUtils.shouldTargetCheckpointV2((int)6, (String)GcsEventsHoodieIncrSource.class.getName()));
    }
}

