/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.flink.service.QueryService;
import org.apache.paimon.io.DataFileTestUtils;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.service.network.stats.ServiceRequestStats;
import org.apache.paimon.service.server.KvQueryServer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class RemoteLookupJoinITCase
extends CatalogITCaseBase {
    @Override
    public List<String> ddl() {
        return Collections.singletonList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
    }

    @Override
    protected int defaultParallelism() {
        return 1;
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testQueryServiceLookup(boolean isNamedArgument) throws Exception {
        this.sql("CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')", new Object[0]);
        CloseableIterator<Row> service = isNamedArgument ? this.streamSqlIter("CALL sys.query_service(`table` => 'default.DIM', parallelism => 2)", new Object[0]) : this.streamSqlIter("CALL sys.query_service('default.DIM', 2)", new Object[0]);
        RemoteTableQuery query = new RemoteTableQuery((Table)this.paimonTable("DIM"));
        this.sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)", new Object[0]);
        Thread.sleep(2000L);
        ((ObjectAssert)Assertions.assertThat((Object)query.lookup(DataFileTestUtils.row((int[])new int[0]), 0, (InternalRow)DataFileTestUtils.row((int)1))).isNotNull()).extracting(r -> r.getInt(1)).isEqualTo((Object)11);
        ((ObjectAssert)Assertions.assertThat((Object)query.lookup(DataFileTestUtils.row((int[])new int[0]), 0, (InternalRow)DataFileTestUtils.row((int)2))).isNotNull()).extracting(r -> r.getInt(1)).isEqualTo((Object)22);
        ((ObjectAssert)Assertions.assertThat((Object)query.lookup(DataFileTestUtils.row((int[])new int[0]), 1, (InternalRow)DataFileTestUtils.row((int)3))).isNotNull()).extracting(r -> r.getInt(1)).isEqualTo((Object)33);
        ((ObjectAssert)Assertions.assertThat((Object)query.lookup(DataFileTestUtils.row((int[])new int[0]), 0, (InternalRow)DataFileTestUtils.row((int)4))).isNotNull()).extracting(r -> r.getInt(1)).isEqualTo((Object)44);
        Assertions.assertThat((Object)query.lookup(DataFileTestUtils.row((int[])new int[0]), 0, (InternalRow)DataFileTestUtils.row((int)5))).isNull();
        service.close();
        query.cancel().get();
    }

    @Test
    public void testLookupRemoteTable() throws Throwable {
        this.sql("CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('bucket' = '1')", new Object[0]);
        ServiceProxy proxy = this.launchQueryServer("DIM");
        proxy.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 11, 111, 1111}));
        proxy.write((InternalRow)GenericRow.of((Object[])new Object[]{2, 22, 222, 2222}));
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        proxy.write((InternalRow)GenericRow.of((Object[])new Object[]{2, 44, 444, 4444}));
        proxy.write((InternalRow)GenericRow.of((Object[])new Object[]{3, 33, 333, 3333}));
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
        proxy.close();
    }

    @Disabled
    @Test
    public void testServiceFileCleaned() throws Exception {
        this.sql("CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')", new Object[0]);
        JobClient client = this.queryService(this.paimonTable("DIM"));
        RemoteTableQuery query = new RemoteTableQuery((Table)this.paimonTable("DIM"));
        this.sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)", new Object[0]);
        Thread.sleep(2000L);
        ((ObjectAssert)Assertions.assertThat((Object)query.lookup(DataFileTestUtils.row((int[])new int[0]), 0, (InternalRow)DataFileTestUtils.row((int)1))).isNotNull()).extracting(r -> r.getInt(1)).isEqualTo((Object)11);
        client.cancel().get();
        query.cancel().get();
        ServiceManager serviceManager = this.paimonTable("DIM").store().newServiceManager();
        Assertions.assertThat((boolean)serviceManager.service("primary-key-lookup").isPresent()).isFalse();
    }

    private JobClient queryService(FileStoreTable table) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        QueryService.build((StreamExecutionEnvironment)env, (Table)table, (int)2);
        return env.executeAsync();
    }

    private ServiceProxy launchQueryServer(String tableName) throws Throwable {
        final FileStoreTable table = this.paimonTable(tableName);
        final LocalTableQuery query = table.newLocalTableQuery().withIOManager(IOManager.create((String)this.path));
        final KvQueryServer server = new KvQueryServer(0, 1, InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), Integer.valueOf(1), Integer.valueOf(1), (TableQuery)query, (ServiceRequestStats)new DisabledServiceRequestStats());
        server.start();
        InetSocketAddress[] addresses = new InetSocketAddress[]{server.getServerAddress()};
        ServiceManager serviceManager = table.store().newServiceManager();
        serviceManager.resetService("primary-key-lookup", addresses);
        return new ServiceProxy(){

            @Override
            public void write(InternalRow row) throws Exception {
                BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
                BatchTableWrite write = writeBuilder.newWrite();
                BatchTableCommit commit = writeBuilder.newCommit();
                write.write(row);
                List commitMessages = write.prepareCommit();
                commit.commit(commitMessages);
                CommitMessageImpl message = (CommitMessageImpl)commitMessages.get(0);
                query.refreshFiles(message.partition(), message.bucket(), Collections.emptyList(), message.newFilesIncrement().newFiles());
            }

            @Override
            public void close() throws IOException {
                server.shutdown();
                query.close();
            }
        };
    }

    private static interface ServiceProxy
    extends Closeable {
        public void write(InternalRow var1) throws Exception;
    }
}

