/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.redis;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import lombok.Generated;
import org.jetlinks.core.VisitCount;
import org.jetlinks.core.cluster.ClusterQueue;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class RedisClusterQueue<T>
extends VisitCount
implements ClusterQueue<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RedisClusterQueue.class);
    private static final AtomicReferenceFieldUpdater<RedisClusterQueue, Boolean> POLLING = AtomicReferenceFieldUpdater.newUpdater(RedisClusterQueue.class, Boolean.class, "polling");
    private static final AtomicIntegerFieldUpdater<RedisClusterQueue> ROUND_ROBIN = AtomicIntegerFieldUpdater.newUpdater(RedisClusterQueue.class, "roundRobin");
    private final String id;
    protected final ReactiveRedisOperations<String, T> operations;
    private volatile Boolean polling = false;
    private volatile int roundRobin = 0;
    private int maxBatchSize = 32;
    private volatile float localConsumerPercent = 1.0f;
    private long lastRequestSize = this.maxBatchSize;
    private boolean hasLocalProducer;
    private ClusterQueue.Mod mod = ClusterQueue.Mod.FIFO;
    long lastEmptyTime = 0L;
    private final List<FluxSink<T>> subscribers = new CopyOnWriteArrayList<FluxSink<T>>();
    private static final RedisScript<List> lifoPollScript = RedisScript.of((String)String.join((CharSequence)"\n", "local val = redis.call('lrange',KEYS[1],0,KEYS[2]);", "redis.call('ltrim',KEYS[1],KEYS[2]+1,-1);", "return val;"), List.class);
    private static final RedisScript<List> fifoPollScript = RedisScript.of((String)String.join((CharSequence)"\n", "local size = redis.call('llen',KEYS[1]);", "if size == 0 then", "return nil", "end", "local index = size - KEYS[2];", "if index == 0 then", "return redis.call('lpop',KEYS[1]);", "end", "local val = redis.call('lrange',KEYS[1],index,size);", "redis.call('ltrim',KEYS[1],0,index-1);", "return val;"), List.class);
    private static final RedisScript<Long> pushAndPublish = RedisScript.of((String)"local val = redis.call('lpush',KEYS[1],ARGV[1]);redis.call('publish','queue:data:produced',ARGV[2]);return val;", Long.class);
    private boolean useScript = "true".equals(System.getProperty("jetlinks.cluster.redus.queue.batch.enabled", "true"));

    public void setLocalConsumerPercent(float localConsumerPercent) {
        this.localConsumerPercent = localConsumerPercent;
    }

    public RedisClusterQueue(String id, ReactiveRedisTemplate<String, T> operations) {
        this.id = id;
        this.operations = operations;
        if (this.useScript && operations.getConnectionFactory() instanceof LettuceConnectionFactory) {
            LettuceConnectionFactory factory = (LettuceConnectionFactory)operations.getConnectionFactory();
            this.useScript = !factory.isClusterAware();
        }
    }

    protected void tryPoll() {
        this.doPoll(this.lastRequestSize);
    }

    private boolean push(Iterable<T> data) {
        for (T datum : data) {
            if (this.push(datum)) continue;
            return false;
        }
        return true;
    }

    private boolean push(T data) {
        int size = this.subscribers.size();
        if (size == 0) {
            return false;
        }
        if (size == 1) {
            this.subscribers.get(0).next(data);
            return true;
        }
        int index = ROUND_ROBIN.incrementAndGet(this);
        if (index >= size) {
            index = 0;
            ROUND_ROBIN.set(this, 0);
        }
        this.subscribers.get(index).next(data);
        return true;
    }

    private void doPoll(long size) {
        if (!this.hasLocalConsumer()) {
            return;
        }
        this.visit();
        if (POLLING.compareAndSet(this, false, true)) {
            AtomicLong total = new AtomicLong(size);
            long pollSize = Math.min(total.get(), (long)this.maxBatchSize);
            this.pollBatch((int)pollSize).flatMap(v -> {
                if (!this.push(v)) {
                    return this.operations.opsForList().leftPush((Object)this.id, v).then();
                }
                return Mono.just((Object)v);
            }).count().doFinally(s -> POLLING.set(this, false)).subscribe(r -> {
                if (r > 0L && total.addAndGet(-r.longValue()) > 0L) {
                    POLLING.set(this, false);
                    this.doPoll(total.get());
                    log.trace("poll datas[{}] from redis [{}] ", r, (Object)this.id);
                } else {
                    this.lastEmptyTime = System.currentTimeMillis();
                }
            });
        }
    }

    protected void stopPoll() {
    }

    @Nonnull
    public Flux<T> subscribe() {
        return Flux.create(sink -> {
            this.subscribers.add((FluxSink<T>)sink);
            sink.onDispose(() -> this.subscribers.remove(sink));
            this.doPoll(sink.requestedFromDownstream());
        }).doOnRequest(i -> {
            if (!this.hasLocalProducer) {
                this.lastRequestSize = i;
                this.doPoll(this.lastRequestSize);
            }
        });
    }

    public void stop() {
        this.stopPoll();
    }

    public boolean hasLocalConsumer() {
        return this.subscribers.size() > 0;
    }

    public Mono<Integer> size() {
        this.visit();
        return this.operations.opsForList().size((Object)this.id).map(Number::intValue);
    }

    public void setPollMod(ClusterQueue.Mod mod) {
        this.mod = mod;
    }

    @Nonnull
    public Mono<T> poll() {
        this.visit();
        return this.mod == ClusterQueue.Mod.LIFO ? this.operations.opsForList().leftPop((Object)this.id) : this.operations.opsForList().rightPop((Object)this.id);
    }

    private Flux<T> pollBatch(int size) {
        if (size == 1 || !this.useScript) {
            return this.poll().flux();
        }
        return (this.mod == ClusterQueue.Mod.FIFO ? this.operations.execute(fifoPollScript, Arrays.asList(this.id, String.valueOf(size))).doOnNext(list -> Collections.reverse(list)) : this.operations.execute(lifoPollScript, Arrays.asList(this.id, String.valueOf(size)))).flatMap(list -> Flux.create(sink -> {
            for (Object o : list) {
                if (o == null) continue;
                sink.next(o);
            }
            sink.complete();
        })).map(i -> i);
    }

    private ReactiveRedisOperations getOperations() {
        return this.operations;
    }

    private boolean isLocalConsumer() {
        return this.subscribers.size() > 0 && (this.localConsumerPercent == 1.0f || ThreadLocalRandom.current().nextFloat() < this.localConsumerPercent);
    }

    public Mono<Boolean> add(T data) {
        this.visit();
        return this.doAdd(data);
    }

    private Mono<Boolean> doAdd(T data) {
        this.hasLocalProducer = true;
        if (this.isLocalConsumer() && this.push(data)) {
            return Reactors.ALWAYS_TRUE;
        }
        if (!this.useScript) {
            return this.operations.opsForList().leftPush((Object)this.id, data).then(this.getOperations().convertAndSend("queue:data:produced", (Object)this.id));
        }
        return this.getOperations().execute(pushAndPublish, Arrays.asList(this.id), Arrays.asList(data, this.id)).then(Reactors.ALWAYS_TRUE);
    }

    public Mono<Boolean> add(Publisher<T> publisher) {
        this.hasLocalProducer = true;
        this.visit();
        return Flux.from(publisher).flatMap(this::doAdd).then(Reactors.ALWAYS_TRUE);
    }

    public Mono<Boolean> addBatch(Publisher<? extends Collection<T>> publisher) {
        this.hasLocalProducer = true;
        this.visit();
        return Flux.from(publisher).flatMap(v -> {
            if (this.isLocalConsumer() && this.push((T)v)) {
                return Reactors.ALWAYS_ONE;
            }
            return this.operations.opsForList().leftPushAll((Object)this.id, v).then(this.getOperations().convertAndSend("queue:data:produced", (Object)this.id));
        }).then(Reactors.ALWAYS_TRUE);
    }

    @Generated
    public void setUseScript(boolean useScript) {
        this.useScript = useScript;
    }
}

