/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.functional;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.GenericRecordValidationTestUtils;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestDataValidationCheckForLogCompactionActions
extends HoodieClientTestBase {
    /** Suffix appended to each record key when a main-table batch is copied for the experiment table. */
    private static final String RECORD_KEY_APPEND_VALUE = "-EXP";
    /** Partition count for parallelized test data; the visible call sites pass this same value (2). */
    private static final int PARALLELISM = 2;
    /** Source of randomness for action selection and batch sizes; re-seeded at the start of the stress test. */
    private final Random random = new Random();
    /** JUnit-managed temporary directory under which the second (experiment) table is created. */
    @TempDir
    Path secondTableBasePath;
    // Generators: (data generator, commit time, count) -> records, or (data generator, count) -> keys to delete.
    HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, HoodieTestDataGenerator, String, Integer> insertsGenFunction = HoodieTestDataGenerator::generateInserts;
    HoodieWriterClientTestHarness.Function3<List<HoodieRecord>, HoodieTestDataGenerator, String, Integer> updatesGenFunction = HoodieTestDataGenerator::generateUniqueUpdates;
    HoodieWriterClientTestHarness.Function2<List<HoodieKey>, HoodieTestDataGenerator, Integer> deletesGenFunction = HoodieTestDataGenerator::generateUniqueDeletes;
    // Write operations: (write client, RDD of records or keys, commit time) -> write statuses.
    HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertsFunction = SparkRDDWriteClient::insert;
    HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> updatesFunction = SparkRDDWriteClient::upsert;
    HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieKey>, String> deletesFunction = SparkRDDWriteClient::delete;

    /**
     * Runs before each test: wraps the harness meta client in a
     * {@link HoodieSparkWriteableTestTable}. The returned handle is not retained.
     */
    @BeforeEach
    public void setUpTestTable() {
        HoodieTableMetaClient harnessMetaClient = metaClient;
        HoodieSparkWriteableTestTable.of(harnessMetaClient);
    }

    /**
     * Runs after each test: invokes the harness cleanup hooks one after another
     * (timeline service, clients, Spark contexts, data generator, file system,
     * executor service) and finally requests a garbage collection.
     *
     * @throws IOException if one of the cleanup hooks fails
     */
    @AfterEach
    public void cleanupResources() throws IOException {
        // The order of the hooks is kept exactly as it was; do not reorder.
        cleanupTimelineService();
        cleanupClients();
        cleanupSparkContexts();
        cleanupTestDataGenerator();
        cleanupFileSystem();
        cleanupExecutorService();
        System.gc();
    }

    /**
     * Stress test comparing a main table with an experiment table that additionally
     * runs log compaction.
     *
     * <p>After seeding {@link #random}, the loop performs writes numbered 1 to 14. Each
     * write applies a randomly picked action to the main table. When that write is not
     * skipped, the same batch (with suffixed record keys) is replayed on the experiment
     * table, log compaction is scheduled and executed there, both timelines are checked
     * for leftover pending compaction / log compaction instants, and the records of
     * both tables are compared.
     *
     * <p>All write clients, including the log compaction client, are closed in a
     * {@code finally} block so they are released even when a write or an assertion fails.
     *
     * @param seed seed for the random action / batch-size sequence
     * @throws Exception if a write, a verification step, or closing a client fails
     */
    @ParameterizedTest
    @ValueSource(ints={17})
    public void stressTestCompactionAndLogCompactionOperations(int seed) throws Exception {
        this.random.setSeed(seed);
        // Clients are registered as soon as they exist so the finally block can release them.
        List<SparkRDDWriteClient> openedClients = new ArrayList<>();
        try {
            TestTableContents mainTable = this.setupTestTable1();
            openedClients.add(mainTable.client);
            TestTableContents experimentTable = this.setupTestTable2();
            openedClients.add(experimentTable.client);
            openedClients.add(experimentTable.logCompactionClient);

            int totalWrites = 15;
            LOG.warn("Starting trial with seed " + seed);
            for (int curr = 1; curr < totalWrites; ++curr) {
                LOG.warn("Starting write No. " + curr);
                boolean status = this.writeOnMainTable(mainTable, curr);
                if (!status) {
                    // The picked update/delete could not be generated; nothing to replay.
                    continue;
                }
                this.writeOnExperimentTable(mainTable, experimentTable);
                this.scheduleLogCompactionOnExperimentTable(experimentTable);
                // No compaction or log compaction plan may be left pending on either timeline.
                org.junit.jupiter.api.Assertions.assertEquals(0, mainTable.metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants());
                org.junit.jupiter.api.Assertions.assertEquals(0, experimentTable.metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants());
                org.junit.jupiter.api.Assertions.assertEquals(0, experimentTable.metaClient.reloadActiveTimeline().filterPendingLogCompactionTimeline().countInstants());
                this.verifyRecords(mainTable, experimentTable);
                LOG.warn("For write No." + curr + ", verification passed. Last ingestion commit timestamp is " + mainTable.commitTimeOnMainTable);
            }
        } finally {
            closeStressTestClients(openedClients);
        }
    }

    /**
     * Closes every non-null client in the given list, attempting all of them even if
     * one fails. The first failure is rethrown with later ones attached as suppressed.
     */
    private static void closeStressTestClients(List<SparkRDDWriteClient> clients) throws Exception {
        Exception firstFailure = null;
        for (SparkRDDWriteClient client : clients) {
            if (client == null) {
                continue;
            }
            try {
                client.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Asserts that both tables hold the same records.
     *
     * <p>The record maps of both tables must have the same size, and for every key in
     * the main table the experiment table must contain that key with
     * {@link #RECORD_KEY_APPEND_VALUE} appended. The matching records are compared
     * against the main table's schema while ignoring the listed Hudi metadata fields.
     *
     * @param mainTable       table the batches were originally written to
     * @param experimentTable table the suffixed copies were written to
     */
    private void verifyRecords(TestTableContents mainTable, TestTableContents experimentTable) {
        Map recordsInMain = GenericRecordValidationTestUtils.getRecordsMap(mainTable.config, (StorageConfiguration)this.storageConf, (HoodieTestDataGenerator)this.dataGen);
        Map recordsInExperiment = GenericRecordValidationTestUtils.getRecordsMap(experimentTable.config, (StorageConfiguration)this.storageConf, (HoodieTestDataGenerator)this.dataGen);
        org.junit.jupiter.api.Assertions.assertEquals(recordsInMain.size(), recordsInExperiment.size());

        Schema readerSchema = new Schema.Parser().parse(mainTable.config.getSchema());
        String[] ignoredMetaFields = new String[]{
            HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
            HoodieRecord.FILENAME_METADATA_FIELD,
            HoodieRecord.OPERATION_METADATA_FIELD,
            HoodieRecord.RECORD_KEY_METADATA_FIELD};
        List excludeFields = CollectionUtils.createImmutableList((Object[])ignoredMetaFields);

        for (Object rawEntry : recordsInMain.entrySet()) {
            Map.Entry entry = (Map.Entry)rawEntry;
            // Keys in the experiment table carry the suffix added when the batch was copied.
            String experimentKey = entry.getKey() + RECORD_KEY_APPEND_VALUE;
            org.junit.jupiter.api.Assertions.assertTrue(recordsInExperiment.containsKey(experimentKey));
            GenericRecordValidationTestUtils.assertGenericRecords((GenericRecord)entry.getValue(), (GenericRecord)recordsInExperiment.get(experimentKey), readerSchema, (List)excludeFields);
        }
    }

    /**
     * Asks the experiment table's log compaction client to schedule a log compaction
     * and, if an instant time is returned, executes it immediately.
     *
     * @param experimentTable table whose {@code logCompactionClient} is used
     */
    private void scheduleLogCompactionOnExperimentTable(TestTableContents experimentTable) {
        Option scheduledInstant = experimentTable.logCompactionClient.scheduleLogCompaction(Option.empty());
        if (!scheduledInstant.isPresent()) {
            // Nothing was scheduled, so there is nothing to run.
            return;
        }
        experimentTable.logCompactionClient.logCompact((String)scheduledInstant.get());
    }

    /**
     * Starts a new commit on the main table and applies one write action to it.
     *
     * <p>The action is picked via {@link #pickAWriteAction()}; the first write
     * ({@code curr == 1}) is always an insert regardless of the pick. If generating an
     * update or delete batch raises {@link IllegalArgumentException}, the command is
     * logged and skipped.
     *
     * @param mainTable table to write to
     * @param curr      1-based number of the current write
     * @return {@code true} if a batch was written and its statuses verified,
     *         {@code false} if the command was skipped
     * @throws IOException if generating or writing the batch fails
     */
    private boolean writeOnMainTable(TestTableContents mainTable, int curr) throws IOException {
        final String instantTime = mainTable.client.createNewInstantTime();
        mainTable.client.startCommitWithTime(instantTime);
        // Always draw from the random sequence, even when the first write forces an insert.
        final int action = this.pickAWriteAction();
        final boolean firstWrite = curr == 1;

        JavaRDD<WriteStatus> statuses;
        if (firstWrite || action == 0) {
            statuses = this.insertDataIntoMainTable(mainTable, instantTime);
        } else {
            try {
                if (action == 1) {
                    statuses = this.updateDataIntoMainTable(mainTable, instantTime);
                } else {
                    statuses = this.deleteDataIntoMainTable(mainTable, instantTime);
                }
            } catch (IllegalArgumentException e) {
                LOG.warn(e.getMessage() + " ignoring current command.");
                return false;
            }
        }
        this.verifyWriteStatus(statuses);
        return true;
    }

    /**
     * Draws the next write action from {@link #random}.
     *
     * @return 0 for draws 0-4, 1 for draws 5-7, 2 for draws 8-9 (out of ten); callers in
     *         this class treat 0 as insert, 1 as update and any other value as delete
     */
    private int pickAWriteAction() {
        final int draw = this.random.nextInt(10);
        return draw < 5 ? 0 : (draw < 8 ? 1 : 2);
    }

    /**
     * Replays the most recent main-table write on the experiment table, using the same
     * commit time and the action type recorded on {@code mainTable}, then verifies the
     * resulting write statuses.
     *
     * @param mainTable       holder of the last commit time, action type and copied batch
     * @param experimentTable table to replay the write on
     * @throws IOException if the write fails
     */
    private void writeOnExperimentTable(TestTableContents mainTable, TestTableContents experimentTable) throws IOException {
        experimentTable.client.startCommitWithTime(mainTable.commitTimeOnMainTable);

        JavaRDD<WriteStatus> statuses;
        switch (mainTable.previousActionType) {
            case 0:
                statuses = this.insertDataIntoExperimentTable(mainTable, experimentTable);
                break;
            case 1:
                statuses = this.updateDataIntoExperimentTable(mainTable, experimentTable);
                break;
            default:
                // Every other action type is handled as a delete.
                statuses = this.deleteDataIntoExperimentTable(mainTable, experimentTable);
                break;
        }
        this.verifyWriteStatus(statuses);
    }

    /**
     * Generates a batch of 50 to 59 inserts, records it on {@code mainTable} as the
     * latest generation (action type 0), and inserts it into the main table.
     *
     * @param mainTable  table to insert into
     * @param commitTime commit time the batch is generated for and written under
     * @return write statuses of the insert
     * @throws IOException if generating or writing the batch fails
     */
    private JavaRDD<WriteStatus> insertDataIntoMainTable(TestTableContents mainTable, String commitTime) throws IOException {
        int numRecords = 50 + this.random.nextInt(10);
        // Typed call: arguments must match the Function3 type arguments, so no Object casts.
        List<HoodieRecord> records = this.insertsGenFunction.apply(this.dataGen, commitTime, numRecords);
        mainTable.updatePreviousGeneration(records, commitTime, 0);
        JavaRDD<HoodieRecord> writeRecords = this.jsc.parallelize(records, PARALLELISM);
        return this.insertsFunction.apply(mainTable.client, writeRecords, commitTime);
    }

    /**
     * Generates a batch of 10 to 19 unique updates, records it on {@code mainTable} as
     * the latest generation (action type 1), and upserts it into the main table.
     *
     * @param mainTable  table to upsert into
     * @param commitTime commit time the batch is generated for and written under
     * @return write statuses of the upsert
     * @throws IOException if generating or writing the batch fails
     */
    private JavaRDD<WriteStatus> updateDataIntoMainTable(TestTableContents mainTable, String commitTime) throws IOException {
        int numRecords = 10 + this.random.nextInt(10);
        // Typed call: arguments must match the Function3 type arguments, so no Object casts.
        List<HoodieRecord> records = this.updatesGenFunction.apply(this.dataGen, commitTime, numRecords);
        mainTable.updatePreviousGeneration(records, commitTime, 1);
        JavaRDD<HoodieRecord> writeRecords = this.jsc.parallelize(records, PARALLELISM);
        return this.updatesFunction.apply(mainTable.client, writeRecords, commitTime);
    }

    /**
     * Generates 5 to 14 unique delete keys, records them on {@code mainTable} as the
     * latest generation, and deletes them from the main table.
     *
     * @param mainTable  table to delete from
     * @param commitTime commit time the delete is written under
     * @return write statuses of the delete
     * @throws IOException if generating the keys or writing the delete fails
     */
    private JavaRDD<WriteStatus> deleteDataIntoMainTable(TestTableContents mainTable, String commitTime) throws IOException {
        int numRecords = 5 + this.random.nextInt(10);
        // Typed call: arguments must match the Function2 type arguments, so no Object casts.
        List<HoodieKey> keys = this.deletesGenFunction.apply(this.dataGen, numRecords);
        mainTable.updatePreviousGenerationForDelete(keys, commitTime);
        JavaRDD<HoodieKey> deleteKeys = this.jsc.parallelize(keys, PARALLELISM);
        return this.deletesFunction.apply(mainTable.client, deleteKeys, commitTime);
    }

    /**
     * Inserts the batch last recorded on {@code mainTable} (the key-suffixed copies)
     * into the experiment table under the main table's commit time.
     *
     * @param mainTable       holder of the copied batch and its commit time
     * @param experimentTable table to insert into
     * @return write statuses of the insert
     * @throws IOException if the write fails
     */
    private JavaRDD<WriteStatus> insertDataIntoExperimentTable(TestTableContents mainTable, TestTableContents experimentTable) throws IOException {
        JavaRDD<HoodieRecord> writeRecords = this.jsc.parallelize(mainTable.generatedRecords, PARALLELISM);
        return this.insertsFunction.apply(experimentTable.client, writeRecords, mainTable.commitTimeOnMainTable);
    }

    /**
     * Upserts the batch last recorded on {@code mainTable} (the key-suffixed copies)
     * into the experiment table under the main table's commit time.
     *
     * @param mainTable       holder of the copied batch and its commit time
     * @param experimentTable table to upsert into
     * @return write statuses of the upsert
     * @throws IOException if the write fails
     */
    private JavaRDD<WriteStatus> updateDataIntoExperimentTable(TestTableContents mainTable, TestTableContents experimentTable) throws IOException {
        JavaRDD<HoodieRecord> writeRecords = this.jsc.parallelize(mainTable.generatedRecords, PARALLELISM);
        return this.updatesFunction.apply(experimentTable.client, writeRecords, mainTable.commitTimeOnMainTable);
    }

    /**
     * Deletes the keys last recorded on {@code mainTable} (the suffixed copies) from
     * the experiment table under the main table's commit time.
     *
     * @param mainTable       holder of the copied delete keys and their commit time
     * @param experimentTable table to delete from
     * @return write statuses of the delete
     * @throws IOException if the write fails
     */
    private JavaRDD<WriteStatus> deleteDataIntoExperimentTable(TestTableContents mainTable, TestTableContents experimentTable) throws IOException {
        JavaRDD<HoodieKey> writeKeys = this.jsc.parallelize(mainTable.generatedKeysForDelete, PARALLELISM);
        return this.deletesFunction.apply(experimentTable.client, writeKeys, mainTable.commitTimeOnMainTable);
    }

    /**
     * Collects the given write statuses to the driver and asserts that none of them
     * reports a write error.
     *
     * @param writeStatuses statuses returned by a write operation
     */
    private void verifyWriteStatus(JavaRDD<WriteStatus> writeStatuses) {
        Assertions.assertNoWriteErrors((List)writeStatuses.collect());
    }

    /**
     * Builds the main table handle on top of the harness' base path, table name and
     * meta client.
     *
     * <p>The write config uses the in-memory index, inline compaction, auto commit, and
     * sets {@code hoodie.parquet.small.file.limit} to {@code 0}.
     *
     * @return contents holder for the main table (without a log compaction client)
     */
    private TestTableContents setupTestTable1() {
        // NOTE(review): this literal looks like an inlined shared test schema constant — confirm and reference the constant instead.
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        Properties smallFileProps = new Properties();
        smallFileProps.setProperty("hoodie.parquet.small.file.limit", "0");

        HoodieCompactionConfig inlineCompaction = HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(Boolean.TRUE)
            .build();

        // The extra properties are applied last, as in the original builder chain.
        HoodieWriteConfig writeConfig = this.getConfigBuilder(tripSchema, HoodieIndex.IndexType.INMEMORY)
            .withCompactionConfig(inlineCompaction)
            .withAutoCommit(true)
            .withProperties(smallFileProps)
            .build();

        SparkRDDWriteClient writeClient = new SparkRDDWriteClient((HoodieEngineContext)this.context, writeConfig);
        return new TestTableContents(this.basePath, this.tableName, this.metaClient, writeConfig, writeClient);
    }

    /**
     * Creates the experiment table {@code test-trip-table2} as a MERGE_ON_READ table
     * under {@link #secondTableBasePath} and builds two clients for it: a regular write
     * client, and a log compaction client whose config copies the regular config's
     * properties and sets the log compaction blocks threshold to 2.
     *
     * @return contents holder for the experiment table, including the log compaction client
     * @throws IOException if the table directory or the table itself cannot be created
     */
    private TestTableContents setupTestTable2() throws IOException {
        // NOTE(review): this literal looks like an inlined shared test schema constant — confirm and reference the constant instead.
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        final String experimentTableName = "test-trip-table2";
        final String experimentBasePath = this.createBasePathForSecondTable(this.secondTableBasePath);

        Properties tableProps = new Properties();
        tableProps.put(HoodieTableConfig.NAME.key(), experimentTableName);
        HoodieTableMetaClient experimentMetaClient = HoodieTestUtils.init((StorageConfiguration)this.storageConf, experimentBasePath, HoodieTableType.MERGE_ON_READ, tableProps);

        HoodieCompactionConfig inlineCompaction = HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(Boolean.TRUE)
            .build();
        HoodieWriteConfig writeConfig = this.getConfigBuilderForSecondTable(experimentTableName, experimentBasePath, tripSchema, HoodieIndex.IndexType.INMEMORY)
            .withCompactionConfig(inlineCompaction)
            .withAutoCommit(true)
            .build();
        SparkRDDWriteClient writeClient = new SparkRDDWriteClient((HoodieEngineContext)this.context, writeConfig);

        // Second client on the same table: same properties, plus the log compaction threshold.
        HoodieCompactionConfig logCompaction = HoodieCompactionConfig.newBuilder()
            .withLogCompactionBlocksThreshold(2)
            .build();
        HoodieWriteConfig logCompactionConfig = HoodieWriteConfig.newBuilder()
            .withProps((Map)writeConfig.getProps())
            .withCompactionConfig(logCompaction)
            .build();
        SparkRDDWriteClient logCompactionClient = new SparkRDDWriteClient((HoodieEngineContext)this.context, logCompactionConfig);

        return new TestTableContents(experimentBasePath, experimentTableName, experimentMetaClient, writeConfig, writeClient, logCompactionClient);
    }

    /**
     * Creates the {@code dataset2} directory (and any missing parents) below the given
     * path and returns it as a string.
     *
     * @param secondTableBasePath parent directory for the second table
     * @return string form of the created {@code dataset2} directory
     * @throws IOException if the directory cannot be created
     */
    private String createBasePathForSecondTable(Path secondTableBasePath) throws IOException {
        Path datasetDir = secondTableBasePath.resolve("dataset2");
        // No file attributes are requested, which matches passing an empty attribute array.
        Files.createDirectories(datasetDir);
        return datasetDir.toString();
    }

    /**
     * Assembles the write-config builder for the second table.
     *
     * <p>Sets path, schema and table name; parallelism of 2 for all operations; the
     * current timeline layout version; {@link MetadataMergeWriteStatus} as the write
     * status class; consistency checks on; 1 MiB limits for the compaction small-file
     * size and for hfile/parquet/orc files; EAGER failed-writes cleaning; the given
     * index type; the embedded timeline server with the harness' timeline service port
     * and no backup file-system view; and finally
     * {@code hoodie.parquet.small.file.limit=0}.
     *
     * @param tableName table name to configure
     * @param basePath  base path of the table
     * @param schemaStr Avro schema as a JSON string
     * @param indexType index type to configure
     * @return the builder, not yet built, so callers can add further settings
     */
    private HoodieWriteConfig.Builder getConfigBuilderForSecondTable(String tableName, String basePath, String schemaStr, HoodieIndex.IndexType indexType) {
        final long oneMebibyte = 0x100000L;

        Properties smallFileProps = new Properties();
        smallFileProps.setProperty("hoodie.parquet.small.file.limit", "0");

        ConsistencyGuardConfig consistencyGuard = ConsistencyGuardConfig.newBuilder()
            .withConsistencyCheckEnabled(true)
            .build();
        HoodieCompactionConfig compaction = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(oneMebibyte)
            .build();
        HoodieStorageConfig storage = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(oneMebibyte)
            .parquetMaxFileSize(oneMebibyte)
            .orcMaxFileSize(oneMebibyte)
            .build();
        HoodieCleanConfig cleaning = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
            .build();
        HoodieIndexConfig index = HoodieIndexConfig.newBuilder()
            .withIndexType(indexType)
            .build();
        FileSystemViewStorageConfig fileSystemView = FileSystemViewStorageConfig.newBuilder()
            .withEnableBackupForRemoteFileSystemView(false)
            .withRemoteServerPort(Integer.valueOf(timelineServicePort))
            .build();

        // The builder calls stay in the original order; later settings may override earlier ones.
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(schemaStr)
            .withParallelism(PARALLELISM, PARALLELISM)
            .withBulkInsertParallelism(PARALLELISM)
            .withFinalizeWriteParallelism(PARALLELISM)
            .withDeleteParallelism(PARALLELISM)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .withWriteStatusClass(MetadataMergeWriteStatus.class)
            .withConsistencyGuardConfig(consistencyGuard)
            .withCompactionConfig(compaction)
            .withStorageConfig(storage)
            .withCleanConfig(cleaning)
            .forTable(tableName)
            .withIndexConfig(index)
            .withEmbeddedTimelineServerEnabled(true)
            .withFileSystemViewConfig(fileSystemView)
            .withProperties(smallFileProps);
    }

    /**
     * Reports the table type for this test class.
     * NOTE(review): presumably overrides a harness hook that decides the type of the
     * primary test table — confirm, and add {@code @Override} if so.
     *
     * @return always {@link HoodieTableType#MERGE_ON_READ}
     */
    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }

    private class TestTableContents {
        final String basePath;
        final String tableName;
        final HoodieTableMetaClient metaClient;
        final HoodieWriteConfig config;
        final SparkRDDWriteClient client;
        String commitTimeOnMainTable = "";
        List<HoodieRecord> generatedRecords = new ArrayList<HoodieRecord>();
        List<HoodieKey> generatedKeysForDelete = new ArrayList<HoodieKey>();
        int previousActionType = 0;
        final SparkRDDWriteClient logCompactionClient;

        public TestTableContents(String basePath, String tableName, HoodieTableMetaClient metaClient, HoodieWriteConfig config, SparkRDDWriteClient client) {
            this(basePath, tableName, metaClient, config, client, null);
        }

        public TestTableContents(String basePath, String tableName, HoodieTableMetaClient metaClient, HoodieWriteConfig config, SparkRDDWriteClient client, SparkRDDWriteClient logCompactionClient) {
            this.basePath = basePath;
            this.tableName = tableName;
            this.metaClient = metaClient;
            this.config = config;
            this.client = client;
            this.logCompactionClient = logCompactionClient;
        }

        private void updatePreviousGeneration(List<HoodieRecord> generatedRecords, String commitTimeOnMainTable, int previousActionType) {
            Schema schema = new Schema.Parser().parse(this.config.getSchema());
            this.generatedRecords = generatedRecords.stream().map(rec -> this.deepCopyAndModifyRecordKey((HoodieRecord)rec)).collect(Collectors.toList());
            this.commitTimeOnMainTable = commitTimeOnMainTable;
            this.previousActionType = previousActionType;
        }

        private HoodieRecord deepCopyAndModifyRecordKey(HoodieRecord record) {
            HoodieKey key = this.deepCopyAndModifyRecordKey(record.getKey());
            RawTripTestPayload payload = ((RawTripTestPayload)record.getData()).clone();
            return new HoodieAvroRecord(key, (HoodieRecordPayload)payload);
        }

        private HoodieKey deepCopyAndModifyRecordKey(HoodieKey key) {
            return new HoodieKey(key.getRecordKey() + TestDataValidationCheckForLogCompactionActions.RECORD_KEY_APPEND_VALUE, key.getPartitionPath());
        }

        private void updatePreviousGenerationForDelete(List<HoodieKey> generatedKeysForDelete, String commitTimeOnMainTable) {
            this.generatedKeysForDelete = generatedKeysForDelete.stream().map(this::deepCopyAndModifyRecordKey).collect(Collectors.toList());
            this.commitTimeOnMainTable = commitTimeOnMainTable;
            this.previousActionType = 2;
        }
    }
}

