/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class TestMultiWriterWithPreferWriterIngestion
extends HoodieClientTestBase {
    public void setUpMORTestTable() throws IOException {
        this.cleanupResources();
        this.initPath();
        this.initSparkContexts();
        this.initTestDataGenerator();
        this.initHoodieStorage();
        this.storage.createDirectory(new StoragePath(this.basePath));
        this.metaClient = HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath, (HoodieTableType)HoodieTableType.MERGE_ON_READ, (HoodieFileFormat)HoodieFileFormat.PARQUET);
        this.initTestDataGenerator();
    }

    @AfterEach
    public void clean() throws IOException {
        this.cleanupResources();
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class, names={"COPY_ON_WRITE", "MERGE_ON_READ"})
    public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
        if (tableType == HoodieTableType.MERGE_ON_READ) {
            this.setUpMORTestTable();
        }
        Properties properties = new Properties();
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        HoodieWriteConfig cfg = this.getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(2).build()).withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(Boolean.valueOf(false)).withAsyncClean(Boolean.valueOf(true)).withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).withEmbeddedTimelineServerEnabled(false).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.MEMORY).build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build()).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).withConflictResolutionStrategy((ConflictResolutionStrategy)new PreferWriterConflictResolutionStrategy()).build()).withAutoCommit(false).withProperties(properties).build();
        HashSet<String> validInstants = new HashSet<String>();
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        String instantTime1 = HoodieActiveTimeline.createNewInstantTime();
        this.createCommitWithInserts(cfg, client, "000", instantTime1, 200);
        validInstants.add(instantTime1);
        String instantTime2 = HoodieActiveTimeline.createNewInstantTime();
        this.createCommitWithUpserts(cfg, client, instantTime1, "000", instantTime2, 100);
        String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
        this.createCommitWithUpserts(cfg, client, instantTime2, "000", instantTime3, 100);
        validInstants.add(instantTime2);
        validInstants.add(instantTime3);
        ExecutorService executors = Executors.newFixedThreadPool(2);
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(cfg);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(cfg);
        String instant4 = HoodieActiveTimeline.createNewInstantTime();
        Future<?> future1 = executors.submit(() -> {
            int numRecords = 100;
            String commitTimeBetweenPrevAndNew = instantTime2;
            try {
                this.createCommitWithUpserts(cfg, client1, instantTime3, commitTimeBetweenPrevAndNew, instant4, numRecords);
                validInstants.add(instant4);
            }
            catch (Exception e1) {
                throw new RuntimeException(e1);
            }
        });
        String instant5 = HoodieActiveTimeline.createNewInstantTime();
        Future<?> future2 = executors.submit(() -> {
            block2: {
                try {
                    client2.scheduleTableService(instant5, Option.empty(), TableServiceType.COMPACT);
                }
                catch (Exception e2) {
                    if (tableType != HoodieTableType.MERGE_ON_READ) break block2;
                    throw new RuntimeException(e2);
                }
            }
        });
        String instant6 = HoodieActiveTimeline.createNewInstantTime();
        Future<?> future3 = executors.submit(() -> {
            try {
                client2.scheduleTableService(instant6, Option.empty(), TableServiceType.CLEAN);
            }
            catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        });
        future1.get();
        future2.get();
        future3.get();
        String instant7 = HoodieActiveTimeline.createNewInstantTime();
        future1 = executors.submit(() -> {
            int numRecords = 100;
            try {
                this.createCommitWithInserts(cfg, client1, instantTime3, instant7, numRecords);
                validInstants.add(instant7);
            }
            catch (Exception e1) {
                throw new RuntimeException(e1);
            }
        });
        future2 = executors.submit(() -> {
            block2: {
                try {
                    HoodieWriteMetadata compactionMetadata = client2.compact(instant5);
                    client2.commitCompaction(instant5, (HoodieCommitMetadata)compactionMetadata.getCommitMetadata().get(), Option.empty());
                    validInstants.add(instant5);
                }
                catch (Exception e2) {
                    if (tableType != HoodieTableType.MERGE_ON_READ) break block2;
                    Assertions.assertTrue((boolean)(e2 instanceof HoodieWriteConflictException));
                }
            }
        });
        future3 = executors.submit(() -> {
            try {
                client2.clean(instant6, false);
                validInstants.add(instant6);
            }
            catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        });
        future1.get();
        future2.get();
        future3.get();
        Set completedInstants = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
        Assertions.assertTrue((boolean)validInstants.containsAll(completedInstants));
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class, names={"COPY_ON_WRITE", "MERGE_ON_READ"})
    public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) throws Exception {
        if (tableType == HoodieTableType.MERGE_ON_READ) {
            this.setUpMORTestTable();
        }
        Properties properties = new Properties();
        properties.put("hoodie.datasource.write.row.writer.enable", String.valueOf(false));
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        HoodieWriteConfig cfg = this.getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(Boolean.valueOf(false)).build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build()).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).withConflictResolutionStrategy((ConflictResolutionStrategy)new PreferWriterConflictResolutionStrategy()).build()).withAutoCommit(false).withProperties(properties).build();
        String instant1 = HoodieActiveTimeline.createNewInstantTime();
        this.createCommitWithInserts(cfg, this.getHoodieWriteClient(cfg), "000", instant1, 200);
        String instant2 = HoodieActiveTimeline.createNewInstantTime();
        int numRecords = 100;
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(cfg);
        JavaRDD<WriteStatus> result1 = this.updateBatch(cfg, client1, instant2, instant1, (Option<List<String>>)Option.of(Arrays.asList(instant1)), "000", numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, false, numRecords, 200, 2);
        String instant3 = HoodieActiveTimeline.createNewInstantTime();
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(cfg);
        JavaRDD<WriteStatus> result2 = this.updateBatch(cfg, client2, instant3, instant1, (Option<List<String>>)Option.of(Arrays.asList(instant1)), "000", numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, false, numRecords, 200, 2);
        client2.commit(instant3, result2);
        SparkRDDWriteClient client3 = this.getHoodieWriteClient(cfg);
        Option clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
        Assertions.assertThrows(HoodieClusteringException.class, () -> client3.cluster((String)clusterInstant.get(), true));
    }

    private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommitTime, String newCommitTime, int numRecords) throws Exception {
        JavaRDD<WriteStatus> result = this.insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), false, false, numRecords);
        Assertions.assertTrue((boolean)client.commit(newCommitTime, result), (String)"Commit should succeed");
    }

    private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit, String commitTimeBetweenPrevAndNew, String newCommitTime, int numRecords) throws Exception {
        JavaRDD<WriteStatus> result = this.updateBatch(cfg, client, newCommitTime, prevCommit, (Option<List<String>>)Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, false, numRecords, 200, 2);
        client.commit(newCommitTime, result);
    }
}

