/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.reflect.ClassPath;
import io.airlift.concurrent.MoreFutures;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.plugin.deltalake.DeltaLakeQueryRunner;
import io.trino.plugin.deltalake.TestingDeltaLakeUtils;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryAssertions;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingNames;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.assertj.core.api.AssertProvider;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
public class TestDeltaLakeLocalConcurrentWritesTest
extends AbstractTestQueryFramework {
    /**
     * Builds the query runner for this test class from {@code DeltaLakeQueryRunner.builder()}.
     *
     * <p>Two Delta properties are set to {@code "true"} before building:
     * {@code delta.unique-table-location} and {@code delta.register-table-procedure.enabled}.
     *
     * @return the built query runner
     * @throws Exception if the builder fails
     */
    protected QueryRunner createQueryRunner() throws Exception {
        return DeltaLakeQueryRunner.builder()
                .addDeltaProperty("delta.unique-table-location", "true")
                .addDeltaProperty("delta.register-table-procedure.enabled", "true")
                .build();
    }

    /**
     * Runs the concurrent blind-insert scenario twice: first with {@code partitioned = false},
     * then with {@code partitioned = true}. A failure in the first run skips the second.
     */
    @Test
    public void testConcurrentInsertsReconciliationForBlindInserts() throws Exception {
        for (boolean partitioned : new boolean[] {false, true}) {
            this.testConcurrentInsertsReconciliationForBlindInserts(partitioned);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three {@code INSERT ... VALUES} statements against a fresh table at the same time and
     * verifies that all three rows are present and that {@code $history} lists the expected
     * versions 0 through 3.
     *
     * <p>The three writers are released together through a {@link CyclicBarrier}; each waits at
     * most 10 seconds for the others. The table is dropped and the thread pool shut down afterwards.
     *
     * @param partitioned when {@code true} the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup, any writer, or one of the assertions fails
     */
    private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitioned) throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a INT, part INT) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : ""));
        try {
            // The explicit element type gives every lambda a Callable target type.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (1, 10)");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (11, 20)");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                                return null;
                            })
                            .build())
                    .forEach(MoreFutures::getDone);
            Assertions.assertThat(this.query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (21, 30)");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE', 'WriteSerializable', 0, true),\n"
                            + "    (1, 'WRITE', 'WriteSerializable', 0, true),\n"
                            + "    (2, 'WRITE', 'WriteSerializable', 1, true),\n"
                            + "    (3, 'WRITE', 'WriteSerializable', 2, true)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs the "insert selecting from the same table" scenario twice: first with
     * {@code partitioned = true}, then with {@code partitioned = false}.
     */
    @Test
    public void testConcurrentInsertsSelectingFromTheSameTable() throws Exception {
        for (boolean partitioned : new boolean[] {true, false}) {
            this.testConcurrentInsertsSelectingFromTheSameTable(partitioned);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three identical {@code INSERT ... SELECT COUNT(*)} statements concurrently, each
     * reading from the table it writes to, and verifies the outcome.
     *
     * <p>A writer may fail, but only with the message
     * {@code "Failed to write Delta Lake transaction log entry"}; any other failure is rethrown
     * with the original exception attached as suppressed. At least one writer must succeed. The
     * expected table rows and {@code $history} rows are derived from the number of successful writers.
     *
     * @param partitioned when {@code true} the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup, an unexpected writer failure, or one of the assertions occurs
     */
    private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_select_from_same_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + "  AS VALUES (0, 10)", 1L);
        try {
            // Each future yields true for a committed insert and false for an expected commit failure.
            List<Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*), 10 AS part FROM " + tableName);
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause(e);
                            try {
                                Assertions.assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the original failure visible on the assertion error.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInsertsCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat(successfulInsertsCount).isGreaterThanOrEqualTo(1L);
            this.assertQuery(
                    "SELECT * FROM " + tableName,
                    "VALUES (0, 10)" + LongStream.rangeClosed(1L, successfulInsertsCount)
                            .boxed()
                            .map("(%d, 10)"::formatted)
                            .collect(Collectors.joining(", ", ", ", "")));
            this.assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1L, successfulInsertsCount)
                            .boxed()
                            .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1L))
                            .collect(Collectors.joining(", ", ", ", "")));
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Disabled test that runs the {@code FOR VERSION AS OF} insert scenario twice: first with
     * {@code partitioned = true}, then with {@code partitioned = false}.
     */
    @Test
    @Disabled
    public void testConcurrentInsertsSelectingFromTheSameVersionedTable() throws Exception {
        for (boolean partitioned : new boolean[] {true, false}) {
            this.testConcurrentInsertsSelectingFromTheSameVersionedTable(partitioned);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three concurrent inserts that each select from version 0 of the target table
     * ({@code FOR VERSION AS OF 0}) and verifies that all three rows land and that
     * {@code $history} lists versions 0 through 3 as blind appends.
     *
     * <p>The writers are released together through a {@link CyclicBarrier} with a 10 second wait.
     *
     * @param partitioned when {@code true} the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup, any writer, or one of the assertions fails
     */
    private void testConcurrentInsertsSelectingFromTheSameVersionedTable(boolean partitioned) throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_select_from_same_versioned_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + "  AS VALUES (0, 'a')", 1L);
        try {
            // The explicit element type gives every lambda a Callable target type.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 1, 'b' AS part FROM " + tableName + " FOR VERSION AS OF 0");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 2, 'c' AS part FROM " + tableName + " FOR VERSION AS OF 0");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 3, 'd' AS part FROM " + tableName + " FOR VERSION AS OF 0");
                                return null;
                            })
                            .build())
                    .forEach(MoreFutures::getDone);
            this.assertQuery("SELECT * FROM " + tableName, "VALUES (0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true),\n"
                            + "    (1, 'WRITE', 'WriteSerializable', 0, true),\n"
                            + "    (2, 'WRITE', 'WriteSerializable', 1, true),\n"
                            + "    (3, 'WRITE', 'WriteSerializable', 2, true)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs the {@code FOR TIMESTAMP AS OF} insert scenario twice: first with
     * {@code partitioned = true}, then with {@code partitioned = false}.
     */
    @Test
    void testConcurrentInsertsSelectingFromTheSameTemporalVersionedTable() throws Exception {
        for (boolean partitioned : new boolean[] {true, false}) {
            this.testConcurrentInsertsSelectingFromTheSameTemporalVersionedTable(partitioned);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three concurrent inserts that each select from the target table
     * {@code FOR TIMESTAMP AS OF} a timestamp captured right after the table was created, and
     * verifies that all three rows land and that {@code $history} lists versions 0 through 3 as
     * blind appends.
     *
     * <p>The timestamp is {@link ZonedDateTime#now()} formatted as
     * {@code yyyy-MM-dd HH:mm:ss.SSS VV}. The writers are released together through a
     * {@link CyclicBarrier} with a 10 second wait.
     *
     * @param partitioned when {@code true} the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup, any writer, or one of the assertions fails
     */
    private void testConcurrentInsertsSelectingFromTheSameTemporalVersionedTable(boolean partitioned) throws Exception {
        DateTimeFormatter timestampWithTimeZoneFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS VV");
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_select_from_same_temporal_versioned_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + "  AS VALUES (0, 'a')", 1L);
        // Captured after CREATE TABLE so every writer reads the table as of this instant.
        String timeAfterCreateTable = ZonedDateTime.now().format(timestampWithTimeZoneFormatter);
        try {
            // The explicit element type gives every lambda a Callable target type.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 1, 'b' AS part FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '" + timeAfterCreateTable + "'");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 2, 'c' AS part FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '" + timeAfterCreateTable + "'");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 3, 'd' AS part FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '" + timeAfterCreateTable + "'");
                                return null;
                            })
                            .build())
                    .forEach(MoreFutures::getDone);
            this.assertQuery("SELECT * FROM " + tableName, "VALUES (0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true),\n"
                            + "    (1, 'WRITE', 'WriteSerializable', 0, true),\n"
                            + "    (2, 'WRITE', 'WriteSerializable', 1, true),\n"
                            + "    (3, 'WRITE', 'WriteSerializable', 2, true)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three identical inserts concurrently, each selecting {@code COUNT(*)} from partition
     * {@code part = 10} of the table it writes to, and verifies the outcome.
     *
     * <p>A writer may fail, but only with the message
     * {@code "Failed to write Delta Lake transaction log entry"}; any other failure is rethrown
     * with the original exception attached as suppressed. At least one writer must succeed. The
     * expected table rows and {@code $history} rows are derived from the number of successful writers.
     *
     * @throws Exception if table setup, an unexpected writer failure, or one of the assertions occurs
     */
    @Test
    public void testConcurrentInsertsSelectingFromTheSamePartition() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_select_from_same_partition_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // Each future yields true for a committed insert and false for an expected commit failure.
            List<Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) as a, 10 as part FROM " + tableName + " WHERE part = 10");
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause(e);
                            try {
                                Assertions.assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the original failure visible on the assertion error.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInsertsCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat(successfulInsertsCount).isGreaterThanOrEqualTo(1L);
            this.assertQuery(
                    "SELECT * FROM " + tableName,
                    "VALUES (0, 10), (11, 20), (22, 30)" + LongStream.rangeClosed(1L, successfulInsertsCount)
                            .boxed()
                            .map("(%d, 10)"::formatted)
                            .collect(Collectors.joining(", ", ", ", "")));
            this.assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1L, successfulInsertsCount)
                            .boxed()
                            .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1L))
                            .collect(Collectors.joining(", ", ", ", "")));
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Disabled test that runs three different inserts concurrently: two that select
     * {@code COUNT(*)} from one partition each ({@code part = 10}, {@code part = 20}) and one
     * plain {@code INSERT ... VALUES}. It then checks the table rows and the {@code operation},
     * {@code isolation_level} and {@code is_blind_append} columns of {@code $history}.
     *
     * <p>The writers are released together through a {@link CyclicBarrier} with a 10 second wait.
     *
     * @throws Exception if table setup, any writer, or one of the assertions fails
     */
    @Test
    @Disabled
    public void testConcurrentInsertsReconciliationForMixedInserts() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_mixed_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20)", 2L);
        try {
            // The explicit element type gives every lambda a Callable target type.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 10 AS part FROM " + tableName + " WHERE part = 10");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 20 AS part FROM " + tableName + " WHERE part = 20");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (22, 30)");
                                return null;
                            })
                            .build())
                    .forEach(MoreFutures::getDone);
            this.assertQuery("SELECT * FROM " + tableName, "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', true)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three concurrent inserts that each read a different partition of the table
     * ({@code part = 10}, {@code 20}, {@code 30}) and write into {@code part = 40}. It then
     * checks the table rows and that {@code $history} shows three non-blind {@code WRITE}
     * entries after the initial CTAS.
     *
     * <p>The writers are released together through a {@link CyclicBarrier} with a 10 second wait.
     *
     * @throws Exception if table setup, any writer, or one of the assertions fails
     */
    @Test
    public void testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_mixed_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // The explicit element type gives every lambda a Callable target type.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1,  40 as part FROM " + tableName + " WHERE part = 10");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 2,  40 as part FROM " + tableName + " WHERE part = 20");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 3,  40 as part FROM " + tableName + " WHERE part = 30");
                                return null;
                            })
                            .build())
                    .forEach(MoreFutures::getDone);
            this.assertQuery("SELECT * FROM " + tableName, "VALUES (0, 10), (11, 20), (22, 30), (1, 40), (13, 40), (25, 40)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three concurrent inserts, each reading a disjoint pair of partitions
     * ({@code IN (10, 20)}, {@code IN (30, 40)}, {@code IN (50, 60)}) and writing back into
     * those same partitions. It then checks the table rows and the five expected
     * {@code $history} entries.
     *
     * <p>The table is created with six rows and receives one further six-row insert before the
     * concurrent phase. The writers are released together through a {@link CyclicBarrier} with
     * a 10 second wait.
     *
     * @throws Exception if table setup, any writer, or one of the assertions fails
     */
    @Test
    public void testConcurrentInsertsSelectingFromMultipleNonoverlappingPartitionsOfSameTable() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_mixed_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30), (33, 40), (44, 50), (55, 60)", 6L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10), (13, 20), (24, 30), (35, 40), (46, 50), (57, 60)", 6L);
        try {
            // The explicit element type gives every lambda a Callable target type.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (10, 20)");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (30, 40)");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (50, 60)");
                                return null;
                            })
                            .build())
                    .forEach(MoreFutures::getDone);
            this.assertQuery(
                    "SELECT * FROM " + tableName,
                    "VALUES\n"
                            + "    (0, 10), (1, 10), (2, 10), (3, 10),\n"
                            + "    (11, 20), (12, 20), (13, 20), (14, 20),\n"
                            + "    (22, 30), (23, 30),(24, 30), (25, 30),\n"
                            + "    (33, 40), (34, 40), (35, 40), (36, 40),\n"
                            + "    (44, 50), (45, 50), (46, 50), (47, 50),\n"
                            + "    (55, 60), (56,60), (57, 60), (58,60)\n");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true),\n"
                            + "    (1, 'WRITE', 'WriteSerializable', 0, true),\n"
                            + "    (2, 'WRITE', 'WriteSerializable', 1, false),\n"
                            + "    (3, 'WRITE', 'WriteSerializable', 2, false),\n"
                            + "    (4, 'WRITE', 'WriteSerializable', 3, false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a table from the {@code deltalake/serializable_partitioned_table} resource, runs
     * five identical {@code INSERT ... VALUES (1, 10)} statements concurrently and verifies that
     * at least one succeeds.
     *
     * <p>A writer may fail, but only with the message
     * {@code "Failed to write Delta Lake transaction log entry"}; any other failure is rethrown
     * with the original exception attached as suppressed.
     *
     * @throws Exception if table setup, an unexpected writer failure, or one of the assertions occurs
     */
    @Test
    public void testConcurrentSerializableBlindInsertsReconciliationFailure() throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_blind_inserts_table_reconciliation" + TestingNames.randomNameSuffix();
        this.registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", this.getQueryRunner());
        Assertions.assertThat(this.query("SELECT * FROM " + tableName)).matches("VALUES (0, 10), (33, 40)");
        try {
            // Each future yields true for a committed insert and false for an expected commit failure.
            List<Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (1, 10)");
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause(e);
                            try {
                                Assertions.assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the original failure visible on the assertion error.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInsertsCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat(successfulInsertsCount).isGreaterThanOrEqualTo(1L);
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs the reconciliation-failure insert scenario twice: first with
     * {@code partitioned = false}, then with {@code partitioned = true}.
     */
    @Test
    public void testConcurrentInsertsReconciliationFailure() throws Exception {
        for (boolean partitioned : new boolean[] {false, true}) {
            this.testConcurrentInsertsReconciliationFailure(partitioned);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs five identical {@code INSERT ... SELECT * ... WHERE part = 10} statements
     * concurrently against the table they read from and verifies that at least one succeeds.
     *
     * <p>A writer may fail, but only with the message
     * {@code "Failed to write Delta Lake transaction log entry"}; any other failure is rethrown
     * with the original exception attached as suppressed.
     *
     * @param partitioned when {@code true} the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup, an unexpected writer failure, or one of the assertions occurs
     */
    private void testConcurrentInsertsReconciliationFailure(boolean partitioned) throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)" + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS VALUES (1, 10)", 1L);
        try {
            // Each future yields true for a committed insert and false for an expected commit failure.
            List<Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT * FROM " + tableName + " WHERE part = 10");
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause(e);
                            try {
                                Assertions.assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the original failure visible on the assertion error.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInsertsCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat(successfulInsertsCount).isGreaterThanOrEqualTo(1L);
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three concurrent {@code DELETE} statements, each removing a different partition
     * ({@code part = 10}, {@code 20}, {@code 30}). It then verifies that only the
     * {@code part = 40} row remains and that {@code $history} lists three {@code DELETE}
     * entries after the initial CTAS.
     *
     * <p>The writers are released together through a {@link CyclicBarrier} with a 10 second wait.
     *
     * @throws Exception if table setup, any delete, or one of the assertions fails
     */
    @Test
    public void testConcurrentDeletePushdownReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        try {
            // The explicit element type gives every lambda a Callable target type.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 20");
                                return null;
                            })
                            .add(() -> {
                                barrier.await(10L, TimeUnit.SECONDS);
                                this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30");
                                return null;
                            })
                            .build())
                    .forEach(MoreFutures::getDone);
            Assertions.assertThat(this.query("SELECT * FROM " + tableName)).matches("VALUES (31, 40)");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    (1, 'DELETE', 'WriteSerializable', false),\n"
                            + "    (2, 'DELETE', 'WriteSerializable', false),\n"
                            + "    (3, 'DELETE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs three identical {@code DELETE ... WHERE part = 10} statements concurrently and
     * verifies the outcome.
     *
     * <p>A delete may fail, but only with the message
     * {@code "Failed to write Delta Lake transaction log entry"}; any other failure is rethrown
     * with the original exception attached as suppressed. At least one delete must succeed.
     * Afterwards only the {@code part = 20} and {@code part = 30} rows may remain, and
     * {@code $history} must show one {@code DELETE} entry per successful delete.
     *
     * @throws Exception if table setup, an unexpected delete failure, or one of the assertions occurs
     */
    @Test
    public void testConcurrentDeletePushdownFromTheSamePartition() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_from_same_partition_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // Each future yields true for a committed delete and false for an expected commit failure.
            List<Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("DELETE FROM " + tableName + "  WHERE part = 10");
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause(e);
                            try {
                                Assertions.assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the original failure visible on the assertion error.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulDeletesCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat(successfulDeletesCount).isGreaterThanOrEqualTo(1L);
            this.assertQuery("SELECT * FROM " + tableName, "VALUES (11, 20), (22, 30)");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)" + LongStream.rangeClosed(1L, successfulDeletesCount)
                            .boxed()
                            .map("(%s, 'DELETE', 'WriteSerializable', false)"::formatted)
                            .collect(Collectors.joining(", ", ", ", "")));
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Issues three {@code TRUNCATE TABLE} statements at the same time against one partitioned table.
     * <p>
     * Every worker first waits on a shared barrier. A worker yields {@code true} when its statement
     * succeeds and {@code false} when the Trino exception cause extracted from the failure has the
     * message "Failed to write Delta Lake transaction log entry"; a failure with any other message is
     * rethrown with the original exception attached as suppressed. The test then requires at least one
     * success, an empty table, and exactly one {@code TRUNCATE} history row per successful statement
     * (versions {@code 1..successfulTruncatesCount}).
     *
     * @throws Exception if a worker fails unexpectedly or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentTruncateReconciliationFailure() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_from_same_partition_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // Parameterized on purpose: with a raw List the Boolean filter below does not type-check.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("TRUNCATE TABLE " + tableName);
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause((Throwable)e);
                            try {
                                Assertions.assertThat((Throwable)trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the query failure visible when the message check itself fails.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulTruncatesCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat((long)successfulTruncatesCount).isGreaterThanOrEqualTo(1L);
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).returnsEmptyResult();

            // One expected history row per successful TRUNCATE, each prefixed with ", " so the rows
            // can be appended directly after the CTAS row.
            String truncateRowTemplate = "(%s, 'TRUNCATE', 'WriteSerializable', false)";
            String expectedTruncateRows = LongStream.rangeClosed(1L, successfulTruncatesCount)
                    .boxed()
                    .map(version -> truncateRowTemplate.formatted(version))
                    .collect(Collectors.joining(", ", ", ", ""));
            this.assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)" + expectedTruncateRows);
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs an unfiltered {@code DELETE} concurrently with two single-row {@code INSERT ... VALUES}
     * statements.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards {@code sum(a)} must be one of the accepted
     * values and the table history must contain the initial CTAS, one {@code DELETE} and two
     * {@code WRITE} operations, all with isolation level {@code WriteSerializable}.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentDeletePushdownAndBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName);
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (31, 40)");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            long sumOfA = (long)((Long)this.computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue());
            Assertions.assertThat(sumOfA).isIn(0L, 21L, 31L, 52L);
            this.assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('DELETE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a {@code TRUNCATE TABLE} concurrently with two single-row {@code INSERT ... VALUES}
     * statements.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards {@code sum(a)} must be one of the accepted
     * values and the table history must contain the initial CTAS (blind append), one {@code TRUNCATE}
     * (not a blind append) and two {@code WRITE} operations (blind appends).
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentTruncateAndBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_truncate_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("TRUNCATE TABLE " + tableName);
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (31, 40)");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            long sumOfA = (long)((Long)this.computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue());
            Assertions.assertThat(sumOfA).isIn(0L, 21L, 31L, 52L, 64L);
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('TRUNCATE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs two partition-only {@code DELETE}s (partitions 10 and 30) concurrently with an
     * {@code INSERT ... SELECT} that reads partition 20 of the same table.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards the table must hold exactly
     * {@code (11, 20)} and {@code (12, 20)}, and the history must contain the CTAS, the preparatory
     * {@code WRITE}, two {@code DELETE} operations and one non-blind {@code WRITE}.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentDeletePushdownAndNonBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (11, 20), (12, 20)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('DELETE', 'WriteSerializable', false),\n"
                            + "    ('DELETE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Issues five identical {@code DELETE ... WHERE part = 10} statements at the same time against a
     * table registered from the {@code deltalake/serializable_partitioned_table} resource.
     * <p>
     * Every worker first waits on a shared barrier. A worker yields {@code true} when its statement
     * succeeds and {@code false} when the Trino exception cause extracted from the failure has the
     * message "Failed to write Delta Lake transaction log entry"; a failure with any other message is
     * rethrown with the original exception attached as suppressed. The test requires at least one
     * success and that only the row {@code (33, 40)} remains.
     *
     * @throws Exception if a worker fails unexpectedly or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentSerializableDeletesPushdownReconciliationFailure() throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_delete_reconciliation" + TestingNames.randomNameSuffix();
        this.registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", this.getQueryRunner());
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Parameterized on purpose: with a raw List the Boolean filter below does not type-check.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10");
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause((Throwable)e);
                            try {
                                Assertions.assertThat((Throwable)trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the query failure visible when the message check itself fails.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulDeletesCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat((long)successfulDeletesCount).isGreaterThanOrEqualTo(1L);
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (33, 40)");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Issues five {@code TRUNCATE TABLE} statements at the same time against a table registered from
     * the {@code deltalake/serializable_partitioned_table} resource.
     * <p>
     * Every worker first waits on a shared barrier. A worker yields {@code true} when its statement
     * succeeds and {@code false} when the Trino exception cause extracted from the failure has the
     * message "Failed to write Delta Lake transaction log entry"; a failure with any other message is
     * rethrown with the original exception attached as suppressed. The test requires at least one
     * success and an empty table afterwards.
     *
     * @throws Exception if a worker fails unexpectedly or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentSerializableTruncateReconciliationFailure() throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_delete_reconciliation" + TestingNames.randomNameSuffix();
        this.registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", this.getQueryRunner());
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Parameterized on purpose: with a raw List the Boolean filter below does not type-check.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute("TRUNCATE TABLE " + tableName);
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause((Throwable)e);
                            try {
                                Assertions.assertThat((Throwable)trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the query failure visible when the message check itself fails.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulTruncatesCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat((long)successfulTruncatesCount).isGreaterThanOrEqualTo(1L);
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).returnsEmptyResult();
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three {@code UPDATE}s concurrently, each incrementing {@code a} in a different partition
     * (10, 20 and 30) of the same table.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards the three targeted rows must be
     * incremented while {@code (31, 40)} is unchanged, and history versions 1-3 must all be
     * non-blind {@code MERGE} operations following the CTAS at version 0.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentUpdateReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_updates_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1  WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1  WHERE part = 30");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (2, 10), (12, 20), (22, 30), (31, 40)");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    (1, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (2, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (3, 'MERGE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three row-filtered {@code DELETE}s concurrently, each restricted to a different partition
     * (10, 20 and 30) and to specific values of {@code a}.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards only {@code (31, 40)} may remain, and
     * history versions 2-4 must all be non-blind {@code MERGE} operations following the CTAS
     * (version 0) and the preparatory {@code WRITE} (version 1).
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentDeleteReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_deletes_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10 AND a IN (1, 2)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 20 AND a = 11");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30 AND a = 21");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (31, 40)");
            this.assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    (1, 'WRITE', 'WriteSerializable', true),\n"
                            + "    (2, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (3, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (4, 'MERGE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three {@code MERGE} statements concurrently: an insert-only merge adding {@code (12, 20)},
     * a merge deleting the rows matched on partition 30, and a merge updating the rows matched on
     * partition 40.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards the table must hold
     * {@code (1, 10), (11, 20), (12, 20), (32, 40)}, and the history must contain the CTAS, the
     * preparatory {@code WRITE}, one blind-append {@code MERGE} and two non-blind {@code MERGE}
     * operations.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentMergeReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_merges_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (22, 30)", 1L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute(
                                ("MERGE INTO %s t USING (VALUES (12, 20)) AS s(a, part)\n"
                                        + "  ON (FALSE)\n"
                                        + "    WHEN NOT MATCHED THEN INSERT (a, part) VALUES(s.a, s.part)\n").formatted(tableName));
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute(
                                ("MERGE INTO %s t USING (VALUES (21, 30)) AS s(a, part)\n"
                                        + "  ON (t.part = s.part)\n"
                                        + "    WHEN MATCHED THEN DELETE\n").formatted(tableName));
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute(
                                ("MERGE INTO %s t USING (VALUES (32, 40)) AS s(a, part)\n"
                                        + "  ON (t.part = s.part)\n"
                                        + "    WHEN MATCHED THEN UPDATE SET a = s.a\n").formatted(tableName));
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (1, 10), (11, 20), (12, 20), (32, 40)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('MERGE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Issues five identical insert-only {@code MERGE} statements (adding {@code (12, 20)}) at the same
     * time against a table registered from the {@code deltalake/serializable_partitioned_table}
     * resource.
     * <p>
     * Every worker first waits on a shared barrier. A worker yields {@code true} when its statement
     * succeeds and {@code false} when the Trino exception cause extracted from the failure has the
     * message "Failed to write Delta Lake transaction log entry"; a failure with any other message is
     * rethrown with the original exception attached as suppressed. The test requires at least one
     * success and at least one row in partition 20 afterwards.
     *
     * @throws Exception if a worker fails unexpectedly or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentSerializableMergeReconciliationFailure() throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_merge_reconciliation" + TestingNames.randomNameSuffix();
        this.registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", this.getQueryRunner());
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Parameterized on purpose: with a raw List the Boolean filter below does not type-check.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            this.getQueryRunner().execute(
                                    ("MERGE INTO %s t USING (VALUES (12, 20)) AS s(a, part)\n"
                                            + "  ON (FALSE)\n"
                                            + "    WHEN NOT MATCHED THEN INSERT (a, part) VALUES(s.a, s.part)\n").formatted(tableName));
                            return true;
                        }
                        catch (Exception e) {
                            RuntimeException trinoException = QueryAssertions.getTrinoExceptionCause((Throwable)e);
                            try {
                                Assertions.assertThat((Throwable)trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                            }
                            catch (Throwable verifyFailure) {
                                // Keep the query failure visible when the message check itself fails.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulMergeOperationsCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat((long)successfulMergeOperationsCount).isGreaterThanOrEqualTo(1L);
            Assertions.assertThat((long)((Long)this.computeScalar("SELECT count(*) FROM " + tableName + " WHERE part = 20"))).isGreaterThanOrEqualTo(1L);
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs two row-filtered {@code DELETE}s (on partitions 10 and 30) concurrently with an
     * {@code INSERT ... SELECT} that reads partition 20 of the same table.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards the table must hold exactly
     * {@code (11, 20)} and {@code (12, 20)}, and the history must contain the CTAS, the preparatory
     * {@code WRITE}, two non-blind {@code MERGE} operations and one non-blind {@code WRITE}.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentDeleteAndNonBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10 AND a IN (1, 2)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30 AND a BETWEEN 20 AND 30");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (11, 20), (12, 20)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a row-filtered {@code DELETE ... WHERE a > 10} concurrently with two single-row
     * {@code INSERT ... VALUES} statements.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards {@code sum(a)} must be one of the accepted
     * values and the table history must contain the initial CTAS, one {@code MERGE} and two
     * {@code WRITE} operations, all with isolation level {@code WriteSerializable}.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentDeleteAndBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE a > 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (8, 10)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            long sumOfA = (long)((Long)this.computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue());
            Assertions.assertThat(sumOfA).isIn(1L, 9L, 22L, 30L);
            this.assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('MERGE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs an unfiltered {@code UPDATE ... SET a = a + 1} concurrently with two single-row
     * {@code INSERT ... VALUES} statements.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards {@code sum(a)} must be 48, 49 or 50, and
     * the table history must contain the initial CTAS, one {@code MERGE} and two {@code WRITE}
     * operations, all with isolation level {@code WriteSerializable}.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentUpdateAndBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_update_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (13, 20)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            long sumOfA = (long)((Long)this.computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue());
            // The accepted set previously listed 49 twice; membership is unchanged without the duplicate.
            Assertions.assertThat(sumOfA).isIn(48L, 49L, 50L);
            this.assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('MERGE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a {@code MERGE ... WHEN MATCHED THEN DELETE} concurrently with two single-row
     * {@code INSERT ... VALUES} statements whose rows also appear in the merge source.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards {@code sum(a)} must be one of the accepted
     * values and the table history must contain the initial CTAS, one {@code MERGE} and two
     * {@code WRITE} operations, all with isolation level {@code WriteSerializable}.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentMergeAndBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_merge_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute(
                                ("MERGE INTO %s t USING (VALUES (11, 20), (8, 10), (21, 30)) AS s(a, part)\n"
                                        + "  ON (t.a = s.a AND t.part = s.part)\n"
                                        + "    WHEN MATCHED THEN DELETE\n").formatted(tableName));
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (8, 10)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            long sumOfA = (long)((Long)this.computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue());
            Assertions.assertThat(sumOfA).isIn(1L, 9L, 22L, 30L);
            this.assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('MERGE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a row-filtered {@code DELETE} (partition 10), a partition-only {@code DELETE}
     * (partition 30) and an {@code INSERT ... SELECT} reading partition 20, all concurrently against
     * the same table.
     * <p>
     * The three statements are released together by a shared barrier and each resulting future is
     * unwrapped with {@code MoreFutures.getDone}. Afterwards the table must hold exactly
     * {@code (11, 20)} and {@code (12, 20)}, and the history must contain the CTAS, the preparatory
     * {@code WRITE}, one non-blind {@code MERGE}, one {@code DELETE} and one non-blind {@code WRITE}.
     *
     * @throws Exception if a statement fails or the executor does not terminate cleanly
     */
    @Test
    public void testConcurrentDeleteAndDeletePushdownAndNonBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // Explicit element type: without it the builder holds Object and the lambdas below
            // have no functional interface to target.
            List<java.util.concurrent.Callable<Void>> statements = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10 AND a IN (1, 2)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30");
                        return null;
                    })
                    .build();
            executor.invokeAll(statements).forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (11, 20), (12, 20)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('DELETE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when DROP TABLE fails, so worker threads are never leaked.
                executor.shutdownNow();
                Assertions.assertThat((boolean)executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three {@code OPTIMIZE} procedures at the same time, each restricted to a different partition
     * (10, 20 and 30), and verifies that the set of active files changed, the data is untouched and the
     * {@code $history} table lists three OPTIMIZE entries.
     */
    @Test
    public void testConcurrentOptimizeReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_optimize_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10), (12, 20), (22, 30)", 3L);
        Set<String> beforeOptimizeActiveFiles = this.getActiveFiles(tableName);
        try {
            // The element type is spelled out because a lambda needs a functional-interface target type;
            // an untyped builder would infer Object and reject the lambdas.
            List<java.util.concurrent.Callable<Void>> optimizers = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        // Each task waits on the shared barrier before issuing its statement.
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 30");
                        return null;
                    })
                    .build();
            executor.invokeAll(optimizers).forEach(MoreFutures::getDone);

            Assertions.assertThat(beforeOptimizeActiveFiles).isNotEqualTo(this.getActiveFiles(tableName));
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (1, 10), (2, 10), (11, 20), (12, 20), (21, 30), (22, 30)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('OPTIMIZE', 'WriteSerializable', false),\n"
                            + "    ('OPTIMIZE', 'WriteSerializable', false),\n"
                            + "    ('OPTIMIZE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when the DROP fails, so worker threads never outlive the test.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Registers the {@code deltalake/serializable_partitioned_table} resource table and fires five identical
     * {@code OPTIMIZE ... WHERE part = 10} calls at once. A call may fail, but only with the
     * "Failed to write Delta Lake transaction log entry" message; at least one call must succeed and the
     * table contents must stay the same.
     */
    @Test
    public void testConcurrentSerializableOptimizeReconciliationFailure() throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_optimize_reconciliation" + TestingNames.randomNameSuffix();
        this.registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", this.getQueryRunner());
        // QueryAssertions is fully qualified in this method: the sql.query class (QueryAssert) and the
        // testing class (getTrinoExceptionCause) share the same simple name.
        ((io.trino.sql.query.QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Typed futures are required so the Boolean results can be filtered below.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads).mapToObj(ignored -> executor.submit(() -> {
                barrier.await(10L, TimeUnit.SECONDS);
                try {
                    this.getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 10");
                    return true;
                }
                catch (Exception e) {
                    RuntimeException trinoException = io.trino.testing.QueryAssertions.getTrinoExceptionCause(e);
                    try {
                        Assertions.assertThat((Throwable)trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
                    }
                    catch (Throwable verifyFailure) {
                        // Keep the original query failure attached to the assertion failure for diagnosis.
                        if (verifyFailure != e) {
                            verifyFailure.addSuppressed(e);
                        }
                        throw verifyFailure;
                    }
                    return false;
                }
            })).collect(ImmutableList.toImmutableList());

            long successfulOptimizeOperationsCount = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(success -> success)
                    .count();
            Assertions.assertThat(successfulOptimizeOperationsCount).isGreaterThanOrEqualTo(1L);
            ((io.trino.sql.query.QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES  (0, 10), (33, 40)");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when the DROP fails, so worker threads never outlive the test.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs {@code OPTIMIZE ... WHERE part > 20} concurrently with two {@code INSERT ... VALUES} statements
     * targeting partitions 10 and 20. Verifies that the files backing partition 30 changed, that every row
     * is present, and that the {@code $history} table lists the expected operations.
     */
    @Test
    public void testConcurrentOptimizeAndBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_optimize_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (21, 30)", 2L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (22, 30)", 1L);
        // The same query is issued before and after the concurrent phase to compare file sets.
        String partition30FilesQuery = "SELECT DISTINCT \"$path\" FROM " + tableName + " WHERE part = 30";
        Set<String> beforeOptimizeActiveFilesOnPartition30 = this.computeActual(partition30FilesQuery).getOnlyColumnAsSet().stream()
                .map(String.class::cast)
                .collect(ImmutableSet.toImmutableSet());
        try {
            // The element type is spelled out because a lambda needs a functional-interface target type;
            // an untyped builder would infer Object and reject the lambdas.
            List<java.util.concurrent.Callable<Void>> writers = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        // Each task waits on the shared barrier before issuing its statement.
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part > 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (8, 10)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (11, 20)");
                        return null;
                    })
                    .build();
            executor.invokeAll(writers).forEach(MoreFutures::getDone);

            Set<String> afterOptimizeActiveFilesOnPartition30 = this.computeActual(partition30FilesQuery).getOnlyColumnAsSet().stream()
                    .map(String.class::cast)
                    .collect(ImmutableSet.toImmutableSet());
            Assertions.assertThat(beforeOptimizeActiveFilesOnPartition30).isNotEqualTo(afterOptimizeActiveFilesOnPartition30);
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES  (1, 10), (8, 10), (11, 20), (21, 30), (22, 30)");
            this.assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('OPTIMIZE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when the DROP fails, so worker threads never outlive the test.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs {@code OPTIMIZE ... WHERE part = 10} concurrently with an {@code INSERT ... SELECT} that reads
     * partition 20 and a {@code DELETE} on partition 30. Verifies that the files backing partition 10
     * changed, that the expected rows remain, and that the {@code $history} table lists the expected operations.
     */
    @Test
    public void testConcurrentOptimizeAndNonBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_optimize_and_inserts_table_" + TestingNames.randomNameSuffix();
        this.assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        this.assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        // The same query is issued before and after the concurrent phase to compare file sets.
        String partition10FilesQuery = "SELECT DISTINCT \"$path\" FROM " + tableName + " WHERE part = 10";
        Set<String> beforeOptimizeActiveFilesOnPartition10 = this.computeActual(partition10FilesQuery).getOnlyColumnAsSet().stream()
                .map(String.class::cast)
                .collect(ImmutableSet.toImmutableSet());
        try {
            // The element type is spelled out because a lambda needs a functional-interface target type;
            // an untyped builder would infer Object and reject the lambdas.
            List<java.util.concurrent.Callable<Void>> writers = ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        // Each task waits on the shared barrier before issuing its statement.
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        this.getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30 AND a BETWEEN 20 AND 30");
                        return null;
                    })
                    .build();
            executor.invokeAll(writers).forEach(MoreFutures::getDone);

            Set<String> afterOptimizeActiveFilesOnPartition10 = this.computeActual(partition10FilesQuery).getOnlyColumnAsSet().stream()
                    .map(String.class::cast)
                    .collect(ImmutableSet.toImmutableSet());
            Assertions.assertThat(beforeOptimizeActiveFilesOnPartition10).isNotEqualTo(afterOptimizeActiveFilesOnPartition10);
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (1, 10), (2, 10), (11, 20), (12, 20)");
            this.assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('OPTIMIZE', 'WriteSerializable', false),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        }
        finally {
            try {
                this.assertUpdate("DROP TABLE " + tableName);
            }
            finally {
                // Shut the pool down even when the DROP fails, so worker threads never outlive the test.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Copies every classpath resource whose name starts with {@code resourcePath + "/"} into the
     * {@code local:///<table>} location and then registers that location as a table through
     * {@code system.register_table}.
     *
     * @param table name under which the table is registered; also the last segment of the target location
     * @param resourcePath classpath prefix (without trailing slash) holding the table's files
     * @param queryRunner runner used to look up the file system factory and to issue the register call
     * @throws IOException declared for the directory creation, which happens outside the wrapping try block
     * @throws UncheckedIOException if an {@link IOException} occurs while listing or copying the resources
     */
    protected void registerTableFromResources(String table, String resourcePath, QueryRunner queryRunner) throws IOException {
        TrinoFileSystem fileSystem = TestingDeltaLakeUtils.getConnectorService(queryRunner, TrinoFileSystemFactory.class).create(ConnectorIdentity.ofUser("test"));
        String tableLocation = "local:///" + table;
        fileSystem.createDirectory(Location.of(tableLocation));
        try {
            // Typed list: a raw List cannot be iterated as ClassPath.ResourceInfo.
            List<ClassPath.ResourceInfo> resources = ClassPath.from(this.getClass().getClassLoader()).getResources().stream()
                    .filter(resourceInfo -> resourceInfo.getResourceName().startsWith(resourcePath + "/"))
                    .collect(ImmutableList.toImmutableList());
            for (ClassPath.ResourceInfo resource : resources) {
                // Re-root the resource name: swap the leading resourcePath for the table location.
                String fileName = resource.getResourceName().replaceFirst("^" + Pattern.quote(resourcePath), Matcher.quoteReplacement(tableLocation));
                byte[] bytes = resource.asByteSource().read();
                TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(fileName));
                trinoOutputFile.createOrOverwrite(bytes);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        queryRunner.execute(String.format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", table, tableLocation));
    }

    /**
     * Collects the distinct {@code "$path"} values returned for {@code tableName} into an immutable set of strings.
     */
    private Set<String> getActiveFiles(String tableName) {
        String pathQuery = "SELECT DISTINCT \"$path\" FROM " + tableName;
        Set<?> paths = this.computeActual(pathQuery).getOnlyColumnAsSet();
        return paths.stream()
                .map(String.class::cast)
                .collect(ImmutableSet.toImmutableSet());
    }

    // Compiler-generated helper (marked synthetic by the decompiler): renders one expected history row for a
    // TRUNCATE operation. The rec$ argument is not used; the template is a fixed literal.
    private static /* synthetic */ String lambda$testConcurrentTruncateReconciliationFailure$3(String rec$, Object xva$0) {
        String rowTemplate = "(%s, 'TRUNCATE', 'WriteSerializable', false)";
        return String.format(rowTemplate, xva$0);
    }

    // Compiler-generated helper (marked synthetic by the decompiler): renders one expected history row for a
    // DELETE operation. The rec$ argument is not used; the template is a fixed literal.
    private static /* synthetic */ String lambda$testConcurrentDeletePushdownFromTheSamePartition$3(String rec$, Object xva$0) {
        String rowTemplate = "(%s, 'DELETE', 'WriteSerializable', false)";
        return String.format(rowTemplate, xva$0);
    }

    // Compiler-generated helper (marked synthetic by the decompiler): renders a "(<number>, 10)" row literal.
    // The %d conversion requires an integral argument; rec$ is not used.
    private static /* synthetic */ String lambda$testConcurrentInsertsSelectingFromTheSamePartition$3(String rec$, Object xva$0) {
        String rowTemplate = "(%d, 10)";
        return String.format(rowTemplate, xva$0);
    }

    // Compiler-generated helper (marked synthetic by the decompiler): renders a "(<number>, 10)" row literal.
    // The %d conversion requires an integral argument; rec$ is not used.
    private static /* synthetic */ String lambda$testConcurrentInsertsSelectingFromTheSameTable$3(String rec$, Object xva$0) {
        String rowTemplate = "(%d, 10)";
        return String.format(rowTemplate, xva$0);
    }
}

