/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.File;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TimeUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class AppendTableITCase
extends CatalogITCaseBase {
    private static final Random RANDOM = new Random();

    /** Checks that a batch scan of {@code append_table} returns no rows before anything is written. */
    @Test
    public void testReadEmpty() {
        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows).isEmpty();
    }

    /**
     * Inserts two rows in batch mode and reads them back three ways: all columns, the {@code id}
     * column only, and the {@code data} column only.
     */
    @Test
    public void testReadWrite() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')");

        // Full row scan.
        List<Row> allColumns = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(allColumns.size()).isEqualTo(2);
        Assertions.assertThat(allColumns)
                .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"));

        // Projection on the first column.
        List<Row> idOnly = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(idOnly.size()).isEqualTo(2);
        Assertions.assertThat(idOnly).containsExactlyInAnyOrder(Row.of(1), Row.of(2));

        // Projection on the second column.
        List<Row> dataOnly = batchSql("SELECT data from append_table");
        Assertions.assertThat(dataOnly.size()).isEqualTo(2);
        Assertions.assertThat(dataOnly).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
    }

    /**
     * Inserts four rows, two of which are identical, and checks that every copy is returned by a
     * full scan and by both single-column projections (no de-duplication takes place).
     */
    @Test
    public void testSkipDedup() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2, 'BBB'), (3, 'BBB')");

        List<Row> allColumns = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(allColumns.size()).isEqualTo(4);
        Assertions.assertThat(allColumns)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"), Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "BBB"));

        List<Row> idOnly = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(idOnly.size()).isEqualTo(4);
        Assertions.assertThat(idOnly)
                .containsExactlyInAnyOrder(Row.of(1), Row.of(1), Row.of(2), Row.of(3));

        List<Row> dataOnly = batchSql("SELECT data FROM append_table");
        Assertions.assertThat(dataOnly.size()).isEqualTo(4);
        Assertions.assertThat(dataOnly)
                .containsExactlyInAnyOrder(
                        Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("BBB"));
    }

    /**
     * Registers four INSERT rows with the {@code values} test connector, copies them into
     * {@code append_table} through a temporary source table, and verifies the table contents
     * with a full scan and both single-column projections.
     */
    @Test
    public void testIngestFromSource() {
        List<Row> input =
                Arrays.asList(
                        Row.ofKind(RowKind.INSERT, 1, "AAA"),
                        Row.ofKind(RowKind.INSERT, 1, "AAA"),
                        Row.ofKind(RowKind.INSERT, 1, "BBB"),
                        Row.ofKind(RowKind.INSERT, 2, "AAA"));
        String dataId = TestValuesTableFactory.registerData(input);

        batchSql(
                "CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                dataId);
        batchSql("INSERT INTO append_table SELECT * FROM source");

        List<Row> allColumns = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(allColumns.size()).isEqualTo(4);
        Assertions.assertThat(allColumns)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"), Row.of(1, "AAA"), Row.of(1, "BBB"), Row.of(2, "AAA"));

        List<Row> idOnly = batchSql("SELECT id FROM append_table");
        Assertions.assertThat(idOnly.size()).isEqualTo(4);
        Assertions.assertThat(idOnly)
                .containsExactlyInAnyOrder(Row.of(1), Row.of(1), Row.of(1), Row.of(2));

        List<Row> dataOnly = batchSql("SELECT data FROM append_table");
        Assertions.assertThat(dataOnly.size()).isEqualTo(4);
        Assertions.assertThat(dataOnly)
                .containsExactlyInAnyOrder(
                        Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("AAA"));
    }

    /**
     * Runs eight batch inserts with {@code compaction.min.file-num} set to 4 and checks that each
     * one produces the next snapshot id with commit kind {@code APPEND}, then verifies that all
     * 18 inserted rows are readable.
     */
    @Test
    public void testNoCompactionInBatchMode() {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");

        // Every insert is expected to yield exactly one new APPEND snapshot.
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",
                1L,
                Snapshot.CommitKind.APPEND);
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')",
                2L,
                Snapshot.CommitKind.APPEND);
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')",
                3L,
                Snapshot.CommitKind.APPEND);
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')",
                4L,
                Snapshot.CommitKind.APPEND);
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')",
                5L,
                Snapshot.CommitKind.APPEND);
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')",
                6L,
                Snapshot.CommitKind.APPEND);
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')",
                7L,
                Snapshot.CommitKind.APPEND);
        assertExecuteExpected(
                "INSERT INTO append_table VALUES (13, 'NNN'), (14, 'OOO')",
                8L,
                Snapshot.CommitKind.APPEND);

        List<Row> rows = batchSql("SELECT * FROM append_table");
        Assertions.assertThat(rows.size()).isEqualTo(18);
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        Row.of(1, "AAA"),
                        Row.of(2, "BBB"),
                        Row.of(3, "CCC"),
                        Row.of(4, "DDD"),
                        Row.of(1, "AAA"),
                        Row.of(2, "BBB"),
                        Row.of(3, "CCC"),
                        Row.of(4, "DDD"),
                        Row.of(5, "EEE"),
                        Row.of(6, "FFF"),
                        Row.of(7, "HHH"),
                        Row.of(8, "III"),
                        Row.of(9, "JJJ"),
                        Row.of(10, "KKK"),
                        Row.of(11, "LLL"),
                        Row.of(12, "MMM"),
                        Row.of(13, "NNN"),
                        Row.of(14, "OOO"));
    }

    /**
     * Streams rows from a {@code datagen} source into {@code append_table} and waits (up to 60 s)
     * for a snapshot with commit kind {@code COMPACT} to appear.
     *
     * @throws Exception if the wait times out or the thread is interrupted
     */
    @Test
    public void testCompactionInStreamingMode() throws Exception {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
        batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");

        String checkpointInterval = TimeUtils.formatWithHighestUnit(Duration.ofMillis(500L));
        sEnv.getConfig()
                .getConfiguration()
                .setString("execution.checkpointing.interval", checkpointInterval);

        sEnv.executeSql(
                "CREATE TEMPORARY TABLE Orders_in (\n"
                        + "    f0        INT,\n"
                        + "    f1        STRING\n"
                        + ") WITH (\n"
                        + "    'connector' = 'datagen',\n"
                        + "    'rows-per-second' = '1'\n"
                        + ")");

        assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM Orders_in", 60000L);
        // Extra pause after the compact snapshot has been observed, before the test returns.
        Thread.sleep(5000L);
    }

    /**
     * Same scenario as the plain streaming compaction test, but the {@code datagen} source is
     * limited to 10 rows and declares a watermark on {@code ts}. After a COMPACT snapshot is
     * seen, the latest snapshot must carry a non-null watermark greater than
     * {@link Long#MIN_VALUE}.
     *
     * @throws Exception if the wait times out or the thread is interrupted
     */
    @Test
    public void testCompactionInStreamingModeWithMaxWatermark() throws Exception {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '4')");
        batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");

        String checkpointInterval = TimeUtils.formatWithHighestUnit(Duration.ofMillis(500L));
        sEnv.getConfig()
                .getConfiguration()
                .setString("execution.checkpointing.interval", checkpointInterval);

        sEnv.executeSql(
                "CREATE TEMPORARY TABLE Orders_in (\n"
                        + "    f0        INT,\n"
                        + "    f1        STRING,\n"
                        + "    ts        TIMESTAMP(3),\n"
                        + "WATERMARK FOR ts AS ts - INTERVAL '0' SECOND) WITH (\n"
                        + "    'connector' = 'datagen',\n"
                        + "    'rows-per-second' = '1',\n"
                        + "    'number-of-rows' = '10'\n"
                        + ")");

        assertStreamingHasCompact("INSERT INTO append_table SELECT f0, f1 FROM Orders_in", 60000L);
        Thread.sleep(5000L);

        Snapshot latest = findLatestSnapshot("append_table");
        org.junit.jupiter.api.Assertions.assertNotNull(latest);

        Long watermark = latest.watermark();
        org.junit.jupiter.api.Assertions.assertNotNull(watermark);
        org.junit.jupiter.api.Assertions.assertTrue(watermark > Long.MIN_VALUE);
    }

    /** Checks that inserting a row of kind {@code DELETE} into the append table is rejected. */
    @Test
    public void testRejectDelete() {
        testRejectChanges(RowKind.DELETE);
    }

    /** Checks that inserting a row of kind {@code UPDATE_BEFORE} into the append table is rejected. */
    @Test
    public void testRejectUpdateBefore() {
        testRejectChanges(RowKind.UPDATE_BEFORE);
    }

    /**
     * Checks that inserting a row of kind {@code UPDATE_BEFORE} into the append table is rejected.
     *
     * <p>NOTE(review): the method name refers to UPDATE_AFTER, but the row kind exercised here is
     * {@code UPDATE_BEFORE}, which duplicates {@code testRejectUpdateBefore}. Confirm whether the
     * writer is still expected to reject {@code UPDATE_AFTER} before switching the constant.
     */
    @Test
    public void testRejectUpdateAfter() {
        testRejectChanges(RowKind.UPDATE_BEFORE);
    }

    /**
     * Writes a row whose {@code MAP<INT, INT>} column is NULL into {@code complex_table} and
     * checks that it is read back as {@code (1, null)}.
     */
    @Test
    public void testComplexType() {
        batchSql("INSERT INTO complex_table VALUES (1, CAST(NULL AS MAP<INT, INT>))");

        List<Row> rows = batchSql("SELECT * FROM complex_table");
        Assertions.assertThat(rows).containsExactly(Row.of(1, null));
    }

    /**
     * Creates a table with a {@code TIMESTAMP_LTZ(3)} column, inserts a timestamp literal, and
     * expects the value to be read back as the instant of that local date-time in the JVM's
     * default time zone.
     */
    @Test
    public void testTimestampLzType() {
        sql("CREATE TABLE t_table (id INT, data TIMESTAMP_LTZ(3))", new Object[0]);
        batchSql("INSERT INTO t_table VALUES (1, TIMESTAMP '2023-02-03 20:20:20')");

        LocalDateTime written = LocalDateTime.parse("2023-02-03T20:20:20");
        List<Row> rows = batchSql("SELECT * FROM t_table");
        Assertions.assertThat(rows)
                .containsExactly(Row.of(1, written.atZone(ZoneId.systemDefault()).toInstant()));
    }

    /**
     * Writes and reads 1000 random rows while {@link FailingFileIO} is armed via
     * {@code setFailRate(100, 1000)}. Each row is {@code (n, "n")} for a random int {@code n}.
     * Both the insert and the verification are wrapped in
     * {@code FailingFileIO.retryArtificialException}.
     *
     * @throws Exception propagated from the retry wrapper
     */
    @Test
    public void testReadWriteFailRandom() throws Exception {
        setFailRate(100, 1000);

        final int size = 1000;
        List<Row> expected = new ArrayList<>();
        StringBuilder tuples = new StringBuilder();
        for (int i = 0; i < size; i++) {
            int value = RANDOM.nextInt();
            expected.add(Row.of(value, String.valueOf(value)));
            tuples.append('(').append(value).append(",'").append(value).append("'),");
        }

        // Drop the trailing comma left by the loop.
        String insert =
                String.format(
                        "INSERT INTO append_table VALUES %s",
                        tuples.substring(0, tuples.length() - 1));
        FailingFileIO.retryArtificialException(() -> batchSql(insert));

        FailingFileIO.retryArtificialException(
                () -> {
                    batchSql("SELECT * FROM append_table");
                    List<Row> rows = batchSql("SELECT * FROM append_table");
                    Assertions.assertThat(rows.size()).isGreaterThanOrEqualTo(size);
                    Assertions.assertThat(rows)
                            .containsExactlyInAnyOrder(expected.toArray(new Row[0]));
                });
    }

    /**
     * Variant of the random write/read failure test in which the string column holds a second,
     * independently drawn random int rather than the textual form of the id.
     *
     * @throws Exception propagated from the retry wrapper
     */
    @Test
    public void testReadWriteFailRandomString() throws Exception {
        setFailRate(100, 1000);

        final int size = 1000;
        List<Row> expected = new ArrayList<>();
        StringBuilder tuples = new StringBuilder();
        for (int i = 0; i < size; i++) {
            int id = RANDOM.nextInt();
            String data = String.valueOf(RANDOM.nextInt());
            expected.add(Row.of(id, data));
            tuples.append('(').append(id).append(",'").append(data).append("'),");
        }

        // Drop the trailing comma left by the loop.
        String insert =
                String.format(
                        "INSERT INTO append_table VALUES %s",
                        tuples.substring(0, tuples.length() - 1));
        FailingFileIO.retryArtificialException(() -> batchSql(insert));

        FailingFileIO.retryArtificialException(
                () -> {
                    batchSql("SELECT * FROM append_table");
                    List<Row> rows = batchSql("SELECT * FROM append_table");
                    Assertions.assertThat(rows.size()).isGreaterThanOrEqualTo(size);
                    Assertions.assertThat(rows)
                            .containsExactlyInAnyOrder(expected.toArray(new Row[0]));
                });
    }

    /** Inserts two rows with separate statements and checks that {@code LIMIT 1} returns one row. */
    @Test
    public void testLimit() {
        Object[] noArgs = new Object[0];
        sql("INSERT INTO append_table VALUES (1, 'AAA')", noArgs);
        sql("INSERT INTO append_table VALUES (2, 'BBB')", noArgs);

        Assertions.assertThat(sql("SELECT * FROM append_table LIMIT 1", noArgs)).hasSize(1);
    }

    /**
     * Writes two batches into {@code index_table} (declared with a bloom-filter file index on
     * {@code indexc}) and checks that filtering on {@code indexc = 'c'} combined with an id
     * predicate returns only the two matching rows from the first batch.
     */
    @Test
    public void testFileIndex() {
        batchSql(
                "INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')");
        batchSql(
                "INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'd', 'BBB'), (3, 'd', 'BBB')");

        List<Row> matches =
                batchSql("SELECT * FROM index_table WHERE indexc = 'c' and (id = 2 or id = 3)");
        Assertions.assertThat(matches)
                .containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3, "c", "BBB"));
    }

    /**
     * Runs a streaming insert with parallelism 2 that is fed by {@link TestStatelessWriterSource}
     * and checks that both emitted ids end up in {@code append_table} with the constant
     * {@code 'test'} as data.
     *
     * <p>The job uses its own execution environment (500 ms checkpoint interval) and registers
     * the catalog that the shared streaming environment knows as {@code PAIMON} under the name
     * {@code mycat}.
     *
     * @throws Exception if the streaming job fails or is interrupted
     */
    @Timeout(value = 60L)
    @Test
    public void testStatelessWriter() throws Exception {
        FileStoreTable table =
                FileStoreTableFactory.create(
                        LocalFileIO.create(), new Path(this.path, "default.db/append_table"));

        StreamExecutionEnvironment env =
                streamExecutionEnvironmentBuilder()
                        .streamingMode()
                        .parallelism(2)
                        .checkpointIntervalMs(500)
                        .build();
        // Typed stream instead of a raw DataStream so the element type is checked.
        DataStream<Integer> source =
                env.fromSource(
                                new TestStatelessWriterSource(table),
                                WatermarkStrategy.noWatermarks(),
                                "TestStatelessWriterSource")
                        .setParallelism(2)
                        .forward();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // No cast here: the lookup already yields the type registerCatalog expects, and the
        // simple name "Catalog" is ambiguous in this file (imported from two packages).
        tEnv.registerCatalog(
                "mycat",
                sEnv.getCatalog("PAIMON")
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Catalog 'PAIMON' is not registered in the test environment")));
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.createTemporaryView("S", tEnv.fromDataStream(source).as("id"));
        tEnv.executeSql("INSERT INTO append_table SELECT id, 'test' FROM S").await();

        Assertions.assertThat(batchSql("SELECT * FROM append_table"))
                .containsExactlyInAnyOrder(Row.of(1, "test"), Row.of(2, "test"));
    }

    /**
     * For every {@link CoreOptions.PartitionSinkStrategy}, writes 30 rows into each of 5
     * partitions of two unaware-bucket tables (sink parallelism 7 and 3) and checks the number of
     * files per partition: 1 for {@code HASH}, otherwise the sink parallelism of the table.
     *
     * @param strategy the partition sink strategy under test, supplied by {@code @EnumSource}
     * @throws org.apache.paimon.catalog.Catalog.TableNotExistException if a test table cannot be
     *     loaded from the catalog
     */
    @ParameterizedTest
    @EnumSource(value = CoreOptions.PartitionSinkStrategy.class)
    public void testPartitionStrategyForPartitionedTable(CoreOptions.PartitionSinkStrategy strategy)
            throws org.apache.paimon.catalog.Catalog.TableNotExistException {
        int partitionNums = 5;
        int rowsPerPartition = 30;
        int largerSinkParallelism = 7;
        int lessSinkParallelism = 3;
        int hashStrategyResultFileCount = 1;

        batchSql(
                "CREATE TABLE IF NOT EXISTS partition_strategy_table_larger (id INT, data STRING, dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1','%s' = '%s','sink.parallelism' = '7')",
                CoreOptions.PARTITION_SINK_STRATEGY.key(),
                strategy);
        batchSql(
                "CREATE TABLE IF NOT EXISTS partition_strategy_table_less (id INT, data STRING, dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1','%s' = '%s','sink.parallelism' = '3')",
                CoreOptions.PARTITION_SINK_STRATEGY.key(),
                strategy);

        StringBuilder values = new StringBuilder();
        for (int i = 1; i <= rowsPerPartition; ++i) {
            for (int j = 1; j <= partitionNums; ++j) {
                values.append(String.format("(%s, 'HXH', '2025030%s'),", j, j));
            }
        }
        // Drop the trailing comma left by the loop.
        String tuples = values.substring(0, values.length() - 1);
        batchSql("INSERT INTO partition_strategy_table_larger VALUES " + tuples);
        batchSql("INSERT INTO partition_strategy_table_less VALUES " + tuples);

        int expectedRows = partitionNums * rowsPerPartition;
        Assertions.assertThat(batchSql("SELECT * FROM partition_strategy_table_larger").size())
                .isEqualTo(expectedRows);
        Assertions.assertThat(batchSql("SELECT * FROM partition_strategy_table_less").size())
                .isEqualTo(expectedRows);

        boolean hash = strategy == CoreOptions.PartitionSinkStrategy.HASH;
        assertPartitionFileCounts(
                "partition_strategy_table_larger",
                partitionNums,
                hash ? hashStrategyResultFileCount : largerSinkParallelism);
        assertPartitionFileCounts(
                "partition_strategy_table_less",
                partitionNums,
                hash ? hashStrategyResultFileCount : lessSinkParallelism);
    }

    /**
     * Asserts that the given table has {@code expectedPartitions} partition entries and that each
     * of them reports exactly {@code expectedFilesPerPartition} files.
     *
     * <p>The partition list is consumed directly from the call chain so the lambda sees the real
     * element type; a raw {@code List} would reduce elements to {@code Object}.
     */
    private void assertPartitionFileCounts(
            String tableName, int expectedPartitions, long expectedFilesPerPartition)
            throws org.apache.paimon.catalog.Catalog.TableNotExistException {
        FileStoreTable table = paimonTable(tableName);
        Assertions.assertThat(table.newReadBuilder().newScan().listPartitionEntries())
                .hasSize(expectedPartitions)
                .allSatisfy(
                        entry ->
                                Assertions.assertThat(entry.fileCount())
                                        .isEqualTo(expectedFilesPerPartition));
    }

    /**
     * Supplies the CREATE TABLE statements for the tables used by this test class: a plain
     * append table, a partitioned one, one with a MAP column, and one with a bloom-filter file
     * index. All are declared with {@code 'bucket' = '-1'}.
     */
    @Override
    protected List<String> ddl() {
        String appendTable =
                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('bucket' = '-1')";
        String partTable =
                "CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, dt STRING)"
                        + " PARTITIONED BY (dt) WITH ('bucket' = '-1')";
        String complexTable =
                "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP<INT, INT>) WITH ('bucket' = '-1')";
        String indexTable =
                "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc STRING, data STRING)"
                        + " WITH ('bucket' = '-1', 'file-index.bloom-filter.columns'='indexc',"
                        + " 'file-index.bloom-filter.indexc.items' = '500')";
        return Arrays.asList(appendTable, partTable, complexTable, indexTable);
    }

    /**
     * Maps the warehouse directory to a {@link FailingFileIO} path keyed by the directory's name.
     *
     * <p>The key is reset with the values 0 and 1, i.e. the arguments that {@code setFailRate}
     * calls {@code maxFails} and {@code failPossibility}; presumably this leaves failure
     * injection switched off until a test arms it (confirm against FailingFileIO).
     *
     * @param path warehouse directory path
     * @return the path as produced by {@code FailingFileIO.getFailingPath}
     */
    @Override
    protected String toWarehouse(String path) {
        File warehouseDir = new File(path);
        String name = warehouseDir.getName();
        FailingFileIO.reset(name, 0, 1);
        return FailingFileIO.getFailingPath(name, warehouseDir.getPath());
    }

    /**
     * Resets the {@link FailingFileIO} entry keyed by the warehouse directory name with the given
     * settings.
     *
     * @param maxFails passed through as the second argument of {@code FailingFileIO.reset}
     * @param failPossibility passed through as the third argument of {@code FailingFileIO.reset}
     */
    private void setFailRate(int maxFails, int failPossibility) {
        String warehouseName = new Path(this.path).getName();
        FailingFileIO.reset(warehouseName, maxFails, failPossibility);
    }

    /**
     * Feeds a single row of the given kind through a temporary {@code values} source into
     * {@code append_table} and expects the insert to fail with an
     * {@link IllegalStateException} root cause naming that row kind.
     *
     * @param kind the row kind that the append table is expected to refuse
     */
    private void testRejectChanges(RowKind kind) {
        Row rejected = Row.ofKind(kind, 1, "AAA");
        List<Row> input = Collections.singletonList(rejected);
        String dataId = TestValuesTableFactory.registerData(input);

        batchSql(
                "CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
                dataId);

        Assertions.assertThatThrownBy(
                        () -> batchSql("INSERT INTO append_table SELECT * FROM source"))
                .hasRootCauseInstanceOf(IllegalStateException.class)
                .hasRootCauseMessage("Append only writer can not accept row with RowKind %s", kind);
    }

    /**
     * Executes a batch statement and asserts that the latest snapshot of {@code append_table}
     * afterwards has the expected id and commit kind.
     *
     * @param sql statement to run in batch mode
     * @param expectedSnapshotId id the latest snapshot must have after the statement
     * @param expectedCommitKind commit kind the latest snapshot must have
     */
    private void assertExecuteExpected(
            String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
        batchSql(sql);

        Snapshot latest = findLatestSnapshot("append_table");
        Assertions.assertThat(latest.id()).isEqualTo(expectedSnapshotId);
        Assertions.assertThat(latest.commitKind()).isEqualTo(expectedCommitKind);
    }

    /**
     * Submits a streaming statement and polls the snapshots of {@code append_table}, starting at
     * id 1, until one with commit kind {@code COMPACT} is found.
     *
     * <p>Each iteration looks up one snapshot id; the id advances only when that snapshot exists
     * and is not a compaction. Iterations are one second apart.
     *
     * @param sql streaming statement to submit
     * @param timeout maximum wait in milliseconds, measured from just before submission
     * @throws RuntimeException if no COMPACT snapshot shows up within the timeout
     * @throws Exception if the polling sleep is interrupted
     */
    private void assertStreamingHasCompact(String sql, long timeout) throws Exception {
        final long startedAt = System.currentTimeMillis();
        sEnv.executeSql(sql);

        long snapshotId = 1L;
        for (; ; ) {
            Snapshot candidate = findSnapshot("append_table", snapshotId);
            if (candidate != null) {
                if (candidate.commitKind() == Snapshot.CommitKind.COMPACT) {
                    return;
                }
                snapshotId++;
            }

            long elapsed = System.currentTimeMillis() - startedAt;
            if (elapsed > timeout) {
                throw new RuntimeException(
                        "Time up for streaming execute, don't get expected result.");
            }
            Thread.sleep(1000L);
        }
    }

    private static class TestStatelessWriterSource
    extends AbstractNonCoordinatedSource<Integer> {
        private final FileStoreTable table;

        private TestStatelessWriterSource(FileStoreTable table) {
            this.table = table;
        }

        public Boundedness getBoundedness() {
            return Boundedness.CONTINUOUS_UNBOUNDED;
        }

        public SourceReader<Integer, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
            return new Reader(sourceReaderContext.getIndexOfSubtask());
        }

        private int countNumRecords() throws Exception {
            int ret = 0;
            RecordReader reader = this.table.newRead().createReader((TableScan.Plan)this.table.newSnapshotReader().read());
            try (RecordReaderIterator it = new RecordReaderIterator(reader);){
                while (it.hasNext()) {
                    it.next();
                    ++ret;
                }
            }
            return ret;
        }

        private class Reader
        extends AbstractNonCoordinatedSourceReader<Integer> {
            private final int taskId;
            private int waitCount;

            private Reader(int taskId) {
                this.taskId = taskId;
                this.waitCount = taskId == 0 ? 0 : 10;
            }

            public InputStatus pollNext(ReaderOutput<Integer> readerOutput) throws Exception {
                if (this.taskId == 0) {
                    if (this.waitCount == 0) {
                        readerOutput.collect((Object)1);
                    } else if (TestStatelessWriterSource.this.countNumRecords() >= 1) {
                        Thread.sleep(1000L);
                        return InputStatus.END_OF_INPUT;
                    }
                } else {
                    int numRecords = TestStatelessWriterSource.this.countNumRecords();
                    if (numRecords >= 1) {
                        if (this.waitCount == 0) {
                            readerOutput.collect((Object)2);
                        } else if (TestStatelessWriterSource.this.countNumRecords() >= 2) {
                            Thread.sleep(1000L);
                            return InputStatus.END_OF_INPUT;
                        }
                    }
                }
                --this.waitCount;
                Thread.sleep(1000L);
                return InputStatus.MORE_AVAILABLE;
            }
        }
    }
}

