/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.cache.redis.delay.listener;

import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.taotao.cloud.cache.redis.delay.listener.AbstractRedissonListenerContainer;
import com.taotao.cloud.cache.redis.delay.listener.ContainerProperties;
import com.taotao.cloud.cache.redis.delay.listener.SimpleRedissonMessageListenerAdapter;
import com.taotao.cloud.cache.redis.delay.message.FastJsonCodec;
import com.taotao.cloud.cache.redis.delay.message.RedissonMessage;
import com.taotao.cloud.common.utils.common.JsonUtils;
import com.taotao.cloud.common.utils.log.LogUtils;
import java.util.Map;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.ListObjectDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.command.CommandAsyncExecutor;

public class SimpleRedissonListenerContainer
extends AbstractRedissonListenerContainer {
    private RedisCommand<Object> LPOP_VALUE = new RedisCommand("LPOP", (MultiDecoder)new ListObjectDecoder(1));
    private AsyncMessageProcessingConsumer takeMessageTask;

    public SimpleRedissonListenerContainer(ContainerProperties containerProperties) {
        super(containerProperties);
    }

    @Override
    protected void doStart() {
        this.takeMessageTask = new AsyncMessageProcessingConsumer();
        this.getTaskExecutor().execute(this.takeMessageTask);
    }

    @Override
    protected void doStop() {
        this.takeMessageTask.stop();
    }

    private final class AsyncMessageProcessingConsumer
    implements Runnable {
        private volatile Thread currentThread = null;
        private volatile AbstractRedissonListenerContainer.ConsumerStatus status = AbstractRedissonListenerContainer.ConsumerStatus.CREATED;

        private AsyncMessageProcessingConsumer() {
        }

        @Override
        public void run() {
            if (this.status != AbstractRedissonListenerContainer.ConsumerStatus.CREATED) {
                LogUtils.info((String)"consumer currentThread [{}] will exit, because consumer status is {},expected is CREATED", (Object[])new Object[]{this.currentThread.getName(), this.status});
                return;
            }
            String queue = SimpleRedissonListenerContainer.this.getContainerProperties().getQueue();
            Redisson redisson = (Redisson)SimpleRedissonListenerContainer.this.getRedissonClient();
            RBlockingQueue blockingQueue = redisson.getBlockingQueue(queue, (Codec)FastJsonCodec.INSTANCE);
            if (blockingQueue == null) {
                LogUtils.error((String)"error occurred while create blockingQueue for queue [{}]", (Object[])new Object[]{queue});
                return;
            }
            CommandAsyncExecutor commandExecutor = redisson.getCommandExecutor();
            this.currentThread = Thread.currentThread();
            this.status = AbstractRedissonListenerContainer.ConsumerStatus.RUNNING;
            long maxWaitMillis = 100L;
            long emptyFetchTimes = 0L;
            do {
                try {
                    RFuture asyncResult = commandExecutor.writeAsync(blockingQueue.getName(), blockingQueue.getCodec(), SimpleRedissonListenerContainer.this.LPOP_VALUE, new Object[]{blockingQueue.getName()});
                    String message = (String)commandExecutor.get(asyncResult);
                    if (StrUtil.isBlank((CharSequence)message)) {
                        Thread.sleep(Math.min(++emptyFetchTimes * 5L, 100L));
                        continue;
                    }
                    System.out.println("message:" + message);
                    emptyFetchTimes = 0L;
                    JsonNode jsonNode = JsonUtils.parse((String)message);
                    String payload = jsonNode.get("payload").toString();
                    Map headers = JsonUtils.readMap((String)jsonNode.get("headers").toString());
                    RedissonMessage redissonMessage = new RedissonMessage(payload, headers);
                    SimpleRedissonMessageListenerAdapter redissonListener = (SimpleRedissonMessageListenerAdapter)SimpleRedissonListenerContainer.this.getRedissonListener();
                    redissonListener.onMessage(redissonMessage);
                }
                catch (InterruptedException | RedisException asyncResult) {
                }
                catch (Exception e) {
                    LogUtils.error((String)"error occurred while take message from redisson", (Object[])new Object[]{e});
                }
            } while (this.status != AbstractRedissonListenerContainer.ConsumerStatus.STOPPED);
            LogUtils.info((String)"consumer currentThread [{}] will exit, because of STOPPED status", (Object[])new Object[]{this.currentThread.getName()});
            this.currentThread = null;
        }

        private void stop() {
            if (this.currentThread != null) {
                this.status = AbstractRedissonListenerContainer.ConsumerStatus.STOPPED;
                this.currentThread.interrupt();
            }
        }
    }
}

