/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.source.operator.MonitorSource;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.source.operator.TestingSourceOperator;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SerializableSupplier;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class OperatorSourceTest {
    @TempDir
    java.nio.file.Path tempDir;
    private Table table;

    @BeforeEach
    public void before() throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException, Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException {
        Catalog catalog = CatalogFactory.createCatalog((CatalogContext)CatalogContext.create((Path)new Path(this.tempDir.toUri())));
        Schema schema = Schema.newBuilder().column("a", (DataType)DataTypes.INT()).column("b", (DataType)DataTypes.INT()).column("c", (DataType)DataTypes.INT()).primaryKey(new String[]{"a"}).option(CoreOptions.CONSUMER_ID.key(), "my_consumer").option("bucket", "1").build();
        Identifier identifier = Identifier.create((String)"default", (String)"t");
        catalog.createDatabase("default", false);
        catalog.createTable(identifier, schema, false);
        this.table = catalog.getTable(identifier);
    }

    private void writeToTable(int a, int b, int c) throws Exception {
        BatchWriteBuilder writeBuilder = this.table.newBatchWriteBuilder();
        BatchTableWrite write = writeBuilder.newWrite();
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{a, b, c}));
        BatchTableCommit commit = writeBuilder.newCommit();
        commit.commit(write.prepareCommit());
        write.close();
        commit.close();
    }

    private List<List<Integer>> readSplit(Split split) throws IOException {
        TableRead read = this.table.newReadBuilder().newRead();
        ArrayList<List<Integer>> result = new ArrayList<List<Integer>>();
        read.createReader(split).forEachRemaining(row -> result.add(Arrays.asList(row.getInt(0), row.getInt(1), row.getInt(2))));
        return result;
    }

    @Test
    public void testMonitorSourceWhenIsBoundedIsTrue() throws Exception {
        MonitorSource source = new MonitorSource(this.table.newReadBuilder(), 10L, false, true);
        TestingSourceOperator operator = (TestingSourceOperator)TestingSourceOperator.createTestOperator(source.createReader(null), WatermarkStrategy.noWatermarks(), false);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)operator, 1, 1, 0);
        testHarness.open();
        this.testReadSplit((SourceOperator<Split, ?>)operator, (SupplierWithException)() -> 1, 1, 1, 1);
    }

    @Test
    public void testMonitorSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MonitorSource source = new MonitorSource(this.table.newReadBuilder(), 10L, false, false);
        TestingSourceOperator operator = (TestingSourceOperator)TestingSourceOperator.createTestOperator(source.createReader(null), WatermarkStrategy.noWatermarks(), false);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)operator, 1, 1, 0);
        testHarness.open();
        OperatorSubtaskState snapshot = (OperatorSubtaskState)this.testReadSplit((SourceOperator<Split, ?>)operator, (SupplierWithException)() -> testHarness.snapshot(0L, 0L), 1, 1, 1);
        MonitorSource sourceCopy1 = new MonitorSource(this.table.newReadBuilder(), 10L, false, false);
        TestingSourceOperator operatorCopy1 = (TestingSourceOperator)TestingSourceOperator.createTestOperator(sourceCopy1.createReader(null), WatermarkStrategy.noWatermarks(), false);
        AbstractStreamOperatorTestHarness testHarnessCopy1 = new AbstractStreamOperatorTestHarness((StreamOperator)operatorCopy1, 1, 1, 0);
        testHarnessCopy1.initializeState(snapshot);
        testHarnessCopy1.open();
        this.testReadSplit((SourceOperator<Split, ?>)operatorCopy1, (SupplierWithException)() -> {
            testHarnessCopy1.snapshot(1L, 1L);
            testHarnessCopy1.notifyOfCompletedCheckpoint(1L);
            return null;
        }, 2, 2, 2);
        MonitorSource sourceCopy2 = new MonitorSource(this.table.newReadBuilder(), 10L, false, false);
        TestingSourceOperator operatorCopy2 = (TestingSourceOperator)TestingSourceOperator.createTestOperator(sourceCopy2.createReader(null), WatermarkStrategy.noWatermarks(), false);
        AbstractStreamOperatorTestHarness testHarnessCopy2 = new AbstractStreamOperatorTestHarness((StreamOperator)operatorCopy2, 1, 1, 0);
        testHarnessCopy2.open();
        this.testReadSplit((SourceOperator<Split, ?>)operatorCopy2, (SupplierWithException)() -> null, 3, 3, 3);
    }

    @Test
    public void testReadOperator() throws Exception {
        ReadOperator readOperator = new ReadOperator((SerializableSupplier & Serializable)() -> this.table.newReadBuilder().newRead(), null, null);
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)readOperator);
        harness.setup((TypeSerializer)InternalSerializers.create((RowType)RowType.of((LogicalType[])new LogicalType[]{new IntType(), new IntType(), new IntType()})));
        this.writeToTable(1, 1, 1);
        this.writeToTable(2, 2, 2);
        List splits = this.table.newReadBuilder().newScan().plan().splits();
        harness.open();
        for (Split split : splits) {
            harness.processElement(new StreamRecord((Object)split));
        }
        ArrayList values = new ArrayList(harness.getOutput());
        Assertions.assertThat(values).containsExactlyInAnyOrder(new Object[]{new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{1, 1, 1})), new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{2, 2, 2}))});
    }

    @Test
    public void testReadOperatorWithLimit() throws Exception {
        ReadOperator readOperator = new ReadOperator((SerializableSupplier & Serializable)() -> this.table.newReadBuilder().newRead(), null, Long.valueOf(2L));
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)readOperator);
        harness.setup((TypeSerializer)InternalSerializers.create((RowType)RowType.of((LogicalType[])new LogicalType[]{new IntType(), new IntType(), new IntType()})));
        this.writeToTable(1, 1, 1);
        this.writeToTable(2, 2, 2);
        this.writeToTable(3, 3, 3);
        this.writeToTable(4, 4, 4);
        List splits = this.table.newReadBuilder().newScan().plan().splits();
        harness.open();
        for (Split split : splits) {
            harness.processElement(new StreamRecord((Object)split));
        }
        ArrayList values = new ArrayList(harness.getOutput());
        Assertions.assertThat(values).containsExactlyInAnyOrder(new Object[]{new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{1, 1, 1})), new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{2, 2, 2})), new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{3, 3, 3}))});
    }

    @Test
    public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
        ReadOperator readOperator = new ReadOperator((SerializableSupplier & Serializable)() -> this.table.newReadBuilder().newRead(), null, null);
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)readOperator);
        harness.setup((TypeSerializer)InternalSerializers.create((RowType)RowType.of((LogicalType[])new LogicalType[]{new IntType(), new IntType(), new IntType()})));
        this.writeToTable(1, 1, 1);
        this.writeToTable(2, 2, 2);
        List splits = this.table.newReadBuilder().newScan().plan().splits();
        Assertions.assertThat((int)splits.size()).isGreaterThan(0);
        OperatorMetricGroup readerOperatorMetricGroup = readOperator.getMetricGroup();
        harness.open();
        Assertions.assertThat((Object)TestingMetricUtils.getGauge((MetricGroup)readerOperatorMetricGroup, "currentFetchEventTimeLag").getValue()).isEqualTo((Object)-1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge((MetricGroup)readerOperatorMetricGroup, "currentEmitEventTimeLag").getValue()).isEqualTo((Object)-1L);
        Thread.sleep(300L);
        Assertions.assertThat((Long)((Long)TestingMetricUtils.getGauge((MetricGroup)readerOperatorMetricGroup, "sourceIdleTime").getValue())).isGreaterThan(299L);
        harness.processElement(new StreamRecord(splits.get(0)));
        Assertions.assertThat((Long)((Long)TestingMetricUtils.getGauge((MetricGroup)readerOperatorMetricGroup, "currentFetchEventTimeLag").getValue())).isGreaterThan(0L);
        long emitEventTimeLag = (Long)TestingMetricUtils.getGauge((MetricGroup)readerOperatorMetricGroup, "currentEmitEventTimeLag").getValue();
        Assertions.assertThat((long)emitEventTimeLag).isGreaterThan(0L);
        Thread.sleep(100L);
        Assertions.assertThat((Long)((Long)TestingMetricUtils.getGauge((MetricGroup)readerOperatorMetricGroup, "currentEmitEventTimeLag").getValue())).isEqualTo(emitEventTimeLag);
        Assertions.assertThat((Long)((Long)TestingMetricUtils.getGauge((MetricGroup)readerOperatorMetricGroup, "sourceIdleTime").getValue())).isGreaterThan(99L).isLessThan(300L);
    }

    private <T> T testReadSplit(SourceOperator<Split, ?> operator, SupplierWithException<T, Exception> beforeClose, int a, int b, int c) throws Exception {
        Throwable[] error = new Throwable[1];
        final ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
        AtomicReference iteratorRef = new AtomicReference();
        PushingAsyncDataInput.DataOutput<Split> output = new PushingAsyncDataInput.DataOutput<Split>(){

            public void emitRecord(StreamRecord<Split> streamRecord) {
                queue.add(streamRecord.getValue());
            }

            public void emitWatermark(Watermark watermark) {
            }

            public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
            }

            public void emitLatencyMarker(LatencyMarker latencyMarker) {
            }

            public void emitRecordAttributes(RecordAttributes recordAttributes) {
            }

            public void emitWatermark(WatermarkEvent watermarkEvent) {
            }
        };
        this.writeToTable(a, b, c);
        AtomicBoolean isRunning = new AtomicBoolean(true);
        Thread runner = new Thread(() -> OperatorSourceTest.lambda$testReadSplit$5(isRunning, operator, (PushingAsyncDataInput.DataOutput)output, error));
        runner.start();
        Split split = (Split)queue.poll(1L, TimeUnit.MINUTES);
        Assertions.assertThat(this.readSplit(split)).containsExactlyInAnyOrder((Object[])new List[]{Arrays.asList(a, b, c)});
        Object t = beforeClose.get();
        CloseableIterator iterator = (CloseableIterator)iteratorRef.get();
        if (iterator != null) {
            iterator.close();
        }
        isRunning.set(false);
        runner.join();
        Assertions.assertThat((Throwable)error[0]).isNull();
        return (T)t;
    }

    private static /* synthetic */ void lambda$testReadSplit$5(AtomicBoolean isRunning, SourceOperator operator, PushingAsyncDataInput.DataOutput output, Throwable[] error) {
        try {
            while (isRunning.get()) {
                operator.emitNext(output);
            }
        }
        catch (Throwable t) {
            t.printStackTrace();
            error[0] = t;
        }
    }
}

