/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.FlinkCdcMultiTableSink;
import org.apache.paimon.options.Options;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class FlinkCdcMultiTableSinkTest {
    @Test
    public void testTransformationParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        int inputParallelism = ThreadLocalRandom.current().nextInt(8) + 1;
        DataStreamSource input = env.fromData(CdcMultiplexRecord.class, (Object[])new CdcMultiplexRecord[]{new CdcMultiplexRecord("", "", null)}).setParallelism(inputParallelism);
        FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink((CatalogLoader & Serializable)() -> FlinkCatalogFactory.createPaimonCatalog((Options)new Options()), ((Double)FlinkConnectorOptions.SINK_WRITER_CPU.defaultValue()).doubleValue(), null, ((Double)FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue()).doubleValue(), null, UUID.randomUUID().toString(), false, null);
        DataStreamSink dataStreamSink = sink.sinkFrom((DataStream)input);
        Transformation end = dataStreamSink.getTransformation();
        Assertions.assertThat((String)end.getName()).isEqualTo("end");
        OneInputTransformation committer = (OneInputTransformation)end.getInputs().get(0);
        Assertions.assertThat((String)committer.getName()).isEqualTo("Multiplex Global Committer");
        Assertions.assertThat((int)committer.getParallelism()).isEqualTo(inputParallelism);
        PartitionTransformation partitioner = (PartitionTransformation)committer.getInputs().get(0);
        Assertions.assertThat((int)partitioner.getParallelism()).isEqualTo(inputParallelism);
        OneInputTransformation writer = (OneInputTransformation)partitioner.getInputs().get(0);
        Assertions.assertThat((String)writer.getName()).isEqualTo("CDC MultiplexWriter");
        Assertions.assertThat((int)writer.getParallelism()).isEqualTo(inputParallelism);
    }
}

