/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.notifications.cachelistener;

import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Stream;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.notifications.cachelistener.BaseQueueingSegmentListener;
import org.infinispan.notifications.cachelistener.EventWrapper;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.impl.ListenerInvocation;
import org.infinispan.util.KeyValuePair;

/**
 * Queueing segment listener that keeps one event queue per consistent-hash segment.
 *
 * <p>While the {@code completed} flag inherited from the base class is unset, events that
 * {@code addEvent} does not accept are parked in the queue of the segment owning their key.
 * A parked event is delivered (its {@link ListenerInvocation} is invoked) when its segment is
 * completed; after that the segment's queue slot is set to {@code null}, which is the marker
 * for "no more queueing for this segment".
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
class DistributedQueueingSegmentListener<K, V>
extends BaseQueueingSegmentListener<K, V, CacheEntryEvent<K, V>> {
    /** One queue per segment; a {@code null} slot means the segment no longer queues events. */
    private final AtomicReferenceArray<Queue<KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>>>> queues;
    private final DistributionManager distributionManager;
    protected final InternalEntryFactory entryFactory;
    /**
     * Segments reported through {@link #segmentCompleted(Set)} whose queues have not been drained yet.
     * A stream can be consumed only once, so this field is always replaced with a fresh empty stream
     * before the previous one is consumed (see {@link #drainJustCompletedSegments()}).
     */
    private Stream<Integer> justCompletedSegments = Stream.empty();

    /**
     * Creates the listener with one empty queue for every segment of the current read consistent hash.
     *
     * @param entryFactory factory used to build the cache entry handed to {@code addEvent}
     * @param distributionManager source of the consistent hash used to map keys to segments
     */
    public DistributedQueueingSegmentListener(InternalEntryFactory entryFactory, DistributionManager distributionManager) {
        this.entryFactory = entryFactory;
        this.distributionManager = distributionManager;
        this.queues = new AtomicReferenceArray<>(distributionManager.getReadConsistentHash().getNumSegments());
        for (int i = 0; i < this.queues.length(); ++i) {
            this.queues.set(i, new ConcurrentLinkedQueue<>());
        }
    }

    /**
     * Offers an event to this listener.
     *
     * @param wrapped wrapper supplying the key and the event
     * @param invocation invocation to run on the event when its segment is completed
     * @return {@code false} when the transfer is already completed or the event could not be parked
     *         because its segment's queue has been released; {@code true} otherwise (the event was
     *         accepted by {@code addEvent} or parked in the segment queue)
     */
    @Override
    public boolean handleEvent(EventWrapper<K, V, CacheEntryEvent<K, V>> wrapped, ListenerInvocation<Event<K, V>> invocation) {
        K key = wrapped.getKey();
        boolean enqueued = !this.completed.get();
        CacheEntryEvent<K, V> event = wrapped.getEvent();
        InternalCacheEntry<K, V> cacheEntry = this.entryFactory.create(event.getKey(), event.getValue(), event.getMetadata());
        // An entry without a value is recorded with the REMOVED marker instead of the entry itself.
        if (enqueued && !this.addEvent(key, cacheEntry.getValue() != null ? cacheEntry : REMOVED)) {
            int segment = this.distributionManager.getReadConsistentHash().getSegment(key);
            Queue<KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>>> queue = this.queues.get(segment);
            if (queue != null) {
                KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>> eventPair = new KeyValuePair<>(event, invocation);
                queue.add(eventPair);
                // Re-check the slot after adding: if the segment was released in the meantime and the
                // pair is still in the queue (remove succeeds), nobody delivered it, so report the
                // event as not enqueued.
                if (this.queues.get(segment) == null && queue.remove(eventPair)) {
                    enqueued = false;
                }
            } else {
                enqueued = false;
            }
        }
        return enqueued;
    }

    /**
     * Finishes the transfer: drains the segments that were just completed, sets the
     * {@code completed} flag, clears the notified keys and releases every segment queue.
     */
    @Override
    public void transferComplete() {
        this.drainJustCompletedSegments();
        this.completed.set(true);
        this.notifiedKeys.clear();
        for (int i = 0; i < this.queues.length(); ++i) {
            this.queues.set(i, null);
        }
    }

    /**
     * Records the key in {@code notifiedKeys} with the {@code NOTIFIED} marker.
     *
     * @param key key about to be processed
     * @return the result of {@code notifiedKeys.put} (presumably the previous mapping, if any)
     */
    @Override
    public Object markKeyAsProcessing(K key) {
        return this.notifiedKeys.put(key, NOTIFIED);
    }

    /**
     * Delivers the queued events of the segments most recently reported as completed.
     * The key itself is not used here.
     *
     * @param key key that has been notified
     */
    @Override
    public void notifiedKey(K key) {
        this.drainJustCompletedSegments();
    }

    /**
     * Completes every pending just-completed segment exactly once. The field is swapped for a fresh
     * empty stream before the old stream is consumed, so a later call (or a call following a failed
     * listener invocation) never operates on an already-consumed stream.
     */
    private void drainJustCompletedSegments() {
        Stream<Integer> pending = this.justCompletedSegments;
        this.justCompletedSegments = Stream.empty();
        pending.forEach(this::completeSegment);
    }

    /**
     * Invokes every queued invocation of the given segment with its event, then releases the
     * segment's queue slot. Does nothing if the slot is already {@code null}.
     */
    private void completeSegment(int segment) {
        Queue<KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>>> queue = this.queues.get(segment);
        if (queue != null) {
            if (this.trace) {
                this.log.tracef("Completed segment %s", segment);
            }
            for (KeyValuePair<CacheEntryEvent<K, V>, ListenerInvocation<Event<K, V>>> queued : queue) {
                queued.getValue().invoke(queued.getKey());
            }
            this.queues.set(segment, null);
        }
    }

    /**
     * Remembers the given segments as just completed. Their queues are drained on the next
     * {@link #notifiedKey(Object)} or {@link #transferComplete()} call; the stream is lazy, so the
     * filter on still-present queues is evaluated at that point.
     *
     * @param segments segments whose transfer has completed
     */
    @Override
    public void segmentCompleted(Set<Integer> segments) {
        this.justCompletedSegments = segments.stream().filter(segment -> this.queues.get(segment) != null);
    }
}

