/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.extensions.Employee;
import org.apache.iceberg.spark.extensions.SparkExtensionsTestBase;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkEnv;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.storage.memory.MemoryStore;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;

/**
 * Verifies how many times delete files are opened while Spark executes row-level DELETE, UPDATE
 * and MERGE statements against a table that carries one position delete file and one equality
 * delete file.
 *
 * <p>The catalog is configured with {@link CustomFileIO}, which hands out {@link CustomInputFile}
 * instances that count every {@code newStream()} call. Each test then asserts an upper bound on
 * that count for both delete files.
 */
public class TestSparkExecutorCache extends SparkExtensionsTestBase {
    private static final String UPDATES_VIEW_NAME = "updates";
    /** Produces a distinct target table name for every test method. */
    private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
    /** Input files handed out by {@link CustomFileIO}, keyed by the requested path. */
    private static final Map<String, CustomInputFile> INPUT_FILES = Collections.synchronizedMap(Maps.newHashMap());
    private String targetTableName;
    private TableIdentifier targetTableIdent;

    /**
     * Supplies the single catalog configuration used by this test: a Hive-backed {@link
     * SparkCatalog} whose {@code io-impl} is {@link CustomFileIO}.
     */
    @Parameterized.Parameters(name="catalogName = {0}, implementation = {1}, config = {2}")
    public static Object[][] parameters() {
        return new Object[][] {
            {
                "testhive",
                SparkCatalog.class.getName(),
                ImmutableMap.of(
                    "type", "hive",
                    "io-impl", CustomFileIO.class.getName(),
                    "default-namespace", "default")
            }
        };
    }

    public TestSparkExecutorCache(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    /** Picks a fresh target table name and the matching identifier in the "default" namespace. */
    @Before
    public void configureTargetTableName() {
        String name = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
        this.targetTableName = tableName(name);
        this.targetTableIdent = TableIdentifier.of(Namespace.of("default"), name);
    }

    /** Drops the target table and the updates view, and forgets all tracked input files. */
    @After
    public void releaseResources() {
        sql("DROP TABLE IF EXISTS %s", targetTableName);
        sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
        INPUT_FILES.clear();
    }

    @Test
    public void testCopyOnWriteDelete() throws Exception {
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadDelete() throws Exception {
        checkDelete(RowLevelOperationMode.MERGE_ON_READ);
    }

    /**
     * Runs a DELETE in the given mode, then checks the per-delete-file stream count (at most 3
     * for copy-on-write, at most 1 otherwise) and that the table ends up empty.
     */
    private void checkDelete(RowLevelOperationMode mode) throws Exception {
        List<DeleteFile> deleteFiles = createAndInitTable("write.delete.mode", mode);

        sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);

        int maxRequestCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 3 : 1;
        Assertions.assertThat(deleteFiles)
            .allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);

        assertEquals(
            "Should have expected rows",
            ImmutableList.of(),
            sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
    }

    @Test
    public void testCopyOnWriteUpdate() throws Exception {
        checkUpdate(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadUpdate() throws Exception {
        checkUpdate(RowLevelOperationMode.MERGE_ON_READ);
    }

    /**
     * Runs an UPDATE driven by the updates view in the given mode, then checks the
     * per-delete-file stream count (at most 5 for copy-on-write, at most 1 otherwise) and the
     * resulting rows.
     */
    private void checkUpdate(RowLevelOperationMode mode) throws Exception {
        List<DeleteFile> deleteFiles = createAndInitTable("write.update.mode", mode);

        Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
        updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);

        sql(
            "UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)",
            targetTableName,
            UPDATES_VIEW_NAME);

        int maxRequestCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 5 : 1;
        Assertions.assertThat(deleteFiles)
            .allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);

        assertEquals(
            "Should have expected rows",
            ImmutableList.of(row(-1, "hr"), row(-1, "hr")),
            sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
    }

    @Test
    public void testCopyOnWriteMerge() throws Exception {
        checkMerge(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadMerge() throws Exception {
        checkMerge(RowLevelOperationMode.MERGE_ON_READ);
    }

    /**
     * Runs a MERGE driven by the updates view in the given mode, then checks the
     * per-delete-file stream count (at most 3 for copy-on-write, at most 1 otherwise) and the
     * resulting rows.
     */
    private void checkMerge(RowLevelOperationMode mode) throws Exception {
        List<DeleteFile> deleteFiles = createAndInitTable("write.merge.mode", mode);

        Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
        updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);

        sql(
            "MERGE INTO %s t USING %s s "
                + "ON t.id == s.value "
                + "WHEN MATCHED THEN "
                + "  UPDATE SET id = 100 "
                + "WHEN NOT MATCHED THEN "
                + "  INSERT (id, dep) VALUES (-1, 'unknown')",
            targetTableName,
            UPDATES_VIEW_NAME);

        int maxRequestCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 3 : 1;
        Assertions.assertThat(deleteFiles)
            .allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);

        assertEquals(
            "Should have expected rows",
            ImmutableList.of(row(100, "hr"), row(100, "hr")),
            sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
    }

    /**
     * Returns how many streams were opened for the given delete file through {@link
     * CustomFileIO}. Throws a {@link NullPointerException} when the file was never requested,
     * because {@code INPUT_FILES} is only populated by {@link CustomFileIO#newInputFile(String)}.
     */
    private int streamCount(DeleteFile deleteFile) {
        CustomInputFile inputFile = INPUT_FILES.get(deleteFile.path().toString());
        return inputFile.streamCount();
    }

    /**
     * Creates the target table, appends two batches of three employees each, and commits one
     * position delete file (position 0 of every data file) plus one equality delete file (ids 2
     * and 5). Afterwards the table is refreshed in Spark and the executor memory store is cleared.
     *
     * @param operation table property name that selects the row-level mode for the operation
     * @param mode row-level operation mode to store under {@code operation}
     * @return the position delete file followed by the equality delete file
     */
    private List<DeleteFile> createAndInitTable(String operation, RowLevelOperationMode mode) throws Exception {
        // NOTE(review): temp is declared in a superclass not visible here. If it is a JUnit
        // TemporaryFolder, toString() is not a filesystem path and temp.getRoot() may have
        // been intended - confirm before changing.
        String location = temp.toString().replaceFirst("file:", "");

        sql(
            "CREATE TABLE %s (id INT, dep STRING) "
                + "USING iceberg "
                + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
            targetTableName,
            "write.metadata.path",
            location,
            "write.data.path",
            location,
            operation,
            mode.modeName());

        append(targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
        append(targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));

        Table table = validationCatalog.loadTable(targetTableIdent);

        // one position delete per data file, each targeting the first row of that file
        List<Pair<CharSequence, Long>> posDeletes =
            dataFiles(table).stream()
                .map(dataFile -> Pair.of(dataFile.path(), 0L))
                .collect(Collectors.toList());
        Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table, posDeletes);
        DeleteFile posDeleteFile = posDeleteResult.first();
        CharSequenceSet referencedDataFiles = posDeleteResult.second();

        DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);

        table
            .newRowDelta()
            .validateFromSnapshot(table.currentSnapshot().snapshotId())
            .validateDataFilesExist(referencedDataFiles)
            .addDeletes(posDeleteFile)
            .addDeletes(eqDeleteFile)
            .commit();

        sql("REFRESH TABLE %s", targetTableName);

        // start every test from an empty executor memory store
        SparkEnv sparkEnv = SparkEnv.get();
        MemoryStore memoryStore = sparkEnv.blockManager().memoryStore();
        memoryStore.clear();

        return ImmutableList.of(posDeleteFile, eqDeleteFile);
    }

    /**
     * Writes an equality delete file that removes rows whose {@code col} equals any of {@code
     * values}.
     */
    private DeleteFile writeEqDeletes(Table table, String col, Object... values) throws IOException {
        Schema deleteSchema = table.schema().select(col);

        GenericRecord delete = GenericRecord.create(deleteSchema);
        List<Record> deletes = Lists.newArrayList();
        for (Object value : values) {
            deletes.add(delete.copy(col, value));
        }

        OutputFile out = Files.localOutput(temp.newFile("eq-deletes-" + UUID.randomUUID()));
        return FileHelpers.writeDeleteFile(table, out, null, deletes, deleteSchema);
    }

    /**
     * Writes a position delete file for the given (data file path, position) pairs and returns
     * it together with the set produced by {@link FileHelpers}.
     */
    private Pair<DeleteFile, CharSequenceSet> writePosDeletes(Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
        OutputFile out = Files.localOutput(temp.newFile("pos-deletes-" + UUID.randomUUID()));
        return FileHelpers.writeDeleteFile(table, out, null, deletes);
    }

    /** Appends the given employees to {@code target} after coalescing to a single partition. */
    private void append(String target, Employee... employees) throws NoSuchTableException {
        List<Employee> input = Arrays.asList(employees);
        spark.createDataFrame(input, Employee.class).coalesce(1).writeTo(target).append();
    }

    /**
     * Plans a scan of {@code table} and returns the data file of every planned task.
     *
     * @throws RuntimeIOException if closing the planned tasks fails
     */
    private Collection<DataFile> dataFiles(Table table) {
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            // copy eagerly: the transformed view must not outlive the closed iterable
            return ImmutableList.copyOf(Iterables.transform(tasks, ContentScanTask::file));
        } catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }

