/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.ChannelComputer;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.DynamicBucketSink;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcHashKeyChannelComputer;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordKeyAndBucketExtractor;
import org.apache.paimon.flink.sink.cdc.CdcWithBucketChannelComputer;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;

public class CdcDynamicBucketSink
extends DynamicBucketSink<CdcRecord> {
    private static final long serialVersionUID = 1L;

    public CdcDynamicBucketSink(FileStoreTable table) {
        super(table, null);
    }

    @Override
    protected ChannelComputer<CdcRecord> channelComputer1() {
        return new CdcHashKeyChannelComputer(this.table.schema());
    }

    @Override
    protected ChannelComputer<Tuple2<CdcRecord, Integer>> channelComputer2() {
        return new CdcWithBucketChannelComputer(this.table.schema());
    }

    @Override
    protected SerializableFunction<TableSchema, PartitionKeyExtractor<CdcRecord>> extractorFunction() {
        return schema -> {
            final CdcRecordKeyAndBucketExtractor extractor = new CdcRecordKeyAndBucketExtractor((TableSchema)schema);
            return new PartitionKeyExtractor<CdcRecord>(){

                @Override
                public BinaryRow partition(CdcRecord record) {
                    extractor.setRecord(record);
                    return extractor.partition();
                }

                @Override
                public BinaryRow trimmedPrimaryKey(CdcRecord record) {
                    extractor.setRecord(record);
                    return extractor.trimmedPrimaryKey();
                }
            };
        };
    }

    @Override
    protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(StoreSinkWrite.Provider writeProvider, String commitUser) {
        return new CdcDynamicBucketWriteOperator(this.table, writeProvider, commitUser);
    }
}

