/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;

import java.io.IOException;
import java.util.List;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;

/**
 * {@link SinkCommitter} that commits or aborts the Pulsar transactions referenced by
 * {@link PulsarCommitInfo} entries.
 *
 * <p>The Pulsar client and its transaction coordinator client are created lazily on first use
 * and cached in instance fields. {@link #abort(List)} closes the client and clears the cache,
 * so a later call transparently creates a fresh client.
 */
public class PulsarSinkCommitter
implements SinkCommitter<PulsarCommitInfo> {
    private final PulsarClientConfig clientConfig;
    private PulsarClient pulsarClient;
    private TransactionCoordinatorClient coordinatorClient;

    /**
     * @param clientConfig configuration used to create the Pulsar client on first use
     */
    public PulsarSinkCommitter(PulsarClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }

    /**
     * Commits the transaction of every entry in {@code commitInfos}, in list order.
     *
     * @param commitInfos commit infos carrying the transaction ids to commit
     * @return the same list instance that was passed in
     * @throws IOException if the client cannot be created or a commit fails; transactions
     *     after the failing one are not attempted
     */
    public List<PulsarCommitInfo> commit(List<PulsarCommitInfo> commitInfos) throws IOException {
        if (commitInfos.isEmpty()) {
            // Nothing to commit: avoid creating a client just for an empty batch.
            return commitInfos;
        }
        TransactionCoordinatorClient client = this.transactionCoordinatorClient();
        for (PulsarCommitInfo pulsarCommitInfo : commitInfos) {
            TxnID txnID = pulsarCommitInfo.getTxnID();
            client.commit(txnID);
        }
        return commitInfos;
    }

    /**
     * Aborts the transaction of every entry in {@code commitInfos}, in list order, then closes
     * the Pulsar client.
     *
     * <p>The client is closed even when an abort fails, and the cached client and coordinator
     * are always cleared afterwards so that a closed client is never reused by a later call.
     *
     * @param commitInfos commit infos carrying the transaction ids to abort
     * @throws IOException if the client cannot be created, an abort fails, or closing the
     *     client fails
     */
    public void abort(List<PulsarCommitInfo> commitInfos) throws IOException {
        if (commitInfos.isEmpty()) {
            return;
        }
        TransactionCoordinatorClient client = this.transactionCoordinatorClient();
        // try-with-resources closes the client on every exit path; if both an abort and the
        // close fail, the abort failure propagates and the close failure is attached as
        // suppressed instead of masking it.
        try (PulsarClient closing = this.pulsarClient) {
            for (PulsarCommitInfo commitInfo : commitInfos) {
                TxnID txnID = commitInfo.getTxnID();
                client.abort(txnID);
            }
        } finally {
            // Drop the cached references: the coordinator belongs to the client that has just
            // been closed, so the next call must build a new pair.
            this.pulsarClient = null;
            this.coordinatorClient = null;
        }
    }

    /**
     * Returns the cached transaction coordinator client, creating the Pulsar client and the
     * coordinator on first use (or after the cache was cleared).
     *
     * @return the transaction coordinator client
     * @throws PulsarClientException if the Pulsar client cannot be created
     */
    private TransactionCoordinatorClient transactionCoordinatorClient() throws PulsarClientException {
        if (this.coordinatorClient == null) {
            this.pulsarClient = PulsarConfigUtil.createClient(this.clientConfig, PulsarSemantics.EXACTLY_ONCE);
            this.coordinatorClient = PulsarConfigUtil.getTcClient(this.pulsarClient);
        }
        return this.coordinatorClient;
    }
}

