/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Callback;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.table.sink.SinkRecord;

/**
 * A {@link FlinkKafkaProducer} for {@link SinkRecord}s that also implements
 * {@link LogSinkFunction}.
 *
 * <p>On top of the inherited producer behaviour, {@link #open(Configuration)} replaces the
 * inherited {@code callback} with a wrapper that reports the partition and offset of each
 * completed send to an optional {@link LogSinkFunction.WriteCallback} before handing the
 * completion event to the callback that was installed by the superclass.
 */
public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements LogSinkFunction {

    /** Optional listener for completed writes; {@code null} unless one has been set. */
    private LogSinkFunction.WriteCallback writeCallback;

    /**
     * Creates the sink by forwarding every argument unchanged to the
     * {@link FlinkKafkaProducer} constructor.
     *
     * @param defaultTopic topic argument passed to the superclass
     * @param serializationSchema schema argument passed to the superclass
     * @param producerConfig producer properties passed to the superclass
     * @param semantic delivery semantic passed to the superclass
     */
    public KafkaSinkFunction(
            String defaultTopic,
            KafkaSerializationSchema<SinkRecord> serializationSchema,
            Properties producerConfig,
            FlinkKafkaProducer.Semantic semantic) {
        super(defaultTopic, serializationSchema, producerConfig, semantic);
    }

    /**
     * Stores the listener that the callback installed by {@link #open(Configuration)} notifies.
     *
     * @param writeCallback listener to notify; when {@code null} no notification is sent
     */
    public void setWriteCallback(LogSinkFunction.WriteCallback writeCallback) {
        this.writeCallback = writeCallback;
    }

    /**
     * Opens the function by delegating to {@link #open(Configuration)} with a freshly created
     * {@link Configuration}; the given context is not used.
     *
     * <p>NOTE(review): this method carries no {@code @Override}; confirm whether that is
     * deliberate (e.g. for compatibility across Flink versions) before adding one.
     *
     * @param openContext ignored
     * @throws Exception if {@link #open(Configuration)} fails
     */
    public void open(OpenContext openContext) throws Exception {
        Configuration freshConfiguration = new Configuration();
        open(freshConfiguration);
    }

    /**
     * Opens the superclass first, then swaps the inherited {@code callback} for a wrapper that
     * also notifies the {@link LogSinkFunction.WriteCallback}.
     *
     * <p>NOTE(review): this method carries no {@code @Override}; confirm whether that is
     * deliberate (e.g. for compatibility across Flink versions) before adding one.
     *
     * @param configuration configuration handed to {@code super.open}
     * @throws Exception if {@code super.open} fails
     * @throws NullPointerException if {@code callback} is still {@code null} after
     *     {@code super.open} returns
     */
    public void open(Configuration configuration) throws Exception {
        // The superclass must be opened before its callback can be captured and wrapped.
        super.open(configuration);
        callback = notifyingCallback(Objects.requireNonNull(callback));
    }

    /**
     * Runs the superclass {@code preCommit} step on the transaction returned by
     * {@code currentTransaction()}.
     *
     * @throws FlinkKafkaException propagated from {@code preCommit}
     */
    public void flush() throws FlinkKafkaException {
        FlinkKafkaProducer.KafkaTransactionState activeTransaction =
                (FlinkKafkaProducer.KafkaTransactionState) super.currentTransaction();
        super.preCommit(activeTransaction);
    }

    /**
     * Builds a callback that reports partition and offset to {@link #writeCallback} (when one is
     * set and the metadata carries an offset) and then always invokes {@code delegate}.
     */
    private Callback notifyingCallback(Callback delegate) {
        return (metadata, exception) -> {
            // writeCallback is read at completion time, so a listener set after open() is seen.
            if (writeCallback != null) {
                boolean offsetAvailable = metadata != null && metadata.hasOffset();
                if (offsetAvailable) {
                    writeCallback.onCompletion(metadata.partition(), metadata.offset());
                }
            }
            delegate.onCompletion(metadata, exception);
        };
    }
}

