/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.DefaultKafkaSinkContext;
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaWriter;
import org.apache.flink.connector.kafka.sink.KafkaWriterState;
import org.apache.flink.connector.kafka.sink.TransactionAborter;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaWriter} variant that writes records inside Kafka transactions.
 *
 * <p>A transactional producer is obtained and a transaction is begun in {@link #initialize()} and
 * again on every {@link #snapshotState(long)}. Producers handed back through the
 * {@code producerPool::add} callback are re-used (with a new transactional id) before new ones
 * are created.
 *
 * @param <IN> type of the records accepted by the writer
 */
class ExactlyOnceKafkaWriter<IN>
extends KafkaWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class);

    /** Prefix from which the transactional ids of this writer are built. */
    private final String transactionalIdPrefix;

    /** State returned from {@link #snapshotState(long)}; it wraps the transactional id prefix. */
    private final KafkaWriterState kafkaWriterState;

    /** Producers that are currently idle and may be re-used for a later transaction. */
    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = new ArrayDeque<>();

    /** Writer states handed to the constructor; consulted when aborting lingering transactions. */
    private final Collection<KafkaWriterState> recoveredStates;

    /** Highest checkpoint id for which a transactional producer has been requested so far. */
    private long lastCheckpointId;

    /** Every producer created by this writer, tracked so that {@link #close()} can close them. */
    private final Deque<Closeable> producerCloseables = new ArrayDeque<>();

    /**
     * Creates a writer that uses a {@link DefaultKafkaSinkContext} derived from the subtask id and
     * parallelism of {@code sinkInitContext}.
     *
     * @param deliveryGuarantee delivery guarantee forwarded to the base writer
     * @param kafkaProducerConfig producer properties used for every producer
     * @param transactionalIdPrefix prefix of the transactional ids; must not be null
     * @param sinkInitContext sink context providing subtask information and the restored
     *     checkpoint id
     * @param recordSerializer serializer that is opened by this constructor
     * @param schemaContext initialization context passed to the serializer
     * @param recoveredStates previously snapshotted writer states; must not be null
     */
    ExactlyOnceKafkaWriter(DeliveryGuarantee deliveryGuarantee, Properties kafkaProducerConfig, String transactionalIdPrefix, Sink.InitContext sinkInitContext, KafkaRecordSerializationSchema<IN> recordSerializer, SerializationSchema.InitializationContext schemaContext, Collection<KafkaWriterState> recoveredStates) {
        this(
                deliveryGuarantee,
                kafkaProducerConfig,
                transactionalIdPrefix,
                sinkInitContext,
                recordSerializer,
                schemaContext,
                recoveredStates,
                new DefaultKafkaSinkContext(
                        sinkInitContext.getSubtaskId(),
                        sinkInitContext.getNumberOfParallelSubtasks(),
                        kafkaProducerConfig));
    }

    /**
     * Creates a writer with an explicitly supplied {@code kafkaSinkContext}.
     *
     * <p>The serializer is opened here, the last checkpoint id is taken from the restored
     * checkpoint id of {@code sinkInitContext} (0 if absent), and the Flink metrics are
     * initialized. No producer is created until {@link #initialize()} is called.
     *
     * @param deliveryGuarantee delivery guarantee forwarded to the base writer
     * @param kafkaProducerConfig producer properties used for every producer
     * @param transactionalIdPrefix prefix of the transactional ids; must not be null
     * @param sinkInitContext sink context providing the restored checkpoint id
     * @param recordSerializer serializer that is opened by this constructor
     * @param schemaContext initialization context passed to the serializer
     * @param recoveredStates previously snapshotted writer states; must not be null
     * @param kafkaSinkContext sink context passed to the serializer and the base writer
     * @throws FlinkRuntimeException if opening the serializer fails
     * @throws NullPointerException if {@code transactionalIdPrefix} or {@code recoveredStates} is
     *     null
     */
    ExactlyOnceKafkaWriter(DeliveryGuarantee deliveryGuarantee, Properties kafkaProducerConfig, String transactionalIdPrefix, Sink.InitContext sinkInitContext, KafkaRecordSerializationSchema<IN> recordSerializer, SerializationSchema.InitializationContext schemaContext, Collection<KafkaWriterState> recoveredStates, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) {
        super(deliveryGuarantee, kafkaProducerConfig, sinkInitContext, recordSerializer, schemaContext, kafkaSinkContext);
        this.transactionalIdPrefix = Preconditions.checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null");
        try {
            recordSerializer.open(schemaContext, kafkaSinkContext);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Cannot initialize schema.", e);
        }
        this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
        this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(0L);
        this.recoveredStates = Preconditions.checkNotNull(recoveredStates, "recoveredStates");
        this.initFlinkMetrics();
    }

    /**
     * Aborts lingering transactions starting at the checkpoint after the last known one, then
     * obtains the producer for that checkpoint and begins its transaction.
     */
    @Override
    public void initialize() {
        this.abortLingeringTransactions(this.recoveredStates, this.lastCheckpointId + 1L);
        this.currentProducer = this.getTransactionalProducer(this.lastCheckpointId + 1L);
        this.currentProducer.beginTransaction();
    }

    /**
     * Produces the committables for the current transaction.
     *
     * @return a single committable wrapping the current producer (with {@code producerPool::add}
     *     as its callback) if the transaction contains records; otherwise an empty list, in which
     *     case the transaction is committed right away and the producer is put back into the pool
     */
    @Override
    public Collection<KafkaCommittable> prepareCommit() {
        if (this.currentProducer.hasRecordsInTransaction()) {
            List<KafkaCommittable> committables = Collections.singletonList(KafkaCommittable.of(this.currentProducer, this.producerPool::add));
            LOG.debug("Committing {} committables.", committables);
            return committables;
        }
        // Nothing was written in this transaction: finish it here and recycle the producer.
        this.currentProducer.commitTransaction();
        this.producerPool.add(this.currentProducer);
        return Collections.emptyList();
    }

    /**
     * Switches to the producer for {@code checkpointId + 1}, begins its transaction and returns
     * the writer state.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return singleton list holding the state built from the transactional id prefix
     * @throws IOException declared by the overridden method; not thrown by the code in this class
     */
    @Override
    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
        this.currentProducer = this.getTransactionalProducer(checkpointId + 1L);
        this.currentProducer.beginTransaction();
        return Collections.singletonList(this.kafkaWriterState);
    }

    /**
     * Aborts the current transaction (if any), closes the pooled producers and every producer
     * created by this writer, and finally closes the base writer. The steps are handed to
     * {@code IOUtils.closeAll} in exactly this order.
     *
     * @throws Exception if any of the close steps fails
     */
    @Override
    public void close() throws Exception {
        IOUtils.closeAll(
                this::abortCurrentProducer,
                () -> IOUtils.closeAll(this.producerPool),
                () -> IOUtils.closeAll(this.producerCloseables),
                () -> super.close());
    }

    /**
     * Aborts the transaction of the current producer if it is in a transaction that already
     * contains records. A {@link ProducerFencedException} raised by the abort is logged at debug
     * level and otherwise ignored.
     */
    private void abortCurrentProducer() {
        // Within this class currentProducer is only assigned in initialize()/snapshotState();
        // guard against close() being invoked before either of them ran.
        // NOTE(review): confirm the base class does not assign it earlier.
        if (this.currentProducer == null) {
            return;
        }
        if (this.currentProducer.isInTransaction() && this.currentProducer.hasRecordsInTransaction()) {
            try {
                this.currentProducer.abortTransaction();
            }
            catch (ProducerFencedException e) {
                LOG.debug("Producer {} fenced while aborting", this.currentProducer.getTransactionalId());
            }
        }
    }

    /**
     * Exposes the live producer pool (not a copy).
     *
     * @return the pool of idle producers
     */
    @VisibleForTesting
    Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducerPool() {
        return this.producerPool;
    }

    /**
     * Delegates to a {@link TransactionAborter} to abort lingering transactions for the current
     * transactional id prefix and, if the first recovered state carries a different prefix, for
     * that previous prefix as well.
     *
     * @param recoveredStates recovered writer states; only the first element is inspected
     * @param startCheckpointId checkpoint id passed to the aborter as the starting point
     */
    private void abortLingeringTransactions(Collection<KafkaWriterState> recoveredStates, long startCheckpointId) {
        List<String> prefixesToAbort = new ArrayList<>();
        prefixesToAbort.add(this.transactionalIdPrefix);
        Optional<KafkaWriterState> lastStateOpt = recoveredStates.stream().findFirst();
        if (lastStateOpt.isPresent()) {
            KafkaWriterState lastState = lastStateOpt.get();
            if (!lastState.getTransactionalIdPrefix().equals(this.transactionalIdPrefix)) {
                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
                LOG.warn("Transactional id prefix from previous execution {} has changed to {}.", lastState.getTransactionalIdPrefix(), this.transactionalIdPrefix);
            }
        }
        try (TransactionAborter transactionAborter = new TransactionAborter(this.kafkaSinkContext.getParallelInstanceId(), this.kafkaSinkContext.getNumberOfParallelInstances(), this::getOrCreateTransactionalProducer, this.producerPool::add)) {
            transactionAborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId);
        }
    }

    /**
     * Returns the producer for {@code checkpointId} and records that id as the last checkpoint id.
     *
     * <p>A producer is requested for every id in {@code (lastCheckpointId, checkpointId]}; only
     * the one for {@code checkpointId} is returned.
     *
     * @param checkpointId checkpoint id the producer is needed for; must be greater than the last
     *     checkpoint id
     * @return producer whose transactional id was built for {@code checkpointId}
     * @throws IllegalStateException if {@code checkpointId} is not greater than the last
     *     checkpoint id
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
        Preconditions.checkState(checkpointId > this.lastCheckpointId, "Expected %s > %s", checkpointId, this.lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // Walk over every id since the last checkpoint, not just the requested one.
        // NOTE(review): presumably this initializes the ids skipped by checkpoints that never
        // reached snapshotState - confirm.
        for (long id = this.lastCheckpointId + 1L; id <= checkpointId; ++id) {
            String transactionalId = TransactionalIdFactory.buildTransactionalId(this.transactionalIdPrefix, this.kafkaSinkContext.getParallelInstanceId(), id);
            producer = this.getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        // The precondition above guarantees at least one loop iteration.
        assert (producer != null);
        LOG.info("Created new transactional producer {}", producer.getTransactionalId());
        return producer;
    }

    /**
     * Takes a producer from the pool and re-initializes it with {@code transactionalId}, or, if
     * the pool is empty, creates a new producer, registers it for closing, initializes its
     * transactions and registers its Kafka metrics.
     *
     * @param transactionalId transactional id the returned producer should use
     * @return a pooled or newly created producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> getOrCreateTransactionalProducer(String transactionalId) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = this.producerPool.poll();
        if (producer == null) {
            producer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, transactionalId);
            this.producerCloseables.add(producer);
            producer.initTransactions();
            this.initKafkaMetrics(producer);
        } else {
            producer.initTransactionId(transactionalId);
        }
        return producer;
    }
}

