/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(value={MockitoExtension.class})
public class TestHoodieIncrSourceE2EAutoUpgrade
extends S3EventsHoodieIncrSourceHarness {
    private String schemaStr;

    @BeforeEach
    void setup() throws IOException {
        super.setUp();
        this.schemaStr = SchemaTestUtil.getSimpleSchema().toString();
    }

    public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException {
        return HoodieTableMetaClient.newTableBuilder().setTableName("raw_trips").setTableType(HoodieTableType.COPY_ON_WRITE).setPayloadClass(HoodieAvroPayload.class).setTableVersion(ConfigUtils.getIntWithAltKeys((Properties)props, (ConfigProperty)HoodieWriteConfig.WRITE_TABLE_VERSION)).setTableCreateSchema(this.schemaStr).fromProperties(props).initTable(storageConf.newInstance(), basePath);
    }

    private String toggleVersion(String version) {
        return "8".equals(version) ? "6" : "8";
    }

    private HoodieDeltaStreamer.Config createConfig(String basePath, String sourceCheckpoint, String sourceClass) {
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(basePath, WriteOperationType.INSERT, sourceClass, Collections.emptyList(), sourceCheckpoint != null ? DFSPropertiesConfiguration.DEFAULT_PATH.toString() : null, false, false, 100000, false, null, null, "timestamp", sourceCheckpoint);
        cfg.propsFilePath = DFSPropertiesConfiguration.DEFAULT_PATH.toString();
        return cfg;
    }

    private TypedProperties setupBaseProperties(String tableVersion) {
        TypedProperties props = this.setProps(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT);
        props.put((Object)HoodieWriteConfig.WRITE_TABLE_VERSION.key(), (Object)tableVersion);
        return props;
    }

    public void verifyLastInstantCommitMetadata(Map<String, String> expectedMetadata) {
        this.metaClient.reloadActiveTimeline();
        Option metadata = HoodieClientTestUtils.getCommitMetadataForInstant((HoodieTableMetaClient)this.metaClient, (HoodieInstant)((HoodieInstant)this.metaClient.getActiveTimeline().lastInstant().get()));
        Assertions.assertFalse((boolean)metadata.isEmpty());
        Assertions.assertEquals(expectedMetadata, (Object)((HoodieCommitMetadata)metadata.get()).getExtraMetadata());
    }

    @Test
    public void testSyncE2ENoPrevCkpThenSyncMultipleTimes() throws Exception {
        String sourceClass = MockGeneralHoodieIncrSource.class.getName();
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), "6");
        TypedProperties props = this.setupBaseProperties("6");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V1_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"10");
        props.put((Object)"valInputCkp", (Object)"VAL_EMPTY_CKP_KEY");
        props.put((Object)"hoodie.metadata.enable", (Object)"false");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"false");
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null, sourceClass), this.jsc, Option.of((Object)props));
        ds.sync();
        this.metaClient.reloadActiveTimeline();
        HashMap<String, String> expectedMetadata = new HashMap<String, String>();
        expectedMetadata.put("schema", "");
        expectedMetadata.put("deltastreamer.checkpoint.key", "10");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        Assertions.assertEquals((int)HoodieTableVersion.SIX.versionCode(), (int)this.metaClient.getTableConfig().getTableVersion().versionCode());
        Assertions.assertEquals((Integer)TimelineLayoutVersion.LAYOUT_VERSION_1.getVersion(), (Integer)((TimelineLayoutVersion)this.metaClient.getTableConfig().getTimelineLayoutVersion().get()).getVersion());
        HoodieInstant instantAfterFirstRound = (HoodieInstant)this.metaClient.getActiveTimeline().lastInstant().get();
        props = this.setupBaseProperties("8");
        props.put((Object)"hoodie.metadata.enable", (Object)"true");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"true");
        props.put((Object)"valInputCkp", (Object)"VAL_NO_INGESTION_HAPPENS_KEY");
        HoodieDeltaStreamer.Config cfg = this.createConfig(this.basePath(), null, sourceClass);
        cfg.checkpoint = "overrideWhenAutoUpgradingWouldFail";
        ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        Exception ex = (Exception)Assertions.assertThrows(HoodieUpgradeDowngradeException.class, () -> ((HoodieDeltaStreamer)ds).sync());
        Assertions.assertTrue((boolean)ex.getMessage().contains("When upgrade/downgrade is happening, please avoid setting --checkpoint option and --ignore-checkpoint for your delta streamers."));
        this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals((Object)this.metaClient.getActiveTimeline().lastInstant().get(), (Object)instantAfterFirstRound);
        Assertions.assertEquals((Object)HoodieTableVersion.SIX, (Object)this.metaClient.getTableConfig().getTableVersion());
        Assertions.assertEquals((Object)TimelineLayoutVersion.LAYOUT_VERSION_1, (Object)this.metaClient.getTableConfig().getTimelineLayoutVersion().get());
        props = this.setupBaseProperties("8");
        props.put((Object)"hoodie.metadata.enable", (Object)"false");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_INSTANCE_OF_KEY", (Object)StreamerCheckpointV1.class.getName());
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"10");
        props.put((Object)"VAL_CKP_RESET_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"VAL_CKP_IGNORE_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V2_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"20");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"true");
        ds = new HoodieDeltaStreamer(this.createConfig(this.basePath(), null, sourceClass), this.jsc, Option.of((Object)props));
        ds.sync();
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), "8");
        expectedMetadata.clear();
        expectedMetadata.put("schema", this.schemaStr);
        expectedMetadata.put("streamer.checkpoint.key.v2", "20");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        this.metaClient.reloadActiveTimeline();
        Assertions.assertNotEquals((Object)this.metaClient.getActiveTimeline().lastInstant().get(), (Object)instantAfterFirstRound);
        Assertions.assertEquals((Object)HoodieTableVersion.EIGHT, (Object)this.metaClient.getTableConfig().getTableVersion());
        Assertions.assertEquals((Object)TimelineLayoutVersion.LAYOUT_VERSION_2, (Object)this.metaClient.getTableConfig().getTimelineLayoutVersion().get());
        props = this.setupBaseProperties("8");
        props.put((Object)"hoodie.metadata.enable", (Object)"true");
        cfg = this.createConfig(this.basePath(), "resumeFromInstantRequestTime:30", sourceClass);
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_INSTANCE_OF_KEY", (Object)StreamerCheckpointV1.class.getName());
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"30");
        props.put((Object)"VAL_CKP_RESET_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"VAL_CKP_IGNORE_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V2_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"40");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"true");
        ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), "8");
        expectedMetadata.clear();
        expectedMetadata.put("schema", this.schemaStr);
        expectedMetadata.put("streamer.checkpoint.key.v2", "40");
        expectedMetadata.put("streamer.checkpoint.reset.key.v2", "resumeFromInstantRequestTime:30");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        this.metaClient.reloadActiveTimeline();
        Assertions.assertNotEquals((Object)this.metaClient.getActiveTimeline().lastInstant().get(), (Object)instantAfterFirstRound);
        Assertions.assertEquals((Object)HoodieTableVersion.EIGHT, (Object)this.metaClient.getTableConfig().getTableVersion());
        Assertions.assertEquals((Object)TimelineLayoutVersion.LAYOUT_VERSION_2, (Object)this.metaClient.getTableConfig().getTimelineLayoutVersion().get());
        props = this.setupBaseProperties("8");
        props.put((Object)"hoodie.metadata.enable", (Object)"true");
        props.put((Object)"valInputCkp", (Object)"VAL_NON_EMPTY_CKP_ALL_MEMBERS");
        props.put((Object)"VAL_CKP_INSTANCE_OF_KEY", (Object)StreamerCheckpointV2.class.getName());
        props.put((Object)"VAL_CKP_KEY_EQ_VAL_KEY", (Object)"40");
        props.put((Object)"VAL_CKP_RESET_KEY", (Object)"resumeFromInstantRequestTime:30");
        props.put((Object)"VAL_CKP_IGNORE_KEY_IS_NULL", (Object)"IGNORED");
        props.put((Object)"mockTestFetchNextBatchOp", (Object)"OP_EMPTY_ROW_SET_NONE_NULL_CKP_V2_KEY");
        props.put((Object)"RETURN_CHECKPOINT_KEY", (Object)"50");
        props.put((Object)"hoodie.write.auto.upgrade", (Object)"true");
        ds = new HoodieDeltaStreamer(cfg, this.jsc, Option.of((Object)props));
        ds.sync();
        this.metaClient = this.getHoodieMetaClientWithTableVersion(this.storageConf(), this.basePath(), "8");
        expectedMetadata.clear();
        expectedMetadata.put("schema", this.schemaStr);
        expectedMetadata.put("streamer.checkpoint.key.v2", "50");
        expectedMetadata.put("streamer.checkpoint.reset.key.v2", "resumeFromInstantRequestTime:30");
        this.verifyLastInstantCommitMetadata(expectedMetadata);
        this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals((Object)HoodieTableVersion.EIGHT, (Object)this.metaClient.getTableConfig().getTableVersion());
        Assertions.assertEquals((Object)TimelineLayoutVersion.LAYOUT_VERSION_2, (Object)this.metaClient.getTableConfig().getTimelineLayoutVersion().get());
    }
}