    /** Local {@link InputFile} wrapper that counts how many streams have been opened on it. */
    public static class CustomInputFile implements InputFile {
        private final InputFile delegate;
        private final AtomicInteger streamCount;

        public CustomInputFile(String path) {
            this.delegate = Files.localInput(path);
            this.streamCount = new AtomicInteger();
        }

        @Override
        public long getLength() {
            return delegate.getLength();
        }

        /** Opens a stream on the underlying file and records the request. */
        @Override
        public SeekableInputStream newStream() {
            streamCount.incrementAndGet();
            return delegate.newStream();
        }

        /** Returns the number of {@link #newStream()} calls made so far. */
        public int streamCount() {
            return streamCount.get();
        }

        @Override
        public String location() {
            return delegate.location();
        }

        @Override
        public boolean exists() {
            return delegate.exists();
        }
    }

    /**
     * Local-filesystem {@link FileIO} that registers every requested input file in {@code
     * INPUT_FILES} so tests can inspect stream counts afterwards.
     */
    public static class CustomFileIO implements FileIO {
        /** Returns the tracked input file for {@code path}, creating it on first request. */
        @Override
        public InputFile newInputFile(String path) {
            return INPUT_FILES.computeIfAbsent(path, CustomInputFile::new);
        }

        @Override
        public OutputFile newOutputFile(String path) {
            return Files.localOutput(path);
        }

        /**
         * Deletes the local file at {@code path}.
         *
         * @throws RuntimeIOException if the file could not be deleted
         */
        @Override
        public void deleteFile(String path) {
            File file = new File(path);
            if (!file.delete()) {
                // pass the path as a format argument so '%' in a path cannot break formatting
                throw new RuntimeIOException("Failed to delete file: %s", path);
            }
        }
    }
}

