/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class OperatorSourceTest {
    @TempDir
    java.nio.file.Path tempDir;
    private Table table;

    @BeforeEach
    public void before() throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException, Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException {
        Catalog catalog = CatalogFactory.createCatalog((CatalogContext)CatalogContext.create((Path)new Path(this.tempDir.toUri())));
        Schema schema = Schema.newBuilder().column("a", (DataType)DataTypes.INT()).column("b", (DataType)DataTypes.INT()).column("c", (DataType)DataTypes.INT()).primaryKey(new String[]{"a"}).options(Collections.singletonMap(CoreOptions.CONSUMER_ID.key(), "my_consumer")).build();
        Identifier identifier = Identifier.create((String)"default", (String)"t");
        catalog.createDatabase("default", false);
        catalog.createTable(identifier, schema, false);
        this.table = catalog.getTable(identifier);
    }

    private void writeToTable(int a, int b, int c) throws Exception {
        BatchWriteBuilder writeBuilder = this.table.newBatchWriteBuilder();
        BatchTableWrite write = writeBuilder.newWrite();
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{a, b, c}));
        BatchTableCommit commit = writeBuilder.newCommit();
        commit.commit(write.prepareCommit());
        write.close();
        commit.close();
    }

    private List<List<Integer>> readSplit(Split split) throws IOException {
        TableRead read = this.table.newReadBuilder().newRead();
        ArrayList<List<Integer>> result = new ArrayList<List<Integer>>();
        read.createReader(split).forEachRemaining(row -> result.add(Arrays.asList(row.getInt(0), row.getInt(1), row.getInt(2))));
        return result;
    }

    @Test
    public void testMonitorFunction() throws Exception {
        MonitorFunction function = new MonitorFunction(this.table.newReadBuilder(), 10L, false);
        StreamSource src = new StreamSource((SourceFunction)function);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)src, 1, 1, 0);
        testHarness.open();
        OperatorSubtaskState snapshot = (OperatorSubtaskState)this.testReadSplit(function, () -> testHarness.snapshot(0L, 0L), 1, 1, 1);
        MonitorFunction functionCopy1 = new MonitorFunction(this.table.newReadBuilder(), 10L, false);
        StreamSource srcCopy1 = new StreamSource((SourceFunction)functionCopy1);
        AbstractStreamOperatorTestHarness testHarnessCopy1 = new AbstractStreamOperatorTestHarness((StreamOperator)srcCopy1, 1, 1, 0);
        testHarnessCopy1.initializeState(snapshot);
        testHarnessCopy1.open();
        this.testReadSplit(functionCopy1, () -> {
            testHarnessCopy1.snapshot(1L, 1L);
            testHarnessCopy1.notifyOfCompletedCheckpoint(1L);
            return null;
        }, 2, 2, 2);
        MonitorFunction functionCopy2 = new MonitorFunction(this.table.newReadBuilder(), 10L, false);
        StreamSource srcCopy2 = new StreamSource((SourceFunction)functionCopy2);
        AbstractStreamOperatorTestHarness testHarnessCopy2 = new AbstractStreamOperatorTestHarness((StreamOperator)srcCopy2, 1, 1, 0);
        testHarnessCopy2.open();
        this.testReadSplit(functionCopy2, () -> null, 3, 3, 3);
    }

    @Test
    public void testReadOperator() throws Exception {
        ReadOperator readOperator = new ReadOperator(this.table.newReadBuilder());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)readOperator);
        harness.setup((TypeSerializer)InternalSerializers.create((RowType)RowType.of((LogicalType[])new LogicalType[]{new IntType(), new IntType(), new IntType()})));
        this.writeToTable(1, 1, 1);
        this.writeToTable(2, 2, 2);
        List splits = this.table.newReadBuilder().newScan().plan().splits();
        harness.open();
        for (Split split : splits) {
            harness.processElement(new StreamRecord((Object)split));
        }
        ArrayList values = new ArrayList(harness.getOutput());
        Assertions.assertThat(values).containsExactlyInAnyOrder(new Object[]{new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{1, 1, 1})), new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{2, 2, 2}))});
    }

    private <T> T testReadSplit(MonitorFunction function, SupplierWithException<T, Exception> beforeClose, int a, int b, int c) throws Exception {
        Throwable[] error = new Throwable[1];
        final ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
        DummySourceContext sourceContext = new DummySourceContext(){

            public void collect(Split element) {
                queue.add(element);
            }
        };
        Thread runner = new Thread(() -> {
            try {
                function.run((SourceFunction.SourceContext)sourceContext);
            }
            catch (Throwable t) {
                t.printStackTrace();
                error[0] = t;
            }
        });
        runner.start();
        this.writeToTable(a, b, c);
        Split split = (Split)queue.poll(1L, TimeUnit.MINUTES);
        Assertions.assertThat(this.readSplit(split)).containsExactlyInAnyOrder((Object[])new List[]{Arrays.asList(a, b, c)});
        Object t = beforeClose.get();
        function.cancel();
        runner.join();
        Assertions.assertThat((Throwable)error[0]).isNull();
        return (T)t;
    }

    private static abstract class DummySourceContext
    implements SourceFunction.SourceContext<Split> {
        private final Object lock = new Object();

        private DummySourceContext() {
        }

        public void collectWithTimestamp(Split element, long timestamp) {
        }

        public void emitWatermark(Watermark mark) {
        }

        public void markAsTemporarilyIdle() {
        }

        public Object getCheckpointLock() {
            return this.lock;
        }

        public void close() {
        }
    }
}

