/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.listener;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.lang.Nullable;

class SynchronizingMessageListener
implements MessageListener,
SubscriptionListener {
    private final MessageListener messageListener;
    private final SubscriptionListener subscriptionListener;
    private final List<SubscriptionSynchronizion> synchronizations = new CopyOnWriteArrayList<SubscriptionSynchronizion>();

    public SynchronizingMessageListener(MessageListener messageListener, SubscriptionListener subscriptionListener) {
        this.messageListener = messageListener;
        this.subscriptionListener = subscriptionListener;
    }

    public void addSynchronization(SubscriptionSynchronizion synchronization) {
        this.synchronizations.add(synchronization);
    }

    @Override
    public void onMessage(Message message, @Nullable byte[] pattern) {
        this.messageListener.onMessage(message, pattern);
    }

    @Override
    public void onChannelSubscribed(byte[] channel, long count) {
        this.subscriptionListener.onChannelSubscribed(channel, count);
        this.handleSubscription(channel, SubscriptionSynchronizion::onChannelSubscribed);
    }

    @Override
    public void onChannelUnsubscribed(byte[] channel, long count) {
        this.subscriptionListener.onChannelUnsubscribed(channel, count);
    }

    @Override
    public void onPatternSubscribed(byte[] pattern, long count) {
        this.subscriptionListener.onPatternSubscribed(pattern, count);
        this.handleSubscription(pattern, SubscriptionSynchronizion::onPatternSubscribed);
    }

    @Override
    public void onPatternUnsubscribed(byte[] pattern, long count) {
        this.subscriptionListener.onPatternUnsubscribed(pattern, count);
    }

    void handleSubscription(byte[] topic, BiFunction<SubscriptionSynchronizion, ByteArrayWrapper, Boolean> synchronizerCallback) {
        if (this.synchronizations.isEmpty()) {
            return;
        }
        ByteArrayWrapper binaryChannel = new ByteArrayWrapper(topic);
        ArrayList<SubscriptionSynchronizion> finalized = new ArrayList<SubscriptionSynchronizion>(this.synchronizations.size());
        for (SubscriptionSynchronizion synchronizer : this.synchronizations) {
            if (!synchronizerCallback.apply(synchronizer, binaryChannel).booleanValue()) continue;
            finalized.add(synchronizer);
        }
        this.synchronizations.removeAll(finalized);
    }

    static class SubscriptionSynchronizion {
        private static final AtomicIntegerFieldUpdater<SubscriptionSynchronizion> DONE = AtomicIntegerFieldUpdater.newUpdater(SubscriptionSynchronizion.class, "done");
        private static final int NOT_DONE = 0;
        private static final int DONE_DONE = 0;
        private volatile int done = 0;
        private final Set<ByteArrayWrapper> remainingPatterns;
        private final Set<ByteArrayWrapper> remainingChannels;
        private final Runnable doneCallback;

        public SubscriptionSynchronizion(Collection<byte[]> remainingPatterns, Collection<byte[]> remainingChannels, Runnable doneCallback) {
            if (remainingPatterns.isEmpty()) {
                this.remainingPatterns = Collections.emptySet();
            } else {
                this.remainingPatterns = ConcurrentHashMap.newKeySet(remainingPatterns.size());
                this.remainingPatterns.addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
            }
            if (remainingChannels.isEmpty()) {
                this.remainingChannels = Collections.emptySet();
            } else {
                this.remainingChannels = ConcurrentHashMap.newKeySet(remainingChannels.size());
                this.remainingChannels.addAll(remainingChannels.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
            }
            this.doneCallback = doneCallback;
        }

        boolean onChannelSubscribed(ByteArrayWrapper channel) {
            if (DONE.get(this) == 0) {
                this.remainingChannels.remove(channel);
                return this.postSubscribe();
            }
            return false;
        }

        boolean onPatternSubscribed(ByteArrayWrapper pattern) {
            if (DONE.get(this) == 0) {
                this.remainingPatterns.remove(pattern);
                return this.postSubscribe();
            }
            return false;
        }

        private boolean postSubscribe() {
            if (this.remainingChannels.isEmpty() && this.remainingPatterns.isEmpty() && DONE.compareAndSet(this, 0, 0)) {
                this.doneCallback.run();
                return true;
            }
            return false;
        }
    }
}

