/*
 * Decompiled with CFR 0.152.
 */
package com.taotao.cloud.cache.redis.delay.listener;

import com.taotao.cloud.cache.redis.delay.listener.AbstractRedissonListenerContainer;
import com.taotao.cloud.cache.redis.delay.listener.BatchRedissonMessageListenerAdapter;
import com.taotao.cloud.cache.redis.delay.listener.ContainerProperties;
import com.taotao.cloud.cache.redis.delay.message.FastJsonCodec;
import com.taotao.cloud.cache.redis.delay.message.RedissonMessage;
import com.taotao.cloud.common.utils.common.JsonUtils;
import com.taotao.cloud.common.utils.common.ThreadFactoryCreator;
import com.taotao.cloud.common.utils.log.LogUtils;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.Assert;

public class BatchRedissonListenerContainer
extends AbstractRedissonListenerContainer {
    private final String fetchScript;
    private final int maxFetch;
    private AsyncMessageProcessingConsumer takeMessageTask;

    public int getMaxFetch() {
        return this.maxFetch;
    }

    public BatchRedissonListenerContainer(ContainerProperties containerProperties, int maxFetch) {
        super(containerProperties);
        Assert.isTrue((maxFetch > 0 ? 1 : 0) != 0, (String)"maxFetch must be greater than 0");
        this.maxFetch = maxFetch;
        this.fetchScript = "local expiredValues = redis.call('lrange', KEYS[1], 0, ARGV[1]); if #expiredValues > 0 then redis.call('ltrim', KEYS[1], ARGV[2], -1); end; return expiredValues;";
        this.setTaskExecutor((Executor)new SimpleAsyncTaskExecutor(ThreadFactoryCreator.create((String)"taotao-cloud-redisson-batch-consume-thread")));
    }

    @Override
    protected void doStart() {
        this.takeMessageTask = new AsyncMessageProcessingConsumer();
        this.getTaskExecutor().execute(this.takeMessageTask);
    }

    @Override
    protected void doStop() {
        this.takeMessageTask.stop();
    }

    private final class AsyncMessageProcessingConsumer
    implements Runnable {
        private volatile Thread currentThread = null;
        private volatile AbstractRedissonListenerContainer.ConsumerStatus status = AbstractRedissonListenerContainer.ConsumerStatus.CREATED;

        private AsyncMessageProcessingConsumer() {
        }

        @Override
        public void run() {
            if (this.status != AbstractRedissonListenerContainer.ConsumerStatus.CREATED) {
                LogUtils.info((String)"consumer currentThread [{}] will exit, because consumer status is {},expected is CREATED", (Object[])new Object[]{this.currentThread.getName(), this.status});
                return;
            }
            this.currentThread = Thread.currentThread();
            this.status = AbstractRedissonListenerContainer.ConsumerStatus.RUNNING;
            long maxWaitMillis = 100L;
            long emptyFetchTimes = 0L;
            do {
                try {
                    List<RedissonMessage> messageList = this.fetch();
                    if (messageList == null || messageList.isEmpty()) {
                        long delay = ++emptyFetchTimes * 5L;
                        delay = Math.min(delay, 100L);
                        Thread.sleep(delay);
                        continue;
                    }
                    emptyFetchTimes = 0L;
                    BatchRedissonMessageListenerAdapter redissonListener = (BatchRedissonMessageListenerAdapter)BatchRedissonListenerContainer.this.getRedissonListener();
                    redissonListener.onMessage(messageList);
                }
                catch (InterruptedException | RedisException messageList) {
                }
                catch (Exception e) {
                    LogUtils.error((String)"error occurred while take message from redisson", (Object[])new Object[]{e});
                }
            } while (this.status != AbstractRedissonListenerContainer.ConsumerStatus.STOPPED);
            LogUtils.info((String)"consumer currentThread [{}] will exit, because of STOPPED status", (Object[])new Object[]{this.currentThread.getName()});
            this.currentThread = null;
        }

        private List<RedissonMessage> fetch() {
            String queue = BatchRedissonListenerContainer.this.getContainerProperties().getQueue();
            RedissonClient redissonClient = BatchRedissonListenerContainer.this.getRedissonClient();
            int fetchCount = BatchRedissonListenerContainer.this.maxFetch;
            int searchEndIndex = fetchCount - 1;
            List message = (List)redissonClient.getScript((Codec)FastJsonCodec.INSTANCE).eval(RScript.Mode.READ_WRITE, BatchRedissonListenerContainer.this.fetchScript, RScript.ReturnType.MULTI, Collections.singletonList(queue), new Object[]{searchEndIndex, fetchCount});
            if (message == null || message.isEmpty()) {
                return null;
            }
            return message.stream().map(e -> (RedissonMessage)JsonUtils.toObject((String)e, RedissonMessage.class)).collect(Collectors.toList());
        }

        private void stop() {
            if (this.currentThread != null) {
                this.status = AbstractRedissonListenerContainer.ConsumerStatus.STOPPED;
                this.currentThread.interrupt();
            }
        }
    }
}

