/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.paimon.flink.sink.ChannelComputer;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.HashBucketAssignerOperator;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;

/**
 * Abstract write sink whose {@code build} topology pairs every input record with an
 * {@code Integer} produced by a {@link HashBucketAssignerOperator} before the records are handed
 * to the inherited {@code sinkFrom} step.
 *
 * <p>Subclasses supply the two channel computers used for the shuffles and the factory for the
 * {@link PartitionKeyExtractor} consumed by the assigner operator.
 *
 * @param <T> type of the records entering the sink
 */
public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Integer>> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the sink; both arguments are forwarded unchanged to the superclass constructor.
     *
     * @param table table this sink writes to
     * @param overwritePartition optional partition spec, may be {@code null}
     */
    public DynamicBucketSink(FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
        super(table, overwritePartition);
    }

    /** Channel computer used for the first shuffle, applied to the raw input records. */
    protected abstract ChannelComputer<T> channelComputer1();

    /** Channel computer used for the second shuffle, applied to the (record, Integer) pairs. */
    protected abstract ChannelComputer<Tuple2<T, Integer>> channelComputer2();

    /** Factory handed to the assigner operator to obtain a {@link PartitionKeyExtractor}. */
    protected abstract SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction();

    /**
     * Wires the sink topology:
     * input -> shuffle ({@link #channelComputer1()}) -> "dynamic-bucket-assigner" operator
     * -> shuffle ({@link #channelComputer2()}) -> {@code sinkFrom}.
     *
     * @param input stream of records to write
     * @param parallelism parallelism passed to the second shuffle, and to the first shuffle when
     *     the table's core options do not define a dedicated assigner parallelism; may be
     *     {@code null}
     * @return the sink returned by {@code sinkFrom}
     */
    public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelism) {
        // One commit user id is shared by the assigner operator and the downstream sink.
        String initialCommitUser = UUID.randomUUID().toString();

        // 1. First shuffle. Prefer the table-level assigner parallelism, fall back to the
        //    caller-supplied value (which may itself be null).
        Integer assignerParallelism = this.table.coreOptions().dynamicBucketAssignerParallelism();
        if (assignerParallelism == null) {
            assignerParallelism = parallelism;
        }
        DataStream<T> partitionByKeyHash =
                FlinkStreamPartitioner.partition(input, this.channelComputer1(), assignerParallelism);

        // 2. Assigner operator. Its output type is Tuple2<record, Integer>.
        // NOTE(review): the meaning of the trailing 'false' flag is defined by
        // HashBucketAssignerOperator's constructor - confirm there.
        HashBucketAssignerOperator<T> assignerOperator =
                new HashBucketAssignerOperator<>(
                        initialCommitUser, this.table, this.extractorFunction(), false);
        TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
                new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
        // The operator runs with the same parallelism as the stream feeding it.
        DataStream<Tuple2<T, Integer>> bucketAssigned =
                partitionByKeyHash
                        .transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator)
                        .setParallelism(partitionByKeyHash.getParallelism());

        // 3. Second shuffle, over the (record, Integer) pairs.
        DataStream<Tuple2<T, Integer>> partitionByBucket =
                FlinkStreamPartitioner.partition(bucketAssigned, this.channelComputer2(), parallelism);

        // 4. Hand over to the inherited sink construction.
        return this.sinkFrom(partitionByBucket, initialCommitUser);
    }
}

