/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaInternalProducer;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSinkCommitter
implements SinkCommitter<KafkaCommitInfo> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSinkCommitter.class);
    private final ReadonlyConfig pluginConfig;
    private KafkaInternalProducer<?, ?> kafkaProducer;

    public KafkaSinkCommitter(ReadonlyConfig pluginConfig) {
        this.pluginConfig = pluginConfig;
    }

    public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
        if (commitInfos.isEmpty()) {
            return commitInfos;
        }
        for (KafkaCommitInfo commitInfo : commitInfos) {
            String transactionId = commitInfo.getTransactionId();
            if (log.isDebugEnabled()) {
                log.debug("Committing transaction {}", (Object)transactionId);
            }
            KafkaInternalProducer<?, ?> producer = this.getProducer(commitInfo);
            ((KafkaProducer)producer).commitTransaction();
            producer.flush();
        }
        if (this.kafkaProducer != null) {
            this.kafkaProducer.close();
            this.kafkaProducer = null;
        }
        return commitInfos;
    }

    public void abort(List<KafkaCommitInfo> commitInfos) {
        if (commitInfos.isEmpty()) {
            return;
        }
        for (KafkaCommitInfo commitInfo : commitInfos) {
            KafkaInternalProducer<?, ?> producer = this.getProducer(commitInfo);
            ((KafkaProducer)producer).abortTransaction();
        }
        if (this.kafkaProducer != null) {
            this.kafkaProducer.close();
            this.kafkaProducer = null;
        }
    }

    private KafkaInternalProducer<?, ?> getProducer(KafkaCommitInfo commitInfo) {
        if (this.kafkaProducer != null) {
            this.kafkaProducer.setTransactionalId(commitInfo.getTransactionId());
        } else {
            Properties kafkaProperties = commitInfo.getKafkaProperties();
            kafkaProperties.setProperty("transactional.id", commitInfo.getTransactionId());
            this.kafkaProducer = new KafkaInternalProducer(commitInfo.getKafkaProperties(), commitInfo.getTransactionId());
        }
        this.kafkaProducer.resumeTransaction(commitInfo.getProducerId(), commitInfo.getEpoch());
        return this.kafkaProducer;
    }
}

