/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark;

import com.github.benmanes.caffeine.cache.Cache;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Employee;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.iceberg.spark.TestBaseWithCatalog;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkEnv;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.storage.memory.MemoryStore;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;

public class TestSparkExecutorCache
extends TestBaseWithCatalog {
    private static final String UPDATES_VIEW_NAME = "updates";
    private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
    private static final Map<String, CustomInputFile> INPUT_FILES = Collections.synchronizedMap(Maps.newHashMap());
    private String targetTableName;
    private TableIdentifier targetTableIdent;

    @Parameters(name="catalogName = {0}, implementation = {1}, config = {2}")
    protected static Object[][] parameters() {
        return new Object[][]{{"testhive", SparkCatalog.class.getName(), ImmutableMap.of((Object)"type", (Object)"hive", (Object)"io-impl", (Object)CustomFileIO.class.getName(), (Object)"default-namespace", (Object)"default")}};
    }

    @BeforeEach
    public void configureTargetTableName() {
        String name = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
        this.targetTableName = this.tableName(name);
        this.targetTableIdent = TableIdentifier.of((Namespace)Namespace.of((String[])new String[]{"default"}), (String)name);
    }

    @AfterEach
    public void releaseResources() {
        this.sql("DROP TABLE IF EXISTS %s", this.targetTableName);
        this.sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
        INPUT_FILES.clear();
    }

    @TestTemplate
    public void testCacheValueWeightOverflow() {
        SparkExecutorCache.CacheValue cacheValue = new SparkExecutorCache.CacheValue((Object)"v", 0x80000000L);
        Assertions.assertThat((int)cacheValue.weight()).isEqualTo(Integer.MAX_VALUE);
    }

    @TestTemplate
    public void testCacheEnabledConfig() {
        this.withSQLConf((Map<String, String>)ImmutableMap.of((Object)"spark.sql.iceberg.executor-cache.enabled", (Object)"true"), () -> {
            SparkExecutorCache.Conf conf = new SparkExecutorCache.Conf();
            Assertions.assertThat((boolean)conf.cacheEnabled()).isTrue();
        });
        this.withSQLConf((Map<String, String>)ImmutableMap.of((Object)"spark.sql.iceberg.executor-cache.enabled", (Object)"false"), () -> {
            SparkExecutorCache.Conf conf = new SparkExecutorCache.Conf();
            Assertions.assertThat((boolean)conf.cacheEnabled()).isFalse();
        });
    }

    @TestTemplate
    public void testTimeoutConfig() {
        this.withSQLConf((Map<String, String>)ImmutableMap.of((Object)"spark.sql.iceberg.executor-cache.timeout", (Object)"10s"), () -> {
            SparkExecutorCache.Conf conf = new SparkExecutorCache.Conf();
            Assertions.assertThat((Duration)conf.timeout()).hasSeconds(10L);
        });
        this.withSQLConf((Map<String, String>)ImmutableMap.of((Object)"spark.sql.iceberg.executor-cache.timeout", (Object)"2m"), () -> {
            SparkExecutorCache.Conf conf = new SparkExecutorCache.Conf();
            Assertions.assertThat((Duration)conf.timeout()).hasMinutes(2L);
        });
    }

    @TestTemplate
    public void testMaxEntrySizeConfig() {
        this.withSQLConf((Map<String, String>)ImmutableMap.of((Object)"spark.sql.iceberg.executor-cache.max-entry-size", (Object)"128"), () -> {
            SparkExecutorCache.Conf conf = new SparkExecutorCache.Conf();
            Assertions.assertThat((long)conf.maxEntrySize()).isEqualTo(128L);
        });
    }

    @TestTemplate
    public void testMaxTotalSizeConfig() {
        this.withSQLConf((Map<String, String>)ImmutableMap.of((Object)"spark.sql.iceberg.executor-cache.max-total-size", (Object)"512"), () -> {
            SparkExecutorCache.Conf conf = new SparkExecutorCache.Conf();
            Assertions.assertThat((long)conf.maxTotalSize()).isEqualTo(512L);
        });
    }

    @TestTemplate
    public void testConcurrentAccess() throws InterruptedException {
        SparkExecutorCache cache = SparkExecutorCache.getOrCreate();
        String table1 = "table1";
        String table2 = "table2";
        HashSet loadedInternalKeys = Sets.newHashSet();
        String key1 = "key1";
        String key2 = "key2";
        long valueSize = 100L;
        int threadCount = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber) {
            String group = threadNumber % 2 == 0 ? table1 : table2;
            executorService.submit(() -> {
                for (int batch = 0; batch < 3; ++batch) {
                    cache.getOrLoad(group, key1, () -> {
                        String internalKey = TestSparkExecutorCache.toInternalKey(group, key1);
                        Set set = loadedInternalKeys;
                        synchronized (set) {
                            Assertions.assertThat((boolean)loadedInternalKeys.contains(internalKey)).isFalse();
                            loadedInternalKeys.add(internalKey);
                        }
                        return "value1";
                    }, valueSize);
                    cache.getOrLoad(group, key2, () -> {
                        String internalKey = TestSparkExecutorCache.toInternalKey(group, key2);
                        Set set = loadedInternalKeys;
                        synchronized (set) {
                            Assertions.assertThat((boolean)loadedInternalKeys.contains(internalKey)).isFalse();
                            loadedInternalKeys.add(internalKey);
                        }
                        return "value2";
                    }, valueSize);
                }
            });
        }
        executorService.shutdown();
        Assertions.assertThat((boolean)executorService.awaitTermination(1L, TimeUnit.MINUTES)).isTrue();
        cache.invalidate(table1);
        cache.invalidate(table2);
        Cache<String, ?> state = TestSparkExecutorCache.fetchInternalCacheState();
        Set liveKeys = state.asMap().keySet();
        Assertions.assertThat(liveKeys).noneMatch(key -> key.startsWith(table1) || key.startsWith(table2));
    }

    @TestTemplate
    public void testCopyOnWriteDelete() throws Exception {
        this.checkDelete(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @TestTemplate
    public void testMergeOnReadDelete() throws Exception {
        this.checkDelete(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkDelete(RowLevelOperationMode mode) throws Exception {
        List<DeleteFile> deleteFiles = this.createAndInitTable("write.delete.mode", mode);
        this.sql("DELETE FROM %s WHERE id = 1 OR id = 4", this.targetTableName);
        int maxRequestCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 3 : 1;
        Assertions.assertThat(deleteFiles).allMatch(deleteFile -> this.streamCount((DeleteFile)deleteFile) <= maxRequestCount);
        this.assertEquals("Should have expected rows", (List<Object[]>)ImmutableList.of(), this.sql("SELECT * FROM %s ORDER BY id ASC", this.targetTableName));
    }

    @TestTemplate
    public void testCopyOnWriteUpdate() throws Exception {
        this.checkUpdate(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @TestTemplate
    public void testMergeOnReadUpdate() throws Exception {
        this.checkUpdate(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkUpdate(RowLevelOperationMode mode) throws Exception {
        List<DeleteFile> deleteFiles = this.createAndInitTable("write.update.mode", mode);
        Dataset updateDS = spark.createDataset((List)ImmutableList.of((Object)1, (Object)4), Encoders.INT());
        updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
        this.sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)", this.targetTableName, UPDATES_VIEW_NAME);
        int maxRequestCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 5 : 1;
        Assertions.assertThat(deleteFiles).allMatch(deleteFile -> this.streamCount((DeleteFile)deleteFile) <= maxRequestCount);
        this.assertEquals("Should have expected rows", (List<Object[]>)ImmutableList.of((Object)this.row(-1, "hr"), (Object)this.row(-1, "hr")), this.sql("SELECT * FROM %s ORDER BY id ASC", this.targetTableName));
    }

    @TestTemplate
    public void testCopyOnWriteMerge() throws Exception {
        this.checkMerge(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @TestTemplate
    public void testMergeOnReadMerge() throws Exception {
        this.checkMerge(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkMerge(RowLevelOperationMode mode) throws Exception {
        List<DeleteFile> deleteFiles = this.createAndInitTable("write.merge.mode", mode);
        Dataset updateDS = spark.createDataset((List)ImmutableList.of((Object)1, (Object)4), Encoders.INT());
        updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
        this.sql("MERGE INTO %s t USING %s s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET id = 100 WHEN NOT MATCHED THEN   INSERT (id, dep) VALUES (-1, 'unknown')", this.targetTableName, UPDATES_VIEW_NAME);
        int maxRequestCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 3 : 1;
        Assertions.assertThat(deleteFiles).allMatch(deleteFile -> this.streamCount((DeleteFile)deleteFile) <= maxRequestCount);
        this.assertEquals("Should have expected rows", (List<Object[]>)ImmutableList.of((Object)this.row(100, "hr"), (Object)this.row(100, "hr")), this.sql("SELECT * FROM %s ORDER BY id ASC", this.targetTableName));
    }

    private int streamCount(DeleteFile deleteFile) {
        CustomInputFile inputFile = INPUT_FILES.get(deleteFile.path().toString());
        return inputFile.streamCount();
    }

    private List<DeleteFile> createAndInitTable(String operation, RowLevelOperationMode mode) throws Exception {
        this.sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')", this.targetTableName, "write.metadata.path", this.temp.toString().replaceFirst("file:", ""), "write.data.path", this.temp.toString().replaceFirst("file:", ""), operation, mode.modeName());
        this.append(this.targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
        this.append(this.targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));
        Table table = this.validationCatalog.loadTable(this.targetTableIdent);
        List<Pair<CharSequence, Long>> posDeletes = this.dataFiles(table).stream().map(dataFile -> Pair.of((Object)dataFile.path(), (Object)0L)).collect(Collectors.toList());
        Pair<DeleteFile, CharSequenceSet> posDeleteResult = this.writePosDeletes(table, posDeletes);
        DeleteFile posDeleteFile = (DeleteFile)posDeleteResult.first();
        CharSequenceSet referencedDataFiles = (CharSequenceSet)posDeleteResult.second();
        DeleteFile eqDeleteFile = this.writeEqDeletes(table, "id", 2, 5);
        table.newRowDelta().validateFromSnapshot(table.currentSnapshot().snapshotId()).validateDataFilesExist((Iterable)referencedDataFiles).addDeletes(posDeleteFile).addDeletes(eqDeleteFile).commit();
        this.sql("REFRESH TABLE %s", this.targetTableName);
        SparkEnv sparkEnv = SparkEnv.get();
        MemoryStore memoryStore = sparkEnv.blockManager().memoryStore();
        memoryStore.clear();
        return ImmutableList.of((Object)posDeleteFile, (Object)eqDeleteFile);
    }

    private DeleteFile writeEqDeletes(Table table, String col, Object ... values) throws IOException {
        Schema deleteSchema = table.schema().select(new String[]{col});
        GenericRecord delete = GenericRecord.create((Schema)deleteSchema);
        ArrayList deletes = Lists.newArrayList();
        for (Object value : values) {
            deletes.add(delete.copy(col, value));
        }
        OutputFile out = Files.localOutput((File)new File(this.temp, "eq-deletes-" + UUID.randomUUID()));
        return FileHelpers.writeDeleteFile((Table)table, (OutputFile)out, null, (List)deletes, (Schema)deleteSchema);
    }

    private Pair<DeleteFile, CharSequenceSet> writePosDeletes(Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
        OutputFile out = Files.localOutput((File)new File(this.temp, "pos-deletes-" + UUID.randomUUID()));
        return FileHelpers.writeDeleteFile((Table)table, (OutputFile)out, null, deletes);
    }

    private void append(String target, Employee ... employees) throws NoSuchTableException {
        List<Employee> input = Arrays.asList(employees);
        Dataset inputDF = spark.createDataFrame(input, Employee.class);
        inputDF.coalesce(1).writeTo(target).append();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Collection<DataFile> dataFiles(Table table) {
        try (CloseableIterable tasks = table.newScan().planFiles();){
            ImmutableList immutableList = ImmutableList.copyOf((Iterable)Iterables.transform((Iterable)tasks, ContentScanTask::file));
            return immutableList;
        }
        catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }

    private static Cache<String, ?> fetchInternalCacheState() {
        try {
            Field stateField = SparkExecutorCache.class.getDeclaredField("state");
            stateField.setAccessible(true);
            SparkExecutorCache cache = SparkExecutorCache.get();
            return (Cache)stateField.get(cache);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static String toInternalKey(String group, String key) {
        return group + "_" + key;
    }

    public static class CustomInputFile
    implements InputFile {
        private final InputFile delegate;
        private final AtomicInteger streamCount;

        public CustomInputFile(String path) {
            this.delegate = Files.localInput((String)path);
            this.streamCount = new AtomicInteger();
        }

        public long getLength() {
            return this.delegate.getLength();
        }

        public SeekableInputStream newStream() {
            this.streamCount.incrementAndGet();
            return this.delegate.newStream();
        }

        public int streamCount() {
            return this.streamCount.get();
        }

        public String location() {
            return this.delegate.location();
        }

        public boolean exists() {
            return this.delegate.exists();
        }
    }

    public static class CustomFileIO
    implements FileIO {
        public InputFile newInputFile(String path) {
            return INPUT_FILES.computeIfAbsent(path, key -> new CustomInputFile(path));
        }

        public OutputFile newOutputFile(String path) {
            return Files.localOutput((String)path);
        }

        public void deleteFile(String path) {
            File file = new File(path);
            if (!file.delete()) {
                throw new RuntimeIOException("Failed to delete file: " + path, new Object[0]);
            }
        }
    }
}

