/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractAsyncProducer;
import com.zendesk.maxwell.producer.KafkaCallback;
import com.zendesk.maxwell.producer.MaxwellKafkaProducer;
import com.zendesk.maxwell.producer.partitioners.MaxwellKafkaPartitioner;
import com.zendesk.maxwell.row.RowIdentity;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.schema.ddl.DDLMap;
import com.zendesk.maxwell.util.StoppableTask;
import com.zendesk.maxwell.util.StoppableTaskState;
import com.zendesk.maxwell.util.TopicInterpolator;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that takes {@link RowMap}s off a blocking queue and publishes them through a Kafka
 * {@link Producer}.
 *
 * <p>{@link #run()} loops forever: it first re-sends any fallback records waiting in the
 * dead-letter queue, then blocks on the row queue and hands each row to {@code push(...)}
 * (inherited from {@link AbstractAsyncProducer}). Fallback records are added to the dead-letter
 * queue by {@link #enqueueFallbackRow}; the {@link KafkaCallback} built in
 * {@link #sendAsync(RowMap, AbstractAsyncProducer.CallbackCompleter)} receives a reference to this
 * worker, presumably so that it can call that method when a send fails (confirm in KafkaCallback).
 */
class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
    // NOTE: the logger category is MaxwellKafkaProducer, not this class. Left untouched so that
    // any existing logging configuration keyed on that name keeps matching.
    static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);

    private final Producer<String, String> kafka;
    private final String topic;
    private final String ddlTopic;
    private final MaxwellKafkaPartitioner partitioner;
    private final MaxwellKafkaPartitioner ddlPartitioner;
    private final RowMap.KeyFormat keyFormat;
    private final ArrayBlockingQueue<RowMap> queue;
    // Written by run() and read by awaitStop(); volatile so that a caller of awaitStop() on a
    // different thread (presumably the usual case) observes the assignment.
    private volatile Thread thread;
    private final StoppableTaskState taskState;
    private final String deadLetterTopic;
    private final ConcurrentLinkedQueue<Pair<ProducerRecord<String, String>, KafkaCallback>> deadLetterQueue;
    private final TopicInterpolator topicInterpolator;

    /**
     * Builds the partitioner used for DDL rows.
     *
     * @param partitionHashFunc hash function name, passed through to the partitioner
     * @param partitionKey the configured producer partition key; {@code "table"} selects a
     *     table-keyed partitioner with a {@code "database"} fallback, any other value
     *     (including {@code null}) selects a database-keyed partitioner
     * @return the partitioner for DDL rows
     */
    public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) {
        // Constant-first comparison: a null key falls through to the database partitioner
        // instead of throwing a NullPointerException.
        if ("table".equals(partitionKey)) {
            return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database");
        }
        return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null);
    }

    /**
     * Creates a worker around an existing Kafka producer.
     *
     * @param context Maxwell context; its config supplies the partitioning settings, the DDL
     *     topic, the dead-letter topic and the key format
     * @param kafkaTopic topic (template) for data rows; {@code null} means {@code "maxwell"}
     * @param queue queue from which {@link #run()} takes rows
     * @param producer Kafka producer used for every send
     */
    public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue<RowMap> queue, Producer<String, String> producer) {
        super(context);
        this.topic = kafkaTopic == null ? "maxwell" : kafkaTopic;
        this.topicInterpolator = new TopicInterpolator(this.topic);
        this.kafka = producer;

        String hash = context.getConfig().kafkaPartitionHash;
        String partitionKey = context.getConfig().producerPartitionKey;
        String partitionColumns = context.getConfig().producerPartitionColumns;
        String partitionFallback = context.getConfig().producerPartitionFallback;
        this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback);
        this.ddlPartitioner = MaxwellKafkaProducerWorker.makeDDLPartitioner(hash, partitionKey);

        this.ddlTopic = context.getConfig().ddlKafkaTopic;
        this.deadLetterTopic = context.getConfig().deadLetterTopic;
        this.deadLetterQueue = new ConcurrentLinkedQueue<>();
        this.keyFormat = context.getConfig().kafkaKeyFormat.equals("hash") ? RowMap.KeyFormat.HASH : RowMap.KeyFormat.ARRAY;
        this.queue = queue;
        this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");
    }

    /**
     * Creates a worker that owns a new {@link KafkaProducer} built from {@code kafkaProperties}
     * with {@link StringSerializer}s for both key and value.
     *
     * @param context Maxwell context
     * @param kafkaProperties properties handed to the {@link KafkaProducer} constructor
     * @param kafkaTopic topic (template) for data rows; {@code null} means {@code "maxwell"}
     * @param queue queue from which {@link #run()} takes rows
     */
    public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, ArrayBlockingQueue<RowMap> queue) {
        this(context, kafkaTopic, queue, new KafkaProducer<>(kafkaProperties, new StringSerializer(), new StringSerializer()));
    }

    /**
     * Main loop: drain the dead-letter queue, take the next row, push it.
     *
     * <p>The stop flag is only inspected after {@code queue.take()} returns; the row taken at
     * that point is not pushed. Any exception marks the task stopped and is handed to
     * {@code context.terminate(...)}.
     */
    @Override
    public void run() {
        this.thread = Thread.currentThread();
        try {
            while (true) {
                this.drainDeadLetterQueue();
                RowMap row = this.queue.take();
                if (!this.taskState.isRunning()) {
                    this.taskState.stopped();
                    return;
                }
                this.push(row);
            }
        } catch (Exception e) {
            this.taskState.stopped();
            try {
                this.context.terminate(e);
            } finally {
                // queue.take() clears the interrupt flag when it throws; restore it for whoever
                // owns this thread. Done after terminate() so that call still runs with the flag
                // cleared, exactly as before.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Sends every fallback record currently waiting in the dead-letter queue. */
    void drainDeadLetterQueue() {
        Pair<ProducerRecord<String, String>, KafkaCallback> pending;
        while ((pending = this.deadLetterQueue.poll()) != null) {
            this.sendAsync(pending.getLeft(), pending.getRight());
        }
    }

    /**
     * Looks up the partition count of {@code topic} via the producer.
     *
     * @throws KafkaException rethrown unchanged after being logged
     */
    private int getNumPartitions(String topic) {
        try {
            return this.kafka.partitionsFor(topic).size();
        } catch (KafkaException e) {
            // Same message text as before; the trailing throwable makes SLF4J log the stack trace too.
            LOGGER.error("Topic '{}' name does not exist. Exception: {}", topic, e.getLocalizedMessage(), e);
            throw e;
        }
    }

    /**
     * Converts {@code r} to a producer record and sends it with a {@link KafkaCallback} wired to
     * {@code cc}.
     *
     * @param r row to publish
     * @param cc completer passed to the callback
     * @throws Exception if building the record fails
     */
    @Override
    public void sendAsync(RowMap r, AbstractAsyncProducer.CallbackCompleter cc) throws Exception {
        ProducerRecord<String, String> record = this.makeProducerRecord(r);
        // The payload is only handed to the callback when KafkaCallback's logger has debug enabled.
        String value = KafkaCallback.LOGGER.isDebugEnabled() ? record.value() : null;
        KafkaCallback callback = new KafkaCallback(cc, r.getNextPosition(), r.getRowIdentity(), value, this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter, this.topic, this.deadLetterTopic, this.context, this);
        this.sendAsync(record, callback);
    }

    /**
     * Builds a fallback record for {@code fallbackRecord} and queues it; it is sent the next time
     * {@link #drainDeadLetterQueue()} runs. If building the record fails, {@code callback} is
     * completed with that failure instead.
     *
     * @param topic fallback topic (template)
     * @param fallbackRecord identity of the row being reported
     * @param callback callback to attach to the fallback send
     * @param md metadata forwarded to {@code callback} if the record cannot be built
     * @param reason the exception whose simple class name is embedded in the fallback value
     */
    public void enqueueFallbackRow(String topic, RowIdentity fallbackRecord, KafkaCallback callback, RecordMetadata md, Exception reason) {
        LOGGER.info("publishing fallback record to {}: {}", topic, fallbackRecord);
        try {
            ProducerRecord<String, String> record = this.makeFallbackRecord(topic, fallbackRecord, reason);
            this.deadLetterQueue.add(Pair.of(record, callback));
        } catch (Exception fallbackEx) {
            callback.onCompletion(md, fallbackEx);
        }
    }

    /** Hands {@code record} to the Kafka producer with {@code callback}. */
    void sendAsync(ProducerRecord<String, String> record, Callback callback) {
        this.kafka.send(record, callback);
    }

    /**
     * Builds the producer record for {@code r}: key from the row's primary key, value from its
     * JSON form, partition chosen by the matching partitioner.
     *
     * <p>DDL rows go to the DDL topic using the DDL partitioner. Other rows use the row's own
     * Kafka topic if set, otherwise the topic generated by the interpolator.
     *
     * @throws Exception if serialising the row or resolving the partition count fails
     */
    ProducerRecord<String, String> makeProducerRecord(RowMap r) throws Exception {
        String key = r.pkToJson(this.keyFormat);
        String value = r.toJSON(this.outputConfig);

        if (r instanceof DDLMap) {
            int partition = this.ddlPartitioner.kafkaPartition(r, this.getNumPartitions(this.ddlTopic));
            return new ProducerRecord<>(this.ddlTopic, partition, key, value);
        }

        String rowTopic = r.getKafkaTopic();
        if (rowTopic == null) {
            rowTopic = this.topicInterpolator.generateFromRowMap(r);
        }
        LOGGER.debug("context.getConfig().producerPartitionKey = {}", this.context.getConfig().producerPartitionKey);
        int partition = this.partitioner.kafkaPartition(r, this.getNumPartitions(rowTopic));
        return new ProducerRecord<>(rowTopic, partition, key, value);
    }

    /**
     * Builds an unpartitioned fallback record for {@code pk}.
     *
     * @param fallbackTopic topic template, interpolated against {@code pk}
     * @param pk identity of the row being reported
     * @param reason exception whose simple class name is embedded in the value
     * @throws Exception if building the key or value fails
     */
    ProducerRecord<String, String> makeFallbackRecord(String fallbackTopic, RowIdentity pk, Exception reason) throws Exception {
        String key = pk.toKeyJson(this.keyFormat);
        String value = pk.toFallbackValueWithReason(reason.getClass().getSimpleName());
        String resolvedTopic = new TopicInterpolator(fallbackTopic).generateFromRowIdentity(pk);
        return new ProducerRecord<>(resolvedTopic, key, value);
    }

    /** Flags the task as stop-requested and closes the Kafka producer. */
    @Override
    public void requestStop() {
        this.taskState.requestStop();
        this.kafka.close();
    }

    /**
     * Delegates to the task state, passing the thread recorded by {@link #run()}
     * ({@code null} if {@code run()} has not started yet).
     *
     * @param timeout forwarded unchanged to the task state
     * @throws TimeoutException propagated from the task state
     */
    @Override
    public void awaitStop(Long timeout) throws TimeoutException {
        this.taskState.awaitStop(this.thread, timeout);
    }

    /** Closes the Kafka producer. */
    public void close() {
        this.kafka.close();
    }

    /** @return this worker, which is its own stoppable task */
    @Override
    public StoppableTask getStoppableTask() {
        return this;
    }
}

