/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.functional;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.functional.TestHoodieBackedMetadata;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;

public class TestHoodieSparkRollback
extends SparkClientFunctionalTestHarness {
    private String basePath;

    private void initBasePath() {
        this.basePath = this.basePath().substring(7);
    }

    private SparkRDDWriteClient getHoodieWriteClient(Boolean autoCommitEnabled) throws IOException {
        return this.getHoodieWriteClient(this.getConfigToTestMDTRollbacks(autoCommitEnabled));
    }

    protected List<HoodieRecord> insertRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime) {
        client.startCommitWithTime(commitTime);
        List records = dataGen.generateInserts(commitTime, Integer.valueOf(20));
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        List statuses = client.upsert(writeRecords, commitTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        return records;
    }

    protected List<WriteStatus> updateRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime, List<HoodieRecord> records) throws IOException {
        client.startCommitWithTime(commitTime);
        records = dataGen.generateUpdates(commitTime, records);
        JavaRDD writeRecords = this.jsc().parallelize(records, 1);
        List statuses = client.upsert(writeRecords, commitTime).collect();
        Assertions.assertNoWriteErrors((List)statuses);
        return statuses;
    }

    protected HoodieWriteConfig getConfigToTestMDTRollbacks(Boolean autoCommit) {
        return this.getConfigToTestMDTRollbacks(autoCommit, true);
    }

    protected HoodieWriteConfig getConfigToTestMDTRollbacks(Boolean autoCommit, Boolean mdtEnable) {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withProperties(this.getPropertiesForKeyGen(true)).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withDeleteParallelism(2).withAutoCommit(autoCommit.booleanValue()).withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table").withRollbackUsingMarkers(true).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(mdtEnable.booleanValue()).build()).build();
    }

    protected void testRollbackWithFailurePreMDT(HoodieTableType tableType) throws IOException {
        this.initBasePath();
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(tableType);
        SparkRDDWriteClient client = this.getHoodieWriteClient(true);
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        List<HoodieRecord> records = this.insertRecords(client, dataGen, "001");
        client = this.getHoodieWriteClient(false);
        this.updateRecords(client, dataGen, "002", records);
        client = this.getHoodieWriteClient(true);
        this.updateRecords(client, dataGen, "003", records);
        metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
        TestHoodieBackedMetadata.validateMetadata(this.getConfigToTestMDTRollbacks(true), (Option<String>)Option.empty(), this.fs(), this.basePath, metaClient, (Configuration)this.storageConf().unwrap(), new HoodieSparkEngineContext(this.jsc()), TestHoodieBackedMetadata.metadata(client, this.hoodieStorage()));
    }

    protected void testRollbackWithFailurePostMDT(HoodieTableType tableType) throws IOException {
        this.testRollbackWithFailurePostMDT(tableType, false);
    }

    protected void testRollbackWithFailurePostMDT(HoodieTableType tableType, Boolean failRollback) throws IOException {
        this.initBasePath();
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(tableType);
        HoodieWriteConfig cfg = this.getConfigToTestMDTRollbacks(true);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        List<HoodieRecord> records = this.insertRecords(client, dataGen, "001");
        List<WriteStatus> statuses = this.updateRecords(client, dataGen, "002", records);
        metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
        String filename = ((HoodieInstant)metaClient.getActiveTimeline().lastInstant().get()).getFileName();
        File commit = new File(metaClient.getBasePathV2().toString().substring(5) + "/.hoodie/" + filename);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)commit.delete());
        metaClient.reloadActiveTimeline();
        statuses.forEach(s -> {
            try {
                this.recreateMarkerFile(cfg, "002", (WriteStatus)s);
            }
            catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        if (failRollback.booleanValue()) {
            this.copyOut(tableType, "002");
            client = this.getHoodieWriteClient(this.getConfigToTestMDTRollbacks(true, false));
            org.junit.jupiter.api.Assertions.assertTrue((boolean)client.rollback("002", "003"));
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            HoodieInstant lastInstant = (HoodieInstant)metaClient.getActiveTimeline().lastInstant().get();
            org.junit.jupiter.api.Assertions.assertEquals((Object)"rollback", (Object)lastInstant.getAction());
            HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata((byte[])((byte[])metaClient.getActiveTimeline().getInstantDetails(lastInstant).get()));
            this.copyIn(tableType, "002");
            rollbackMetadata.getPartitionMetadata().forEach((partition, metadata) -> metadata.getRollbackLogFiles().forEach((n, k) -> this.recreateMarkerFile(cfg, "003", (String)partition, (String)n)));
            rollbackMetadata.getPartitionMetadata().forEach((partition, metadata) -> metadata.getLogFilesFromFailedCommit().forEach((n, k) -> this.recreateMarkerFile(cfg, "002", (String)partition, (String)n)));
            commit = new File(metaClient.getBasePathV2().toString().substring(5) + "/.hoodie/" + lastInstant.getFileName());
            org.junit.jupiter.api.Assertions.assertTrue((boolean)commit.delete());
            metaClient.reloadActiveTimeline();
        }
        client = this.getHoodieWriteClient(this.getConfigToTestMDTRollbacks(true, true));
        this.updateRecords(client, dataGen, "004", records);
        metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
        TestHoodieBackedMetadata.validateMetadata(cfg, (Option<String>)Option.empty(), this.fs(), this.basePath, metaClient, (Configuration)this.storageConf().unwrap(), new HoodieSparkEngineContext(this.jsc()), TestHoodieBackedMetadata.metadata(client, this.hoodieStorage()));
    }

    private void copyOut(HoodieTableType tableType, String commitTime) throws IOException {
        File tmpDir = new File(this.basePath, ".tmpdir");
        org.junit.jupiter.api.Assertions.assertTrue((boolean)tmpDir.mkdir());
        String commitAction = tableType.equals((Object)HoodieTableType.COPY_ON_WRITE) ? ".commit" : ".deltacommit";
        String metaDir = this.basePath + ".hoodie/";
        String inflight = commitTime + (tableType.equals((Object)HoodieTableType.COPY_ON_WRITE) ? "" : commitAction) + ".inflight";
        Files.copy(new File(metaDir + inflight).toPath(), tmpDir.toPath().resolve(inflight), StandardCopyOption.REPLACE_EXISTING);
        String requested = commitTime + commitAction + ".requested";
        Files.copy(new File(metaDir + requested).toPath(), tmpDir.toPath().resolve(requested), StandardCopyOption.REPLACE_EXISTING);
    }

    private void copyIn(HoodieTableType tableType, String commitTime) throws IOException {
        Path tmpDir = new File(this.basePath, ".tmpdir").toPath();
        String commitAction = tableType.equals((Object)HoodieTableType.COPY_ON_WRITE) ? ".commit" : ".deltacommit";
        String metaDir = this.basePath + ".hoodie/";
        String inflight = commitTime + (tableType.equals((Object)HoodieTableType.COPY_ON_WRITE) ? "" : commitAction) + ".inflight";
        Files.copy(tmpDir.resolve(inflight), new File(metaDir + inflight).toPath(), StandardCopyOption.REPLACE_EXISTING);
        String requested = commitTime + commitAction + ".requested";
        Files.copy(tmpDir.resolve(requested), new File(metaDir + requested).toPath(), StandardCopyOption.REPLACE_EXISTING);
    }

    protected void testRollbackWithFailureinMDT(HoodieTableType tableType) throws Exception {
        this.initBasePath();
        HoodieWriteConfig cfg = this.getConfigToTestMDTRollbacks(true);
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(tableType);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        List<HoodieRecord> records = this.insertRecords(client, dataGen, "001");
        List<WriteStatus> statuses = this.updateRecords(client, dataGen, "002", records);
        metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
        String filename = ((HoodieInstant)metaClient.getActiveTimeline().lastInstant().get()).getFileName();
        File deltacommit = new File(metaClient.getBasePathV2().toString().substring(5) + "/.hoodie/" + filename);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)deltacommit.delete());
        metaClient.reloadActiveTimeline();
        statuses.forEach(s -> {
            try {
                this.recreateMarkerFile(cfg, "002", (WriteStatus)s);
            }
            catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        String metadataBasePath = this.basePath + "/.hoodie/metadata";
        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.storageConf()).setBasePath(metadataBasePath).build();
        HoodieInstant latestCommitInstant = (HoodieInstant)metadataMetaClient.getActiveTimeline().lastInstant().get();
        File metadatadeltacommit = new File(metadataBasePath + "/.hoodie/" + latestCommitInstant.getFileName());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)metadatadeltacommit.delete());
        this.updateRecords(client, dataGen, "003", records);
        metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
        TestHoodieBackedMetadata.validateMetadata(cfg, (Option<String>)Option.empty(), this.fs(), this.basePath, metaClient, (Configuration)this.storageConf().unwrap(), new HoodieSparkEngineContext(this.jsc()), TestHoodieBackedMetadata.metadata(client, this.hoodieStorage()));
    }

    protected void recreateMarkerFile(HoodieWriteConfig cfg, String commitTime, WriteStatus writeStatus) throws IOException, InterruptedException {
        HoodieWriteStat writeStat = writeStatus.getStat();
        WriteMarkers writeMarkers = WriteMarkersFactory.get((MarkerType)cfg.getMarkersType(), (HoodieTable)HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context()), (String)commitTime);
        if (writeStat instanceof HoodieDeltaWriteStat) {
            ((HoodieDeltaWriteStat)writeStat).getLogFiles().forEach(lf -> writeMarkers.create(writeStat.getPartitionPath(), lf, IOType.APPEND));
        } else {
            writeMarkers.create(writeStat.getPartitionPath(), writeStat.getPath().replace(writeStat.getPartitionPath() + "/", ""), IOType.MERGE);
        }
    }

    protected void recreateMarkerFile(HoodieWriteConfig cfg, String commitTime, String partitionPath, String path) {
        WriteMarkers writeMarkers = WriteMarkersFactory.get((MarkerType)cfg.getMarkersType(), (HoodieTable)HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context()), (String)commitTime);
        writeMarkers.create(partitionPath, new File(path).getName(), IOType.APPEND);
    }
}

