/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.extensions.TestMerge;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

public class TestCopyOnWriteMerge
extends TestMerge {
    public TestCopyOnWriteMerge(String catalogName, String implementation, Map<String, String> config, String fileFormat, boolean vectorized, String distributionMode, String branch) {
        super(catalogName, implementation, config, fileFormat, vectorized, distributionMode, branch);
    }

    @Override
    protected Map<String, String> extraTableProperties() {
        return ImmutableMap.of((Object)"write.merge.mode", (Object)RowLevelOperationMode.COPY_ON_WRITE.modeName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public synchronized void testMergeWithConcurrentTableRefresh() throws Exception {
        Assume.assumeTrue((boolean)this.catalogName.equalsIgnoreCase("testhive"));
        this.createAndInitTable("id INT, dep STRING");
        this.createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
        this.sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", new Object[]{this.tableName, "write.merge.isolation-level", "snapshot"});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'hr')", new Object[]{this.tableName});
        this.createBranchIfNeeded();
        Table table = Spark3Util.loadIcebergTable((SparkSession)spark, (String)this.tableName);
        ExecutorService executorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)((ThreadPoolExecutor)Executors.newFixedThreadPool(2)));
        AtomicInteger barrier = new AtomicInteger(0);
        AtomicBoolean shouldAppend = new AtomicBoolean(true);
        Future<?> mergeFuture = executorService.submit(() -> {
            for (int numOperations = 0; numOperations < Integer.MAX_VALUE; ++numOperations) {
                while (barrier.get() < numOperations * 2) {
                    this.sleep(10L);
                }
                this.sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET dep = 'x'", new Object[]{this.tableName});
                barrier.incrementAndGet();
            }
        });
        Future<?> appendFuture = executorService.submit(() -> {
            GenericRecord record = GenericRecord.create((Schema)table.schema());
            record.set(0, (Object)1);
            record.set(1, (Object)"hr");
            for (int numOperations = 0; numOperations < Integer.MAX_VALUE; ++numOperations) {
                while (shouldAppend.get() && barrier.get() < numOperations * 2) {
                    this.sleep(10L);
                }
                if (!shouldAppend.get()) {
                    return;
                }
                for (int numAppends = 0; numAppends < 5; ++numAppends) {
                    DataFile dataFile = this.writeDataFile(table, (List<GenericRecord>)ImmutableList.of((Object)record));
                    table.newFastAppend().appendFile(dataFile).commit();
                    this.sleep(10L);
                }
                barrier.incrementAndGet();
            }
        });
        try {
            ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(mergeFuture::get).isInstanceOf(ExecutionException.class)).cause().isInstanceOf(IllegalStateException.class)).hasMessageContaining("the table has been concurrently modified");
        }
        finally {
            shouldAppend.set(false);
            appendFuture.cancel(true);
        }
        executorService.shutdown();
        Assert.assertTrue((String)"Timeout", (boolean)executorService.awaitTermination(2L, TimeUnit.MINUTES));
    }

    @Test
    public void testRuntimeFilteringWithReportedPartitioning() {
        this.createAndInitTable("id INT, dep STRING");
        this.sql("ALTER TABLE %s ADD PARTITION FIELD dep", new Object[]{this.tableName});
        this.append(this.tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 3, \"dep\": \"hr\" }");
        this.createBranchIfNeeded();
        this.append(this.commitTarget(), "{ \"id\": 1, \"dep\": \"hardware\" }\n{ \"id\": 2, \"dep\": \"hardware\" }");
        this.createOrReplaceView("source", Collections.singletonList(2), Encoders.INT());
        ImmutableMap sqlConf = ImmutableMap.of((Object)SQLConf.V2_BUCKETING_ENABLED().key(), (Object)"true", (Object)"spark.sql.iceberg.planning.preserve-data-grouping", (Object)"true");
        this.withSQLConf((Map)sqlConf, () -> this.sql("MERGE INTO %s t USING source s ON t.id == s.value WHEN MATCHED THEN   UPDATE SET id = -1", new Object[]{this.commitTarget()}));
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals((String)"Should have 3 snapshots", (long)3L, (long)Iterables.size((Iterable)table.snapshots()));
        Snapshot currentSnapshot = SnapshotUtil.latestSnapshot((Table)table, (String)this.branch);
        this.validateCopyOnWrite(currentSnapshot, "1", "1", "1");
        this.assertEquals("Should have expected rows", (List)ImmutableList.of((Object)this.row(new Object[]{-1, "hardware"}), (Object)this.row(new Object[]{1, "hardware"}), (Object)this.row(new Object[]{1, "hr"}), (Object)this.row(new Object[]{3, "hr"})), this.sql("SELECT * FROM %s ORDER BY id, dep", new Object[]{this.selectTarget()}));
    }
}

