/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Functional tests for {@link SparkRDDWriteClient}.
 *
 * <p>Covers two behaviors:
 * <ul>
 *   <li>how the write client and its table service client share an embedded timeline server, and</li>
 *   <li>which cached RDDs are still registered after a commit, depending on whether resource
 *       release is enabled in the write config.</li>
 * </ul>
 */
class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {

    /**
     * Argument factory for the parameterized test of the same name (resolved by the bare
     * {@code @MethodSource} on that test).
     *
     * @return every combination of (table type, shouldReleaseResource, metadataTableEnable)
     */
    static Stream<Arguments> testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds() {
        return Stream.of(
            Arguments.of(HoodieTableType.COPY_ON_WRITE, true, true),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, true, false),
            Arguments.of(HoodieTableType.MERGE_ON_READ, true, true),
            Arguments.of(HoodieTableType.MERGE_ON_READ, true, false),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, false, true),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, false, false),
            Arguments.of(HoodieTableType.MERGE_ON_READ, false, true),
            Arguments.of(HoodieTableType.MERGE_ON_READ, false, false));
    }

    /**
     * Verifies that the write client and its table service client report the same timeline
     * server, both when one is passed in explicitly and when the client is built from the
     * write config alone.
     *
     * @param enableEmbeddedTimelineServer value for the embedded-timeline-server flag of the write config
     * @param passInTimelineServer whether a pre-started timeline service is handed to the client constructor
     * @throws IOException declared for the harness / client calls made by the test
     */
    @ParameterizedTest
    @CsvSource({"true,true", "true,false", "false,true", "false,false"})
    public void testWriteClientAndTableServiceClientWithTimelineServer(boolean enableEmbeddedTimelineServer, boolean passInTimelineServer) throws IOException {
        HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
        HoodieWriteConfig writeConfig = getConfigBuilder(true)
            .withPath(metaClient.getBasePath())
            .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineServer)
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                .withRemoteServerPort(incrementTimelineServicePortToUse())
                .build())
            .build();

        EmbeddedTimelineService timelineService = null;
        SparkRDDWriteClient<?> writeClient = null;
        try {
            if (passInTimelineServer) {
                timelineService = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(context(), null, writeConfig);
                writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig(writeConfig));
                writeClient = new SparkRDDWriteClient<>(context(), writeConfig, Option.of(timelineService));
                // Both clients must expose exactly the instance that was passed in.
                org.junit.jupiter.api.Assertions.assertEquals(timelineService, writeClient.getTimelineServer().get());
                org.junit.jupiter.api.Assertions.assertEquals(timelineService, writeClient.getTableServiceClient().getTimelineServer().get());
                // The client must hand back the config it was constructed with.
                org.junit.jupiter.api.Assertions.assertEquals(writeConfig, writeClient.getConfig());
            } else {
                writeClient = new SparkRDDWriteClient<>(context(), writeConfig);
                // Whatever the write client holds, the table service client must hold the same.
                org.junit.jupiter.api.Assertions.assertEquals(writeClient.getTimelineServer(), writeClient.getTableServiceClient().getTimelineServer());
                if (!enableEmbeddedTimelineServer) {
                    org.junit.jupiter.api.Assertions.assertFalse(writeClient.getTimelineServer().isPresent());
                }
            }
        } finally {
            // Clean up even when an assertion fails, so a failing case cannot leak the timeline
            // service or the client into later invocations. Order (stop, then close) matches the
            // success path of the original test.
            try {
                if (timelineService != null) {
                    timelineService.stopForBasePath(writeConfig.getBasePath());
                }
            } finally {
                if (writeClient != null) {
                    writeClient.close();
                }
            }
        }
    }

    /**
     * Verifies which cached RDDs remain registered after a commit.
     *
     * <p>Two extra RDDs are persisted under cache keys for the table base path: one for
     * {@code instant0} (never committed) and one for {@code instant1} (the committed instant).
     * After {@code commitStats} for {@code instant1}:
     * <ul>
     *   <li>with resource release enabled, only the {@code instant1} entries are expected to be gone;</li>
     *   <li>with resource release disabled, everything is expected to still be cached.</li>
     * </ul>
     * The same expectations are checked for the metadata table's cache keys when the metadata
     * table is enabled.
     *
     * @param tableType table type used to initialize the meta client
     * @param shouldReleaseResource value passed to {@code withReleaseResourceEnabled}
     * @param metadataTableEnable whether the metadata table is enabled in the write config
     * @throws IOException declared for the harness / client calls made by the test
     */
    @ParameterizedTest
    @MethodSource
    // The write client is used as a raw type: naming its type argument would need imports this
    // file does not have. The suppression is limited to this method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    void testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds(HoodieTableType tableType, boolean shouldReleaseResource, boolean metadataTableEnable) throws IOException {
        HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), tableType, new Properties());
        HoodieWriteConfig writeConfig = getConfigBuilder(true)
            .withPath(metaClient.getBasePath())
            .withAutoCommit(false)
            .withReleaseResourceEnabled(shouldReleaseResource)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataTableEnable).build())
            .build();
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(57069L);

        // Persist one unrelated RDD per instant under the data table's cache keys.
        String instant0 = HoodieTestDataGenerator.getCommitTimeAtUTC(0L);
        HoodieData.HoodieDataCacheKey dataKey0 = HoodieData.HoodieDataCacheKey.of(writeConfig.getBasePath(), instant0);
        List<?> extraRecords0 = dataGen.generateGenericRecords(10);
        HoodieJavaRDD<?> persistedRdd0 = HoodieJavaRDD.of(jsc().parallelize(extraRecords0, 2));
        persistedRdd0.persist("MEMORY_AND_DISK", context(), dataKey0);

        String instant1 = HoodieTestDataGenerator.getCommitTimeAtUTC(1L);
        HoodieData.HoodieDataCacheKey dataKey1 = HoodieData.HoodieDataCacheKey.of(writeConfig.getBasePath(), instant1);
        List<?> extraRecords1 = dataGen.generateGenericRecords(10);
        HoodieJavaRDD<?> persistedRdd1 = HoodieJavaRDD.of(jsc().parallelize(extraRecords1, 2));
        persistedRdd1.persist("MEMORY_AND_DISK", context(), dataKey1);

        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath());
        HoodieData.HoodieDataCacheKey metadataKey0 = HoodieData.HoodieDataCacheKey.of(metadataTableBasePath, instant0);
        HoodieData.HoodieDataCacheKey metadataKey1 = HoodieData.HoodieDataCacheKey.of(metadataTableBasePath, instant1);

        JavaRDD<?> writeRecords;
        List<Integer> metadataTableCacheIds0;
        List<Integer> metadataTableCacheIds1;
        SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
        try {
            List<?> records = dataGen.generateInserts(instant1, 10);
            writeRecords = jsc().parallelize(records, 2);
            writeClient.startCommitWithTime(instant1);
            List<WriteStatus> writeStatuses = writeClient.insert(writeRecords, instant1).collect();
            Assertions.assertNoWriteErrors(writeStatuses);

            // Snapshot the metadata table's cached ids before the commit so they can be
            // compared against the state after it.
            metadataTableCacheIds0 = context().getCachedDataIds(metadataKey0);
            metadataTableCacheIds1 = context().getCachedDataIds(metadataKey1);

            writeClient.commitStats(instant1, writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
        } finally {
            // Close even if the write or commit fails. As in the original test, the client is
            // closed before the cache assertions below run.
            writeClient.close();
        }

        String instant0Retained = "RDDs cached for " + instant0 + " should be retained.";
        String instant1Retained = "RDDs cached for " + instant1 + " should be retained.";
        String instant1Cleared = "RDDs cached for " + instant1 + " should be cleared.";

        if (shouldReleaseResource) {
            org.junit.jupiter.api.Assertions.assertEquals(Collections.singletonList(persistedRdd0.getId()), context().getCachedDataIds(dataKey0), instant0Retained);
            org.junit.jupiter.api.Assertions.assertEquals(Collections.emptyList(), context().getCachedDataIds(dataKey1), instant1Cleared);
            org.junit.jupiter.api.Assertions.assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd0.getId()), instant0Retained);
            org.junit.jupiter.api.Assertions.assertFalse(jsc().getPersistentRDDs().containsKey(persistedRdd1.getId()), instant1Cleared);
            org.junit.jupiter.api.Assertions.assertFalse(jsc().getPersistentRDDs().containsKey(writeRecords.id()), instant1Cleared);
            if (metadataTableEnable) {
                // Compare sorted copies: only the set of ids matters, not their order.
                org.junit.jupiter.api.Assertions.assertEquals(
                    metadataTableCacheIds0.stream().sorted().collect(Collectors.toList()),
                    context().getCachedDataIds(metadataKey0).stream().sorted().collect(Collectors.toList()),
                    "RDDs cached for metadataTable " + instant0 + " should be retained.");
                org.junit.jupiter.api.Assertions.assertEquals(Collections.emptyList(), context().getCachedDataIds(metadataKey1), "RDDs cached for metadataTable " + instant1 + " should be cleared.");
                metadataTableCacheIds0.forEach(cacheId -> org.junit.jupiter.api.Assertions.assertTrue(jsc().getPersistentRDDs().containsKey(cacheId), "RDDs cached for metadataTable cacheId " + cacheId + " should be retained."));
                metadataTableCacheIds1.forEach(cacheId -> org.junit.jupiter.api.Assertions.assertFalse(jsc().getPersistentRDDs().containsKey(cacheId), "RDDs cached for metadataTable cacheId " + cacheId + " should be cleared."));
            }
        } else {
            org.junit.jupiter.api.Assertions.assertEquals(Collections.singletonList(persistedRdd0.getId()), context().getCachedDataIds(dataKey0), instant0Retained);
            // NOTE(review): 3 presumably counts persistedRdd1, the input records RDD and one
            // more RDD cached by the write path for instant1 — confirm against the write client.
            org.junit.jupiter.api.Assertions.assertEquals(3, context().getCachedDataIds(dataKey1).size(), instant1Retained);
            org.junit.jupiter.api.Assertions.assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd0.getId()), instant0Retained);
            org.junit.jupiter.api.Assertions.assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd1.getId()), instant1Retained);
            org.junit.jupiter.api.Assertions.assertTrue(jsc().getPersistentRDDs().containsKey(writeRecords.id()), instant1Retained);
            if (metadataTableEnable) {
                metadataTableCacheIds0.forEach(cacheId -> org.junit.jupiter.api.Assertions.assertTrue(jsc().getPersistentRDDs().containsKey(cacheId), "RDDs cached for metadataTable cacheId " + cacheId + " should be retained."));
                metadataTableCacheIds1.forEach(cacheId -> org.junit.jupiter.api.Assertions.assertTrue(jsc().getPersistentRDDs().containsKey(cacheId), "RDDs cached for metadataTable cacheId " + cacheId + " should be retained."));
            }
        }
    }
}

