/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaInternalProducer;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTransactionSender<K, V>
implements KafkaProduceSender<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaTransactionSender.class);
    private KafkaInternalProducer<K, V> kafkaProducer;
    private String transactionId;
    private final String transactionPrefix;
    private final Properties kafkaProperties;

    public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperties) {
        this.transactionPrefix = transactionPrefix;
        this.kafkaProperties = kafkaProperties;
    }

    @Override
    public void send(ProducerRecord<K, V> producerRecord) {
        this.kafkaProducer.send(producerRecord);
    }

    @Override
    public void beginTransaction(String transactionId) {
        this.transactionId = transactionId;
        this.kafkaProducer = this.getTransactionProducer(this.kafkaProperties, transactionId);
        this.kafkaProducer.beginTransaction();
    }

    @Override
    public Optional<KafkaCommitInfo> prepareCommit() {
        KafkaCommitInfo kafkaCommitInfo = new KafkaCommitInfo(this.transactionId, this.kafkaProperties, this.kafkaProducer.getProducerId(), this.kafkaProducer.getEpoch());
        return Optional.of(kafkaCommitInfo);
    }

    @Override
    public void abortTransaction() {
        this.kafkaProducer.abortTransaction();
    }

    @Override
    public void abortTransaction(long checkpointId) {
        KafkaInternalProducer<K, V> producer = this.kafkaProducer != null ? this.kafkaProducer : this.getTransactionProducer(this.kafkaProperties, KafkaSinkWriter.generateTransactionId(this.transactionPrefix, checkpointId));
        long i = checkpointId;
        while (true) {
            String transactionId = KafkaSinkWriter.generateTransactionId(this.transactionPrefix, i);
            producer.setTransactionalId(transactionId);
            if (log.isDebugEnabled()) {
                log.debug("Abort kafka transaction: {}", (Object)transactionId);
            }
            producer.flush();
            if (producer.getEpoch() == 0) break;
            ++i;
        }
    }

    @Override
    public List<KafkaSinkState> snapshotState(long checkpointId) {
        return Lists.newArrayList(new KafkaSinkState(this.transactionId, this.transactionPrefix, checkpointId, this.kafkaProperties));
    }

    @Override
    public void close() {
        if (this.kafkaProducer != null) {
            this.kafkaProducer.flush();
            this.kafkaProducer.close();
        }
    }

    private KafkaInternalProducer<K, V> getTransactionProducer(Properties properties, String transactionId) {
        this.close();
        Properties transactionProperties = (Properties)properties.clone();
        transactionProperties.put("transactional.id", transactionId);
        KafkaInternalProducer transactionProducer = new KafkaInternalProducer(transactionProperties, transactionId);
        transactionProducer.initTransactions();
        return transactionProducer;
    }
}

