/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import org.apache.paimon.flink.source.NumberSequenceRowSource;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class IteratorSourcesITCase
extends TestLogger {
    private static final int PARALLELISM = 4;
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());

    @Test
    public void testParallelSourceExecution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource stream = env.fromSource((Source)new NumberSequenceRowSource(1L, 1000L), WatermarkStrategy.noWatermarks(), "iterator source");
        List result = IteratorUtils.toList((Iterator)stream.executeAndCollect("Iterator Source Test"));
        IteratorSourcesITCase.verifySequence(result, 1L, 1000L);
    }

    private static void verifySequence(List<RowData> sequence, long from, long to) {
        if ((long)sequence.size() != to - from + 1L) {
            Assert.fail((String)String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, sequence));
        }
        List list = sequence.stream().map(r -> r.getLong(0)).sorted(Long::compareTo).collect(Collectors.toList());
        int pos = 0;
        long value = from;
        while (value <= to) {
            if (value != (Long)list.get(pos)) {
                Assert.fail((String)String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, list));
            }
            ++value;
            ++pos;
        }
    }
}

