/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.slf4j.Logger;

public class StreamsProducer {
    private final Logger log;
    private final String logPrefix;
    private final Map<String, Object> eosV2ProducerConfigs;
    private final KafkaClientSupplier clientSupplier;
    private final StreamThread.ProcessingMode processingMode;
    private Producer<byte[], byte[]> producer;
    private boolean transactionInFlight = false;
    private boolean transactionInitialized = false;

    public StreamsProducer(StreamsConfig config, String threadId, KafkaClientSupplier clientSupplier, TaskId taskId, UUID processId, LogContext logContext) {
        Map<String, Object> producerConfigs;
        Objects.requireNonNull(config, "config cannot be null");
        Objects.requireNonNull(threadId, "threadId cannot be null");
        this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier cannot be null");
        this.log = Objects.requireNonNull(logContext, "logContext cannot be null").logger(this.getClass());
        this.logPrefix = logContext.logPrefix().trim();
        this.processingMode = StreamThread.processingMode(config);
        switch (this.processingMode) {
            case AT_LEAST_ONCE: {
                producerConfigs = config.getProducerConfigs(ClientUtils.getThreadProducerClientId(threadId));
                this.eosV2ProducerConfigs = null;
                break;
            }
            case EXACTLY_ONCE_ALPHA: {
                producerConfigs = config.getProducerConfigs(ClientUtils.getTaskProducerClientId(threadId, Objects.requireNonNull(taskId, "taskId cannot be null for exactly-once alpha")));
                String applicationId = config.getString("application.id");
                producerConfigs.put("transactional.id", applicationId + "-" + taskId);
                this.eosV2ProducerConfigs = null;
                break;
            }
            case EXACTLY_ONCE_V2: {
                producerConfigs = config.getProducerConfigs(ClientUtils.getThreadProducerClientId(threadId));
                String applicationId = config.getString("application.id");
                producerConfigs.put("transactional.id", applicationId + "-" + Objects.requireNonNull(processId, "processId cannot be null for exactly-once v2") + "-" + threadId.split("-StreamThread-")[1]);
                this.eosV2ProducerConfigs = producerConfigs;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unknown processing mode: " + (Object)((Object)this.processingMode));
            }
        }
        this.producer = clientSupplier.getProducer(producerConfigs);
    }

    private String formatException(String message) {
        return message + " [" + this.logPrefix + "]";
    }

    boolean eosEnabled() {
        return StreamThread.eosEnabled(this.processingMode);
    }

    void initTransaction() {
        if (!this.eosEnabled()) {
            throw new IllegalStateException(this.formatException("Exactly-once is not enabled"));
        }
        if (!this.transactionInitialized) {
            try {
                this.producer.initTransactions();
                this.transactionInitialized = true;
            }
            catch (TimeoutException timeoutException) {
                this.log.warn("Timeout exception caught trying to initialize transactions. The broker is either slow or in bad state (like not having enough replicas) in responding to the request, or the connection to broker was interrupted sending the request or receiving the response. Will retry initializing the task in the next loop. Consider overwriting {} to a larger value to avoid timeout errors", (Object)"max.block.ms");
                throw timeoutException;
            }
            catch (KafkaException exception) {
                throw new StreamsException(this.formatException("Error encountered trying to initialize transactions"), exception);
            }
        }
    }

    public void resetProducer() {
        if (this.processingMode != StreamThread.ProcessingMode.EXACTLY_ONCE_V2) {
            throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + (Object)((Object)this.processingMode));
        }
        this.producer.close();
        this.producer = this.clientSupplier.getProducer(this.eosV2ProducerConfigs);
        this.transactionInitialized = false;
    }

    private void maybeBeginTransaction() {
        if (this.eosEnabled() && !this.transactionInFlight) {
            try {
                this.producer.beginTransaction();
                this.transactionInFlight = true;
            }
            catch (InvalidProducerEpochException | ProducerFencedException error) {
                throw new TaskMigratedException(this.formatException("Producer got fenced trying to begin a new transaction"), error);
            }
            catch (KafkaException error) {
                throw new StreamsException(this.formatException("Error encountered trying to begin a new transaction"), error);
            }
        }
    }

    Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
        this.maybeBeginTransaction();
        try {
            return this.producer.send(record, callback);
        }
        catch (KafkaException uncaughtException) {
            if (StreamsProducer.isRecoverable(uncaughtException)) {
                throw new TaskMigratedException(this.formatException("Producer got fenced trying to send a record"), uncaughtException.getCause());
            }
            throw new StreamsException(this.formatException(String.format("Error encountered trying to send record to topic %s", record.topic())), uncaughtException);
        }
    }

    private static boolean isRecoverable(KafkaException uncaughtException) {
        return uncaughtException.getCause() instanceof ProducerFencedException || uncaughtException.getCause() instanceof InvalidProducerEpochException || uncaughtException.getCause() instanceof UnknownProducerIdException;
    }

    protected void commitTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) {
        if (!this.eosEnabled()) {
            throw new IllegalStateException(this.formatException("Exactly-once is not enabled"));
        }
        this.maybeBeginTransaction();
        try {
            ConsumerGroupMetadata maybeDowngradedGroupMetadata = this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_V2 ? consumerGroupMetadata : new ConsumerGroupMetadata(consumerGroupMetadata.groupId());
            this.producer.sendOffsetsToTransaction(offsets, maybeDowngradedGroupMetadata);
            this.producer.commitTransaction();
            this.transactionInFlight = false;
        }
        catch (CommitFailedException | InvalidProducerEpochException | ProducerFencedException error) {
            throw new TaskMigratedException(this.formatException("Producer got fenced trying to commit a transaction"), error);
        }
        catch (TimeoutException timeoutException) {
            throw timeoutException;
        }
        catch (KafkaException error) {
            throw new StreamsException(this.formatException("Error encountered trying to commit a transaction"), error);
        }
    }

    void abortTransaction() {
        if (!this.eosEnabled()) {
            throw new IllegalStateException(this.formatException("Exactly-once is not enabled"));
        }
        if (this.transactionInFlight) {
            try {
                this.producer.abortTransaction();
            }
            catch (TimeoutException logAndSwallow) {
                this.log.warn("Aborting transaction failed due to timeout. Will rely on broker to eventually abort the transaction after the transaction timeout passed.", logAndSwallow);
            }
            catch (InvalidProducerEpochException | ProducerFencedException error) {
                this.log.debug("Encountered {} while aborting the transaction; this is expected and hence swallowed", (Object)error.getMessage());
            }
            catch (KafkaException error) {
                throw new StreamsException(this.formatException("Error encounter trying to abort a transaction"), error);
            }
            this.transactionInFlight = false;
        }
    }

    List<PartitionInfo> partitionsFor(String topic) {
        return this.producer.partitionsFor(topic);
    }

    Map<MetricName, ? extends Metric> metrics() {
        return this.producer.metrics();
    }

    void flush() {
        this.producer.flush();
    }

    void close() {
        this.producer.close();
    }

    Producer<byte[], byte[]> kafkaProducer() {
        return this.producer;
    }
}

