/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.mq.rabbitmq.comsumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import com.taotao.cloud.common.utils.log.LogUtils;
import com.taotao.cloud.mq.rabbitmq.common.DetailResponse;
import com.taotao.cloud.mq.rabbitmq.comsumer.MessageConsumer;
import com.taotao.cloud.mq.rabbitmq.producer.MessageProcess;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

public class FastBuildRabbitMqConsumer {
    private ConnectionFactory connectionFactory;

    public FastBuildRabbitMqConsumer(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public <T> MessageConsumer buildMessageConsumer(String exchange, String routingKey, final String queue, MessageProcess<T> messageProcess, String type) throws IOException {
        final Connection connection = this.connectionFactory.createConnection();
        this.buildQueue(exchange, routingKey, queue, connection, type);
        DefaultMessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        return new MessageConsumer(){
            Channel channel;
            final /* synthetic */ MessagePropertiesConverter val$messagePropertiesConverter;
            final /* synthetic */ MessageConverter val$messageConverter;
            final /* synthetic */ MessageProcess val$messageProcess;
            {
                this.val$messagePropertiesConverter = messagePropertiesConverter;
                this.val$messageConverter = messageConverter;
                this.val$messageProcess = messageProcess;
                this.channel = connection.createChannel(false);
            }

            @Override
            public DetailResponse consume() {
                try {
                    DetailResponse detailRes;
                    GetResponse response = this.channel.basicGet(queue, false);
                    while (response == null) {
                        response = this.channel.basicGet(queue, false);
                        Thread.sleep(1000L);
                    }
                    Message message = new Message(response.getBody(), this.val$messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8"));
                    Object messageBean = this.val$messageConverter.fromMessage(message);
                    try {
                        detailRes = this.val$messageProcess.process(messageBean);
                    }
                    catch (Exception e) {
                        LogUtils.error((String)"exception", (Object[])new Object[]{e});
                        detailRes = new DetailResponse(false, "process exception: " + e, "");
                    }
                    if (detailRes.isIfSuccess()) {
                        this.channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                    } else {
                        Thread.sleep(1000L);
                        LogUtils.info((String)("process message failed: " + detailRes.getErrMsg()), (Object[])new Object[0]);
                        this.channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);
                    }
                    return detailRes;
                }
                catch (InterruptedException e) {
                    LogUtils.error((String)"exception", (Object[])new Object[]{e});
                    return new DetailResponse(false, "interrupted exception " + e.toString(), "");
                }
                catch (ConsumerCancelledException | ShutdownSignalException | IOException e) {
                    LogUtils.error((String)"exception", (Object[])new Object[]{e});
                    try {
                        this.channel.close();
                    }
                    catch (IOException | TimeoutException ex) {
                        LogUtils.error((String)"exception", (Object[])new Object[]{ex});
                    }
                    this.channel = connection.createChannel(false);
                    return new DetailResponse(false, "shutdown or cancelled exception " + e.toString(), "");
                }
                catch (Exception e) {
                    LogUtils.info((String)"exception : ", (Object[])new Object[]{e});
                    try {
                        this.channel.close();
                    }
                    catch (IOException | TimeoutException ex) {
                        ex.printStackTrace();
                    }
                    this.channel = connection.createChannel(false);
                    return new DetailResponse(false, "exception " + e.toString(), "");
                }
            }
        };
    }

    private void buildQueue(String exchange, String routingKey, String queue, Connection connection, String type) throws IOException {
        Channel channel = connection.createChannel(false);
        if (type.equals("direct")) {
            channel.exchangeDeclare(exchange, "direct", true, false, null);
        } else if (type.equals("topic")) {
            channel.exchangeDeclare(exchange, "topic", true, false, null);
        }
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, routingKey);
        try {
            channel.close();
        }
        catch (TimeoutException e) {
            LogUtils.info((String)"close channel time out ", (Object[])new Object[]{e});
        }
    }
}

