/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.table.sink.ChannelComputer;

public class FlinkStreamPartitioner<T>
extends StreamPartitioner<T> {
    private final ChannelComputer<T> channelComputer;

    public FlinkStreamPartitioner(ChannelComputer<T> channelComputer) {
        this.channelComputer = channelComputer;
    }

    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        this.channelComputer.setup(numberOfChannels);
    }

    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return this.channelComputer.channel(((StreamRecord)record.getInstance()).getValue());
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.FULL;
    }

    public boolean isPointwise() {
        return false;
    }

    public String toString() {
        return this.channelComputer.toString();
    }

    public static <T> DataStream<T> partition(DataStream<T> input, ChannelComputer<T> channelComputer, Integer parallelism) {
        FlinkStreamPartitioner<T> partitioner = new FlinkStreamPartitioner<T>(channelComputer);
        PartitionTransformation partitioned = new PartitionTransformation(input.getTransformation(), partitioner);
        partitioned.setParallelism(parallelism == null ? input.getParallelism() : parallelism.intValue());
        return new DataStream(input.getExecutionEnvironment(), (Transformation)partitioned);
    }

    public static <T> DataStream<T> rebalance(DataStream<T> input, Integer parallelism) {
        RebalancePartitioner partitioner = new RebalancePartitioner();
        PartitionTransformation partitioned = new PartitionTransformation(input.getTransformation(), (StreamPartitioner)partitioner);
        if (parallelism != null) {
            partitioned.setParallelism(parallelism.intValue());
        }
        return new DataStream(input.getExecutionEnvironment(), (Transformation)partitioned);
    }
}

