/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.functional;

import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestRemoteFileSystemViewWithMetadataTable
extends HoodieSparkClientTestHarness {
    private static final Logger LOG = LoggerFactory.getLogger(TestRemoteFileSystemViewWithMetadataTable.class);

    /**
     * Prepares the fixtures each test needs: the base path, the Spark contexts, the Hoodie
     * storage, and a fresh test data generator.
     *
     * @throws Exception if any of the harness initialisation steps fails
     */
    @BeforeEach
    public void setUp() throws Exception {
        initPath();
        initSparkContexts();
        initHoodieStorage();
        // NOTE(review): 8070L is presumably the generator's seed — confirm against HoodieTestDataGenerator.
        dataGen = new HoodieTestDataGenerator(8070L);
    }

    /**
     * Releases everything a test may have created: the timeline service, write clients, Spark
     * contexts, file system and executor service, then drops the data generator.
     *
     * @throws Exception if any of the harness cleanup steps fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        cleanupTimelineService();
        cleanupClients();
        cleanupSparkContexts();
        cleanupFileSystem();
        cleanupExecutorService();
        dataGen = null;
        // Request a collection after the references above have been dropped.
        System.gc();
    }

    /**
     * Starts a standalone {@link TimelineService} for this test and records the port it is
     * listening on in {@code timelineServicePort}.
     *
     * <p>The service's view manager is created with a factory that builds
     * {@link HoodieBackedTestDelayedTableMetadata} instances for each meta client. Any exception
     * raised while building or starting the service is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    @Override
    public void initTimelineService() {
        HoodieLocalEngineContext serviceEngineContext = new HoodieLocalEngineContext(storageConf);
        try {
            // The write config is only used here to carry the view-storage, common and metadata configs.
            HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                    .withRemoteServerPort(Integer.valueOf(incrementTimelineServicePortToUse()))
                    .build())
                .build();
            Configuration hadoopConf = new Configuration();
            TimelineService.Config serviceConfig = TimelineService.Config.builder()
                .enableMarkerRequests(true)
                .serverPort(config.getViewStorageConfig().getRemoteViewServerPort().intValue())
                .build();
            timelineService = new TimelineService(
                (HoodieEngineContext)serviceEngineContext,
                hadoopConf,
                serviceConfig,
                HoodieStorageUtils.getStorage((StorageConfiguration)HoodieTestUtils.getDefaultStorageConf()),
                FileSystemViewManager.createViewManager((HoodieEngineContext)context, (FileSystemViewStorageConfig)config.getViewStorageConfig(), (HoodieCommonConfig)config.getCommonConfig(), (SerializableFunctionUnchecked & Serializable)metaClient -> new HoodieBackedTestDelayedTableMetadata((HoodieEngineContext)context, metaClient.getStorage(), config.getMetadataConfig(), metaClient.getBasePathV2().toString(), true)));
            timelineService.startService();
            // Read the port back from the running service rather than from the config.
            timelineServicePort = timelineService.getServerPort();
            LOG.info("Started timeline server on port: " + timelineServicePort);
        }
        catch (Exception startFailure) {
            throw new RuntimeException(startFailure);
        }
    }

    /**
     * Writes three rounds to each of two MERGE_ON_READ tables and then checks, for both tables,
     * that the timeline server returns the expected latest file slices.
     *
     * <p>Depending on {@code testCase}, the write clients either share the standalone timeline
     * service started by {@link #initTimelineService()} or are created without one.
     *
     * @param testCase the timeline-server setup to exercise
     * @throws IOException if table initialisation, writing or verification fails
     */
    @ParameterizedTest
    @EnumSource(value=TestCase.class)
    public void testMORGetLatestFileSliceWithMetadataTable(TestCase testCase) throws IOException {
        boolean externalServer = testCase.useExistingTimelineServer;
        if (externalServer) {
            initTimelineService();
        }
        String firstTablePath = initializeTable("dataset1");
        String secondTablePath = initializeTable("dataset2");
        try (SparkRDDWriteClient firstClient = createWriteClient(firstTablePath, "test_mor_table1", testCase.reuseTimelineServer, externalServer ? Option.of(timelineService) : Option.empty());
             SparkRDDWriteClient secondClient = createWriteClient(secondTablePath, "test_mor_table2", testCase.reuseTimelineServer, externalServer ? Option.of(timelineService) : Option.empty())) {
            // All rounds for the first table are written before any round for the second table.
            for (int round = 0; round < 3; round++) {
                writeToTable(round, firstClient);
            }
            for (int round = 0; round < 3; round++) {
                writeToTable(round, secondClient);
            }
            runAssertionsForBasePath(externalServer, firstTablePath, firstClient);
            runAssertionsForBasePath(externalServer, secondTablePath, secondClient);
        }
    }

    /**
     * Checks that the timeline server for the table at {@code basePathStr} serves the expected
     * latest file slice for every file group written by the table's last commit.
     *
     * <p>The last instant on the active timeline must be a {@code commit}. Its commit metadata
     * supplies the (partition, file id) pairs to look up; the pairs are repeated until at least
     * 128 lookups are queued, and every lookup is submitted to a thread pool so the remote view
     * is queried from many threads at once. The method fails if any lookup returns
     * {@code false} or throws.
     *
     * @param useExistingTimelineServer {@code true} to query the standalone
     *        {@code timelineService}; {@code false} to query the timeline server embedded in
     *        {@code writeClient}
     * @param basePathStr base path of the table under test
     * @param writeClient write client whose embedded timeline server is used when
     *        {@code useExistingTimelineServer} is {@code false}
     * @throws IOException if the commit metadata of the last instant cannot be read
     */
    private void runAssertionsForBasePath(boolean useExistingTimelineServer, String basePathStr, SparkRDDWriteClient writeClient) throws IOException {
        HoodieTableMetaClient newMetaClient = this.createMetaClient(basePathStr);
        HoodieActiveTimeline timeline = newMetaClient.getActiveTimeline();
        HoodieInstant compactionCommit = (HoodieInstant)timeline.lastInstant().get();
        Assertions.assertEquals("commit", compactionCommit.getAction());
        HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes((byte[])timeline.getInstantDetails(compactionCommit).get(), HoodieCommitMetadata.class);
        List<Pair<String, String>> partitionFileIdPairList = commitMetadata.getPartitionToWriteStats().entrySet().stream()
            .flatMap(entry -> {
                String partitionPath = entry.getKey();
                return entry.getValue().stream().map(writeStat -> Pair.of(partitionPath, writeStat.getFileId()));
            })
            .collect(Collectors.toList());
        // Without this guard the fill loop below would never terminate on an empty list.
        Assertions.assertFalse(partitionFileIdPairList.isEmpty(), "Commit " + compactionCommit.getTimestamp() + " has no write stats to look up");
        // Repeat the pairs until there are enough lookups to exercise the view concurrently.
        List<Pair<String, String>> lookupList = new ArrayList<>();
        while (lookupList.size() < 128) {
            lookupList.addAll(partitionFileIdPairList);
        }
        int timelineServerPort = useExistingTimelineServer ? this.timelineService.getServerPort() : ((EmbeddedTimelineService)writeClient.getTimelineServer().get()).getRemoteFileSystemViewConfig().getRemoteViewServerPort().intValue();
        LOG.info("Connecting to Timeline Server: " + timelineServerPort);
        RemoteHoodieTableFileSystemView view = new RemoteHoodieTableFileSystemView("localhost", timelineServerPort, newMetaClient);
        List<TestViewLookUpCallable> callableList = lookupList.stream()
            .map(pair -> new TestViewLookUpCallable(view, pair, compactionCommit.getTimestamp(), basePathStr))
            .collect(Collectors.toList());
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Submit everything first so the lookups run concurrently, then collect the results.
            List<Future<Boolean>> resultList = new ArrayList<>(callableList.size());
            for (TestViewLookUpCallable callable : callableList) {
                resultList.add(pool.submit(callable));
            }
            boolean allLookupsSucceeded = true;
            for (Future<Boolean> future : resultList) {
                try {
                    // Non-short-circuit on purpose: every future is awaited even after a failure.
                    allLookupsSucceeded &= future.get();
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt status for callers further up the stack.
                        Thread.currentThread().interrupt();
                    }
                    LOG.error("Get result error", (Throwable)e);
                    allLookupsSucceeded = false;
                }
            }
            Assertions.assertTrue(allLookupsSucceeded);
        }
        finally {
            // Release the pool even when an assertion above fails.
            pool.shutdown();
        }
    }

    /**
     * Creates a directory named {@code dataset} under the test's temporary directory and
     * initialises a MERGE_ON_READ Hoodie table in it.
     *
     * @param dataset directory name for the table, resolved against {@code tempDir}
     * @return the absolute path of the new table as a string
     * @throws IOException if the directory or the table cannot be created
     */
    private String initializeTable(String dataset) throws IOException {
        java.nio.file.Path tableDir = tempDir.resolve(dataset);
        Files.createDirectories(tableDir);
        String tablePath = tableDir.toAbsolutePath().toString();
        Properties noExtraProps = new Properties();
        HoodieTestUtils.init((StorageConfiguration)storageConf, tablePath, HoodieTableType.MERGE_ON_READ, noExtraProps);
        return tablePath;
    }

    /**
     * Returns the table type used by this test class.
     *
     * @return always {@link HoodieTableType#MERGE_ON_READ}
     */
    protected HoodieTableType getTableType() {
        return HoodieTableType.MERGE_ON_READ;
    }

    /**
     * Builds a {@link SparkRDDWriteClient} for the given table with a remote-only file system
     * view, compaction after three delta commits, and auto-commit disabled.
     *
     * @param basePath base path of the table to write to
     * @param tableName name of the table
     * @param reuseTimelineServer value passed to the embedded-timeline-server reuse setting
     * @param timelineService an already running timeline service to connect to; when empty, the
     *        default remote view port is configured instead
     * @return a new write client; the caller is responsible for closing it
     */
    private SparkRDDWriteClient createWriteClient(String basePath, String tableName, boolean reuseTimelineServer, Option<TimelineService> timelineService) {
        // Point the remote view at the supplied service if there is one, otherwise use the default port.
        Integer remoteViewPort;
        if (timelineService.isPresent()) {
            remoteViewPort = Integer.valueOf(timelineService.get().getServerPort());
        } else {
            remoteViewPort = (Integer)FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue();
        }
        FileSystemViewStorageConfig viewStorageConfig = FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.REMOTE_ONLY)
            .withRemoteServerPort(remoteViewPort)
            .build();
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(3)
            .build();
        String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(tripSchema)
            .withParallelism(2, 2)
            .withBulkInsertParallelism(2)
            .withFinalizeWriteParallelism(2)
            .withDeleteParallelism(2)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .withMergeSmallFileGroupCandidatesLimit(0)
            .withCompactionConfig(compactionConfig)
            .withFileSystemViewConfig(viewStorageConfig)
            .withEmbeddedTimelineServerReuseEnabled(reuseTimelineServer)
            .withAutoCommit(false)
            .forTable(tableName)
            .build();
        return new SparkRDDWriteClient((HoodieEngineContext)this.context, writeConfig, timelineService);
    }

    /**
     * Performs one write round against the table behind {@code writeClient}: upserts 100
     * generated records under a new instant, commits them as a {@code deltacommit}, then
     * schedules a compaction and runs any pending compactions.
     *
     * @param round zero-based round number; round 0 generates inserts, later rounds generate updates
     * @param writeClient client used to write and compact
     * @throws IOException if record generation or writing fails
     */
    private void writeToTable(int round, SparkRDDWriteClient writeClient) throws IOException {
        String commitTime = HoodieActiveTimeline.createNewInstantTime();
        writeClient.startCommitWithTime(commitTime);
        List batch;
        if (round == 0) {
            batch = dataGen.generateInserts(commitTime, Integer.valueOf(100));
        } else {
            batch = dataGen.generateUpdates(commitTime, Integer.valueOf(100));
        }
        JavaRDD statuses = writeClient.upsert(jsc.parallelize(batch, 1), commitTime);
        // Auto-commit is disabled on the client's config, so the commit is issued explicitly.
        writeClient.commit(commitTime, (Object)statuses, Option.empty(), "deltacommit", Collections.emptyMap());
        writeClient.scheduleCompaction(Option.empty());
        writeClient.runAnyPendingCompactions();
    }

    /**
     * Task that asks a remote file system view for the latest file slice of one file group and
     * reports whether its base file lies under the expected table path and was written by the
     * expected commit.
     */
    class TestViewLookUpCallable
    implements Callable<Boolean> {
        private final RemoteHoodieTableFileSystemView view;
        private final Pair<String, String> partitionFileIdPair;
        private final String expectedCommitTime;
        private final String expectedBasePath;

        /**
         * @param view remote view to query
         * @param partitionFileIdPair partition path (left) and file id (right) of the file group
         * @param expectedCommitTime commit time the latest base file is expected to carry
         * @param expectedBasePath path prefix the latest base file is expected to start with
         */
        public TestViewLookUpCallable(RemoteHoodieTableFileSystemView view, Pair<String, String> partitionFileIdPair, String expectedCommitTime, String expectedBasePath) {
            this.view = view;
            this.partitionFileIdPair = partitionFileIdPair;
            this.expectedCommitTime = expectedCommitTime;
            this.expectedBasePath = expectedBasePath;
        }

        /**
         * Looks up the latest file slice and validates its base file.
         *
         * @return {@code true} if a latest file slice with a base file is returned, the base file
         *         path starts with the expected base path, and the commit time parsed from its
         *         file name equals the expected commit time; {@code false} otherwise
         * @throws Exception if the lookup against the view fails
         */
        @Override
        public Boolean call() throws Exception {
            Option<FileSlice> latestFileSlice = this.view.getLatestFileSlice(this.partitionFileIdPair.getLeft(), this.partitionFileIdPair.getRight());
            boolean result = false;
            // Check presence before dereferencing so a missing slice or base file is reported
            // through the diagnostic log below instead of surfacing as an exception.
            if (latestFileSlice.isPresent() && latestFileSlice.get().getBaseFile().isPresent()) {
                String latestBaseFilePath = latestFileSlice.get().getBaseFile().get().getPath();
                result = latestBaseFilePath.startsWith(this.expectedBasePath)
                    && this.expectedCommitTime.equals(FSUtils.getCommitTime(new Path(latestBaseFilePath).getName()));
            }
            if (!result) {
                LOG.error("The timeline server does not return the correct result: latestFileSliceReturned=" + latestFileSlice + " expectedCommitTime=" + this.expectedCommitTime);
            }
            return result;
        }
    }

    private static enum TestCase {
        USE_EXISTING_TIMELINE_SERVER(true, false),
        EMBEDDED_TIMELINE_SERVER_PER_TABLE(false, false),
        SINGLE_EMBEDDED_TIMELINE_SERVER(false, true);

        private final boolean useExistingTimelineServer;
        private final boolean reuseTimelineServer;

        private TestCase(boolean useExistingTimelineServer, boolean reuseTimelineServer) {
            this.useExistingTimelineServer = useExistingTimelineServer;
            this.reuseTimelineServer = reuseTimelineServer;
        }
    }
}

