/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.mq.kafka.kafka.core;

import com.taotao.cloud.common.utils.log.LogUtils;
import com.taotao.cloud.mq.common.Message;
import com.taotao.cloud.mq.common.MessageQueueProvider;
import com.taotao.cloud.mq.common.producer.MessageQueueProducerException;
import com.taotao.cloud.mq.common.producer.MessageSendCallback;
import com.taotao.cloud.mq.common.producer.MessageSendResult;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaProvider
implements MessageQueueProvider {
    private static final String KAFKA_PROVIDER_SEND_INTERRUPTED = "KafkaProvider send interrupted: {}";
    private static final String KAFKA_PROVIDER_CONSUME_ERROR = "KafkaProvider send error: {}";
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProvider(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public MessageSendResult syncSend(Message message) throws MessageQueueProducerException {
        try {
            ListenableFuture future = this.kafkaTemplate.send(message.getTopic(), (Object)message.getBody());
            SendResult sendResult = (SendResult)future.get();
            return this.transfer((SendResult<String, String>)sendResult);
        }
        catch (InterruptedException e) {
            LogUtils.error((String)KAFKA_PROVIDER_SEND_INTERRUPTED, (Object[])new Object[]{e.getMessage(), e});
            Thread.currentThread().interrupt();
            throw new MessageQueueProducerException(e.getMessage());
        }
        catch (Exception e) {
            LogUtils.error((String)KAFKA_PROVIDER_CONSUME_ERROR, (Object[])new Object[]{e.getMessage(), e});
            throw new MessageQueueProducerException(e.getMessage());
        }
    }

    public void asyncSend(Message message, final MessageSendCallback messageCallback) throws MessageQueueProducerException {
        try {
            ListenableFuture future = this.kafkaTemplate.send(message.getTopic(), (Object)message.getBody());
            future.addCallback((ListenableFutureCallback)new ListenableFutureCallback<SendResult<String, String>>(){

                public void onSuccess(SendResult<String, String> sendResult) {
                    messageCallback.onSuccess(KafkaProvider.this.transfer(sendResult));
                }

                public void onFailure(Throwable e) {
                    messageCallback.onFailed(e);
                }
            });
        }
        catch (Exception e) {
            LogUtils.error((String)KAFKA_PROVIDER_CONSUME_ERROR, (Object[])new Object[]{e.getMessage(), e});
            throw new MessageQueueProducerException(e.getMessage());
        }
    }

    private MessageSendResult transfer(SendResult<String, String> sendResult) {
        ProducerRecord producerRecord = sendResult.getProducerRecord();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        MessageSendResult result = new MessageSendResult();
        result.setTopic(producerRecord.topic());
        result.setPartition(Integer.valueOf(recordMetadata.partition()));
        result.setOffset(Long.valueOf(recordMetadata.offset()));
        return result;
    }
}

