/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.FlinkCdcMultiTableSink;
import org.apache.paimon.options.Options;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class FlinkCdcMultiTableSinkTest {
    @Test
    public void testTransformationParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        int inputParallelism = ThreadLocalRandom.current().nextInt(8) + 1;
        DataStreamSource input = env.addSource((SourceFunction)new ParallelSourceFunction<CdcMultiplexRecord>(){

            public void run(SourceFunction.SourceContext<CdcMultiplexRecord> ctx) {
            }

            public void cancel() {
            }
        }).setParallelism(inputParallelism);
        FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink((Catalog.Loader & Serializable)() -> FlinkCatalogFactory.createPaimonCatalog((Options)new Options()), ((Double)FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue()).doubleValue(), null);
        DataStreamSink dataStreamSink = sink.sinkFrom((DataStream)input, Collections.emptyMap());
        LegacySinkTransformation end = (LegacySinkTransformation)dataStreamSink.getTransformation();
        Assertions.assertThat((String)end.getName()).isEqualTo("end");
        OneInputTransformation committer = (OneInputTransformation)end.getInputs().get(0);
        Assertions.assertThat((String)committer.getName()).isEqualTo("Multiplex Global Committer");
        Assertions.assertThat((int)committer.getParallelism()).isEqualTo(inputParallelism);
        PartitionTransformation partitioner = (PartitionTransformation)committer.getInputs().get(0);
        Assertions.assertThat((int)partitioner.getParallelism()).isEqualTo(inputParallelism);
        OneInputTransformation writer = (OneInputTransformation)partitioner.getInputs().get(0);
        Assertions.assertThat((String)writer.getName()).isEqualTo("CDC MultiplexWriter");
        Assertions.assertThat((int)writer.getParallelism()).isEqualTo(inputParallelism);
    }
}

