/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.functional;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RollbackHelper;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Functional tests for {@link MarkerBasedRollbackStrategy}.
 *
 * <p>Each test lays out commits and marker files (either by hand through a test table or by running
 * a real write client), asks the strategy for the rollback requests of an inflight instant and, in
 * most cases, executes those requests and checks the resulting {@link HoodieRollbackStat}s.
 *
 * <p>The table type used by {@link #setUp()} comes from the {@code tableType} field, which starts as
 * {@code COPY_ON_WRITE}; tests that need a merge-on-read table re-run the fixture setup after
 * switching the field.
 */
@Tag("functional")
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
    /** Display-name pattern shared by the tests parameterized through {@link #configParams()}. */
    private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with listing metadata enable={0}";

    /** Table type handed to {@code initMetaClient} by {@link #setUp()}. */
    private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;

    /**
     * Supplies the "file listing metadata enabled" flag for the parameterized tests.
     *
     * @return one argument set for {@code true} and one for {@code false}
     */
    public static Stream<Arguments> configParams() {
        return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
    }

    /** Initializes path, Spark contexts, storage, meta client (for the current table type) and data generator. */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        initPath();
        initSparkContexts();
        initHoodieStorage();
        initMetaClient(tableType);
        initTestDataGenerator();
    }

    /** Releases everything acquired by {@link #setUp()}. */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    /**
     * A single APPEND log marker written for commit "001" on a merge-on-read table must yield exactly
     * one rollback request.
     */
    @Test
    public void testMarkerBasedRollbackAppend() throws Exception {
        reinitializeAsMergeOnRead();
        HoodieTestTable testTable = HoodieTestTable.of(metaClient);
        String f0 = testTable.addRequestedCommit("000")
            .getFileIdsWithBaseFilesInPartitions(new String[] {"partA"})
            .get("partA");
        testTable.forCommit("001").withLogMarkerFile("000", "partA", f0, IOType.APPEND, 1);

        HoodieTable<?, ?, ?, ?> hoodieTable = createTable(getConfig());
        List<HoodieRollbackRequest> rollbackRequests =
            planMarkerBasedRollback(hoodieTable, getConfig(), "002", inflightInstant("commit", "001"));

        Assertions.assertEquals(1, rollbackRequests.size());
    }

    /**
     * A log-file marker of the given IO type must produce one rollback request for the file group:
     * a CREATE marker is expected to list one file to delete and no log blocks, an APPEND marker the
     * opposite.
     *
     * @param testIOType marker IO type under test ({@code APPEND} or {@code CREATE})
     */
    @ParameterizedTest
    @EnumSource(names = {"APPEND", "CREATE"})
    public void testMarkerBasedRollbackAppendWithLogFileMarkers(IOType testIOType) throws Exception {
        reinitializeAsMergeOnRead();
        HoodieTestTable testTable = HoodieTestTable.of(metaClient);
        String f0 = testTable.addRequestedCommit("000").getFileIdWithLogFile("partA");
        testTable.forCommit("001").withLogMarkerFile("partA", f0, testIOType);

        HoodieTable<?, ?, ?, ?> hoodieTable = createTable(getConfig());
        List<HoodieRollbackRequest> rollbackRequests =
            planMarkerBasedRollback(hoodieTable, getConfig(), "002", inflightInstant("deltacommit", "001"));

        Assertions.assertEquals(1, rollbackRequests.size());
        HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
        Assertions.assertEquals("partA", rollbackRequest.getPartitionPath());
        Assertions.assertEquals(f0, rollbackRequest.getFileId());

        boolean createMarker = testIOType == IOType.CREATE;
        Assertions.assertEquals(createMarker ? 1 : 0, rollbackRequest.getFilesToBeDeleted().size());
        Assertions.assertEquals(createMarker ? 0 : 1, rollbackRequest.getLogBlocksToBeDeleted().size());
    }

    /**
     * Rolls back commit "001" of a hand-built copy-on-write table that has a MERGE marker and a
     * CREATE marker in "partA" (the latter for a file id that was never written) plus a CREATE marker
     * in "partB", then checks the per-partition file listing and the delete counters.
     */
    @Test
    public void testCopyOnWriteRollbackWithTestTable() throws Exception {
        HoodieTestTable testTable = HoodieTestTable.of(metaClient);
        String f0 = testTable.addRequestedCommit("000")
            .getFileIdsWithBaseFilesInPartitions(new String[] {"partA"})
            .get("partA");
        String f1 = testTable.addCommit("001")
            .withBaseFilesInPartition("partA", new String[] {f0})
            .getLeft()
            .getFileIdsWithBaseFilesInPartitions(new String[] {"partB"})
            .get("partB");
        // "f2" only ever gets a marker; no base file is created for it.
        String f2 = "f2";
        testTable.forCommit("001")
            .withMarkerFile("partA", f0, IOType.MERGE)
            .withMarkerFile("partB", f1, IOType.CREATE)
            .withMarkerFile("partA", f2, IOType.CREATE);

        HoodieTable<?, ?, ?, ?> hoodieTable = createTable(getConfigBuilder().build());
        HoodieInstant instantToRollback = inflightInstant("commit", "001");
        List<HoodieRollbackRequest> rollbackRequests =
            planMarkerBasedRollback(hoodieTable, getConfig(), "002", instantToRollback);
        // Typed list: with a raw List the lambdas below would receive Object and not compile.
        List<HoodieRollbackStat> stats = executeRollback(hoodieTable, "002", instantToRollback, rollbackRequests);

        Assertions.assertEquals(2, stats.size());
        FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
        FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
        Assertions.assertEquals(0, partBFiles.length);
        Assertions.assertEquals(1, partAFiles.length);
        Assertions.assertEquals(2, stats.stream().mapToInt(stat -> stat.getSuccessDeleteFiles().size()).sum());
        Assertions.assertEquals(1, stats.stream().mapToInt(stat -> stat.getFailedDeleteFiles().size()).sum());
    }

    /**
     * Runs an insert followed by an uncommitted upsert through a real write client and expects the
     * rollback of the upsert to report three stats, each with exactly one deleted file.
     *
     * @param useFileListingMetadata whether the metadata table is enabled in the write config
     */
    @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
    @MethodSource("configParams")
    @SuppressWarnings("rawtypes") // the rollback helpers take a raw SparkRDDWriteClient
    public void testCopyOnWriteRollback(boolean useFileListingMetadata) throws Exception {
        HoodieWriteConfig writeConfig = markerRollbackConfig(useFileListingMetadata);
        HoodieEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
            List<HoodieRollbackStat> stats = testUpdateAndRollback(useFileListingMetadata, writeConfig, writeClient);
            Assertions.assertEquals(3, stats.size());
            for (HoodieRollbackStat stat : stats) {
                Assertions.assertEquals(1, stat.getSuccessDeleteFiles().size());
                Assertions.assertEquals(0, stat.getFailedDeleteFiles().size());
                Assertions.assertEquals(0, stat.getCommandBlocksCount().size());
            }
        }
    }

    /**
     * Runs an insert on a merge-on-read table through a real write client and expects the rollback
     * to report three stats, each with exactly one deleted file and no command blocks.
     *
     * @param useFileListingMetadata whether the metadata table is enabled in the write config
     */
    @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
    @MethodSource("configParams")
    @SuppressWarnings("rawtypes") // the rollback helpers take a raw SparkRDDWriteClient
    public void testMergeOnReadRollbackDeletesFirstAppendFiles(boolean useFileListingMetadata) throws Exception {
        reinitializeAsMergeOnRead();
        HoodieWriteConfig writeConfig = markerRollbackConfig(useFileListingMetadata);
        HoodieEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
            List<HoodieRollbackStat> stats = testInsertAndRollback(writeClient);
            Assertions.assertEquals(3, stats.size());
            for (HoodieRollbackStat stat : stats) {
                Assertions.assertEquals(1, stat.getSuccessDeleteFiles().size());
                Assertions.assertEquals(0, stat.getFailedDeleteFiles().size());
                Assertions.assertEquals(0, stat.getCommandBlocksCount().size());
                // Vacuous after the size-0 assertion above; kept so the check becomes effective
                // should command blocks ever be reported here.
                stat.getCommandBlocksCount().forEach((fileStatus, len) ->
                    Assertions.assertTrue(
                        fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
            }
        }
    }

    /**
     * Inserts 100 records as commit "001", commits them, then plans and executes a marker-based
     * rollback of "001" (addressed as an inflight delta commit) under rollback instant "002".
     *
     * @param writeClient client used for the insert
     * @return stats produced by executing the rollback
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw write client is part of this helper's signature
    private List<HoodieRollbackStat> testInsertAndRollback(SparkRDDWriteClient writeClient) {
        String newCommitTime = "001";
        writeClient.startCommitWithTime(newCommitTime);
        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
        JavaRDD<?> writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
        writeClient.commit(newCommitTime, writeStatuses);
        writeStatuses.collect();

        HoodieTable<?, ?, ?, ?> hoodieTable = createTable(getConfigBuilder().build());
        HoodieInstant instantToRollback = inflightInstant("deltacommit", "001");
        List<HoodieRollbackRequest> rollbackRequests =
            planMarkerBasedRollback(hoodieTable, getConfigBuilder().build(), "002", instantToRollback);
        return executeRollback(hoodieTable, "002", instantToRollback, rollbackRequests);
    }

    /**
     * Inserts and commits 100 records as "001", upserts 50 unique updates as "002" without
     * committing, then plans and executes a marker-based rollback of "002" under rollback instant
     * "003".
     *
     * @param useFileListingMetadata not read by this helper; kept for signature compatibility
     * @param writeConfig not read by this helper; the rollback uses freshly built default configs
     * @param writeClient client used for the writes
     * @return stats produced by executing the rollback
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw write client is part of this helper's signature
    private List<HoodieRollbackStat> testUpdateAndRollback(boolean useFileListingMetadata, HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient) {
        String newCommitTime = "001";
        writeClient.startCommitWithTime(newCommitTime);
        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
        JavaRDD<?> writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
        writeClient.commit(newCommitTime, writeStatuses);

        newCommitTime = "002";
        writeClient.startCommitWithTime(newCommitTime);
        records = dataGen.generateUniqueUpdates(newCommitTime, 50);
        writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
        // Force the write to run so files and markers exist; "002" is intentionally left uncommitted.
        writeStatuses.collect();

        HoodieTable<?, ?, ?, ?> hoodieTable = createTable(getConfigBuilder().build());
        HoodieInstant instantToRollback = inflightInstant("deltacommit", "002");
        List<HoodieRollbackRequest> rollbackRequests =
            planMarkerBasedRollback(hoodieTable, getConfigBuilder().build(), "003", instantToRollback);
        return executeRollback(hoodieTable, "003", instantToRollback, rollbackRequests);
    }

    /**
     * Expects one rollback request for a single APPEND log marker on the default (copy-on-write)
     * table while a {@link DirectWriteMarkers} mock is stubbed to fail.
     */
    @Test
    public void testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws Exception {
        HoodieTestTable testTable = HoodieTestTable.of(metaClient);
        String f0 = testTable.addRequestedCommit("000")
            .getFileIdsWithBaseFilesInPartitions(new String[] {"partA"})
            .get("partA");
        testTable.forCommit("001").withLogMarkerFile("000", "partA", f0, IOType.APPEND, 1);
        HoodieTable<?, ?, ?, ?> hoodieTable = createTable(getConfig());

        // NOTE(review): this mock is never handed to the table or the strategy, so the stubbed
        // failure cannot be observed by the code under test - confirm the fallback path named in
        // the test is really exercised.
        DirectWriteMarkers writeMarkers = Mockito.mock(DirectWriteMarkers.class);
        // openMocks replaces the deprecated initMocks; its handle is closed when the test ends.
        try (AutoCloseable ignoredMocks = MockitoAnnotations.openMocks(this)) {
            Mockito.when(writeMarkers.allMarkerFilePaths())
                .thenThrow(new IOException("Markers.type file not present"));

            List<HoodieRollbackRequest> rollbackRequests =
                planMarkerBasedRollback(hoodieTable, getConfig(), "002", inflightInstant("commit", "001"));
            Assertions.assertEquals(1, rollbackRequests.size());
        }
    }

    /**
     * Appends 199 log files with markers to one file group in an inflight delta commit "003" and
     * rolls it back under instant "004".
     *
     * <p>For table version EIGHT the test expects one rollback request per log file and all of them
     * reported as deleted; for SIX it expects a single request, no deletions, one command block
     * written to log version {@code numLogFiles + 2}, and all log files reported as belonging to the
     * failed commit.
     *
     * @param tableVersion table version written into the table properties
     */
    @ParameterizedTest
    @ValueSource(strings = {"SIX", "EIGHT"})
    void testRollbackMultipleAppendLogFilesInOneFileGroupInMOR(HoodieTableVersion tableVersion) throws Exception {
        boolean versionEightOrLater = tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT);

        Properties props = new Properties();
        // Stored as a boxed Integer, exactly as before. NOTE(review): Properties.getProperty only
        // returns String values - confirm the harness reads this entry in a way that sees it.
        props.put(HoodieTableConfig.VERSION.key(), tableVersion.versionCode());
        // NOTE(review): tableType is not switched in this test, so it still holds the field's
        // initial COPY_ON_WRITE value although the test name says MOR - confirm intent.
        initMetaClient(tableType, props);

        String partition = "partA";
        HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(
            metaClient, HoodieAvroUtils.addMetadataFields((Schema) RawTripTestPayload.JSON_DATA_SCHEMA));
        String fileId = UUID.randomUUID().toString();
        HoodieRecord tripRecord =
            new RawTripTestPayload("{\"_row_key\":\"key1\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}")
                .toHoodieRecord();

        // Instant 001: base file with one record.
        String instantTime1 = "001";
        testTable.forCommit(instantTime1);
        StoragePath baseFilePath = testTable.withInserts(partition, fileId, Collections.singletonList(tripRecord));
        testTable.addDeltaCommit(instantTime1);
        Assertions.assertTrue(storage.exists(baseFilePath));

        // Instant 002: one committed log file. The instant embedded in the log file name is the
        // delta commit itself for version EIGHT and the base instant otherwise.
        String instantTime2 = "002";
        testTable.forDeltaCommit(instantTime2)
            .withLogFile(partition, fileId, versionEightOrLater ? instantTime2 : instantTime1, new int[] {1});
        testTable.addDeltaCommit(instantTime2);

        // Instant 003: many log files plus markers, left inflight.
        String instantTime3 = "003";
        int numLogFiles = 199;
        // Log versions start at 1 for version EIGHT and at 2 otherwise.
        int firstLogVersion = versionEightOrLater ? 1 : 2;
        int[] logVersions = IntStream.rangeClosed(firstLogVersion, firstLogVersion + numLogFiles - 1).toArray();
        String logFileInstantTime = versionEightOrLater ? instantTime3 : instantTime1;
        testTable.forDeltaCommit(instantTime3).withLogFile(partition, fileId, logFileInstantTime, logVersions);

        StoragePath partitionPath = new StoragePath(basePath, partition);
        // Paths are kept without scheme/authority so they can be compared with the rollback output
        // regardless of how each side qualifies its paths.
        HashSet<String> logFilePathSet = new HashSet<>();
        for (int version : logVersions) {
            String logFileName = FileCreateUtils.logFileName(logFileInstantTime, fileId, version);
            StoragePath logFilePath = new StoragePath(partitionPath, logFileName);
            Assertions.assertTrue(storage.exists(logFilePath));
            logFilePathSet.add(logFilePath.getPathWithoutSchemeAndAuthority().toString());
            testTable.withLogMarkerFile(partition, logFileName);
        }
        testTable.addInflightDeltaCommit(instantTime3);
        metaClient.reloadActiveTimeline();

        HoodieWriteConfig writeConfig = getConfig();
        writeConfig.setValue(HoodieWriteConfig.ROLLBACK_PARALLELISM_VALUE, String.valueOf(logVersions.length));
        // NOTE(review): the table and strategy below are built from fresh getConfig() results, not
        // from the tuned writeConfig - unchanged from the original; confirm this is intended.
        HoodieTable<?, ?, ?, ?> hoodieTable = createTable(getConfig());

        // NOTE(review): this mock is never handed to the table or the strategy, so the stubbed
        // failure cannot be observed by the code under test.
        DirectWriteMarkers writeMarkers = Mockito.mock(DirectWriteMarkers.class);
        try (AutoCloseable ignoredMocks = MockitoAnnotations.openMocks(this)) {
            Mockito.when(writeMarkers.allMarkerFilePaths())
                .thenThrow(new IOException("Markers.type file not present"));

            HoodieInstant instantToRollback = inflightInstant("deltacommit", instantTime3);
            List<HoodieRollbackRequest> rollbackRequests =
                planMarkerBasedRollback(hoodieTable, getConfig(), "004", instantToRollback);
            Assertions.assertEquals(versionEightOrLater ? numLogFiles : 1, rollbackRequests.size());

            HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(
                new HoodieInstantInfo(instantTime3, "deltacommit"),
                rollbackRequests,
                BaseRollbackPlanActionExecutor.LATEST_ROLLBACK_PLAN_VERSION);
            List<HoodieRollbackStat> rollbackStats =
                rollbackThroughTimelineServer(writeConfig, instantToRollback, rollbackPlan);

            Assertions.assertEquals(1, rollbackStats.size());
            HoodieRollbackStat rollbackStat = rollbackStats.get(0);
            if (!versionEightOrLater) {
                StoragePath rollbackLogPath = new StoragePath(
                    partitionPath, FileCreateUtils.logFileName(instantTime1, fileId, numLogFiles + 2));
                Assertions.assertTrue(storage.exists(rollbackLogPath));
                Map.Entry<StoragePathInfo, ?> firstCommandBlock =
                    rollbackStat.getCommandBlocksCount().entrySet().stream().findFirst().get();
                Assertions.assertEquals(
                    rollbackLogPath.getPathWithoutSchemeAndAuthority(),
                    firstCommandBlock.getKey().getPath().getPathWithoutSchemeAndAuthority());
            }
            Assertions.assertEquals(partition, rollbackStat.getPartitionPath());
            Assertions.assertEquals(versionEightOrLater ? numLogFiles : 0, rollbackStat.getSuccessDeleteFiles().size());
            if (versionEightOrLater) {
                // Previously the result of contains() was discarded, so nothing was verified.
                for (String deletedFile : rollbackStat.getSuccessDeleteFiles()) {
                    String normalized = new StoragePath(deletedFile).getPathWithoutSchemeAndAuthority().toString();
                    Assertions.assertTrue(
                        logFilePathSet.contains(normalized),
                        "Rollback deleted a file that is not one of the appended log files: " + deletedFile);
                }
            }
            Assertions.assertEquals(0, rollbackStat.getFailedDeleteFiles().size());
            Assertions.assertEquals(versionEightOrLater ? 0 : 1, rollbackStat.getCommandBlocksCount().size());
            Assertions.assertEquals(versionEightOrLater ? 0 : numLogFiles, rollbackStat.getLogFilesFromFailedCommit().size());
        }
    }

    /** Tears the fixture down and sets it up again with a merge-on-read table. */
    private void reinitializeAsMergeOnRead() throws Exception {
        tearDown();
        tableType = HoodieTableType.MERGE_ON_READ;
        setUp();
    }

    /** Builds the write config used by the write-client based tests: marker rollback on, auto-commit off. */
    private HoodieWriteConfig markerRollbackConfig(boolean useFileListingMetadata) {
        return getConfigBuilder()
            .withRollbackUsingMarkers(true)
            .withAutoCommit(false)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
            .withPath(basePath)
            .build();
    }

    /** Creates a Spark table over the current meta client with the given config. */
    private HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config) {
        HoodieEngineContext engineContext = context;
        HoodieTableMetaClient tableMetaClient = metaClient;
        return HoodieSparkTable.create(config, engineContext, tableMetaClient);
    }

    /** Creates an INFLIGHT instant descriptor for the given action and timestamp. */
    private static HoodieInstant inflightInstant(String action, String timestamp) {
        return HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, action, timestamp);
    }

    /** Asks a fresh {@link MarkerBasedRollbackStrategy} for the rollback requests of {@code instantToRollback}. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the strategy is instantiated raw, as in the original tests
    private List<HoodieRollbackRequest> planMarkerBasedRollback(
        HoodieTable<?, ?, ?, ?> table, HoodieWriteConfig config, String rollbackInstantTime, HoodieInstant instantToRollback) {
        return new MarkerBasedRollbackStrategy(table, context, config, rollbackInstantTime)
            .getRollbackRequests(instantToRollback);
    }

    /** Executes the given rollback requests through a {@link RollbackHelper} built with the default config. */
    private List<HoodieRollbackStat> executeRollback(
        HoodieTable<?, ?, ?, ?> table, String rollbackInstantTime, HoodieInstant instantToRollback,
        List<HoodieRollbackRequest> rollbackRequests) {
        return new RollbackHelper(table, getConfig())
            .performRollback(context, rollbackInstantTime, instantToRollback, rollbackRequests);
    }

    /**
     * Starts an embedded timeline service, points {@code writeConfig} at its remote file-system view,
     * and runs the rollback plan with a {@link MergeOnReadRollbackActionExecutor} under instant "004".
     * The service is stopped for the base path even when the rollback throws.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the executor is instantiated raw, as in the original test
    private List<HoodieRollbackStat> rollbackThroughTimelineServer(
        HoodieWriteConfig writeConfig, HoodieInstant instantToRollback, HoodieRollbackPlan rollbackPlan) throws Exception {
        EmbeddedTimelineService timelineServer =
            EmbeddedTimelineServerHelper.createEmbeddedTimelineService(context, writeConfig);
        try {
            writeConfig.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig(writeConfig));
            HoodieTable<?, ?, ?, ?> hoodieTable = createTable(writeConfig);
            MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
                context, writeConfig, hoodieTable, "004", instantToRollback, true, false);
            return rollbackActionExecutor.doRollbackAndGetStats(rollbackPlan);
        } finally {
            timelineServer.stopForBasePath(basePath);
        }
    }
}

