/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.StreamRequestCommand;
import org.infinispan.stream.impl.TerminalOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class ClusterStreamManagerImpl<K>
implements ClusterStreamManager<K> {
    /** Logger used for the trace and warning messages emitted by this manager. */
    protected static final Log log = LogFactory.getLog(ClusterStreamManagerImpl.class);

    /**
     * Trackers of requests that have been started and not yet completed or
     * forgotten, keyed by the request id handed back to the caller.
     */
    protected final Map<UUID, RequestTracker> currentlyRunning = new ConcurrentHashMap<>();

    /** RPC layer used to send stream requests; assigned in {@code inject}. */
    protected RpcManager rpc;

    /** Builds the stream request commands that are sent out; assigned in {@code inject}. */
    protected CommandsFactory factory;

    /** Address obtained from {@code rpc} in {@code start()}; this member is skipped when targets are chosen. */
    protected Address localAddress;

    /**
     * Stores the collaborators this manager needs to send remote stream requests.
     *
     * @param rpc     RPC manager used to invoke commands on other members
     * @param factory factory used to build the stream request commands
     */
    @Inject
    public void inject(RpcManager rpc, CommandsFactory factory) {
        this.factory = factory;
        this.rpc = rpc;
    }

    /**
     * Records the address reported by the RPC manager so that the local member
     * can be excluded when request targets are determined.
     */
    @Start
    public void start() {
        Address self = rpc.getAddress();
        localAddress = self;
    }

    /**
     * Starts a terminal operation on the remote members by delegating to
     * {@code remoteStreamIgnoreKeyOperation} with the {@code TERMINAL} request type.
     *
     * @return the id generated for this request
     */
    @Override
    public <R> UUID remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, TerminalOperation<R> operation, ClusterStreamManager.ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate) {
        StreamRequestCommand.Type requestType = StreamRequestCommand.Type.TERMINAL;
        return remoteStreamIgnoreKeyOperation(
                parallelDistribution,
                parallelStream,
                ch,
                segments,
                keysToInclude,
                keysToExclude,
                includeLoader,
                operation,
                callback,
                requestType,
                earlyTerminatePredicate);
    }

    /**
     * Starts a terminal operation on the remote members by delegating to
     * {@code remoteStreamIgnoreKeyOperation} with the {@code TERMINAL_REHASH} request type.
     *
     * @return the id generated for this request
     */
    @Override
    public <R> UUID remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, TerminalOperation<R> operation, ClusterStreamManager.ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate) {
        StreamRequestCommand.Type requestType = StreamRequestCommand.Type.TERMINAL_REHASH;
        return remoteStreamIgnoreKeyOperation(
                parallelDistribution,
                parallelStream,
                ch,
                segments,
                keysToInclude,
                keysToExclude,
                includeLoader,
                operation,
                callback,
                requestType,
                earlyTerminatePredicate);
    }

    /**
     * Shared implementation behind the two {@code TerminalOperation} entry points.
     * <p>
     * Determines which remote members to contact, registers a tracker for the new
     * request id and then sends one stream request command per target, either
     * asynchronously (when {@code parallelDistribution} is set) or one after
     * another on the calling thread. When there are no targets nothing is
     * registered or sent.
     *
     * @param type request type placed into every command that is sent
     * @return the id generated for this request, whether or not anything was sent
     */
    private <R> UUID remoteStreamIgnoreKeyOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, TerminalOperation<R> operation, ClusterStreamManager.ResultsCallback<R> callback, StreamRequestCommand.Type type, Predicate<? super R> earlyTerminatePredicate) {
        Map<Address, Set<Integer>> targets = determineTargets(ch, segments);
        UUID uuid = UUID.randomUUID();

        if (targets.isEmpty()) {
            log.tracef("Not performing any remote operations for id %s as no valid targets found", uuid);
            return uuid;
        }

        log.tracef("Performing remote operations %s for id %s", targets, uuid);
        // The tracker must be registered before anything is sent so responses can find it.
        currentlyRunning.put(uuid, new RequestTracker<R>(callback, targets, earlyTerminatePredicate));

        if (parallelDistribution) {
            submitAsyncTasks(uuid, targets, keysToExclude, parallelStream, keysToInclude, includeLoader, type, operation);
            return uuid;
        }

        for (Map.Entry<Address, Set<Integer>> entry : targets.entrySet()) {
            Set<Integer> targetSegments = entry.getValue();
            Set<K> keysExcluded = determineExcludedKeys(keysToExclude, targetSegments);
            rpc.invokeRemotely(
                    Collections.singleton(entry.getKey()),
                    factory.buildStreamRequestCommand(uuid, parallelStream, type, targetSegments, keysToInclude, keysExcluded, includeLoader, operation),
                    rpc.getDefaultRpcOptions(true));
        }
        return uuid;
    }

    /**
     * Starts a key tracking terminal operation ({@code TERMINAL_KEY}) on the remote members.
     * <p>
     * A tracker without an early-termination predicate is registered for the new
     * request id and one stream request command is sent to every target, either
     * asynchronously or sequentially depending on {@code parallelDistribution}.
     * In the sequential case the value returned by the remote invocation is not inspected.
     *
     * @return the id generated for this request, whether or not anything was sent
     */
    @Override
    public <R> UUID remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, KeyTrackingTerminalOperation<K, R, ?> operation, ClusterStreamManager.ResultsCallback<Collection<R>> callback) {
        Map<Address, Set<Integer>> targets = determineTargets(ch, segments);
        UUID uuid = UUID.randomUUID();

        if (targets.isEmpty()) {
            return uuid;
        }

        log.tracef("Performing remote key aware operations %s for id %s", targets, uuid);
        currentlyRunning.put(uuid, new RequestTracker<Collection<R>>(callback, targets, null));

        if (parallelDistribution) {
            submitAsyncTasks(uuid, targets, keysToExclude, parallelStream, keysToInclude, includeLoader, StreamRequestCommand.Type.TERMINAL_KEY, operation);
            return uuid;
        }

        for (Map.Entry<Address, Set<Integer>> entry : targets.entrySet()) {
            Set<Integer> targetSegments = entry.getValue();
            Set<K> keysExcluded = determineExcludedKeys(keysToExclude, targetSegments);
            rpc.invokeRemotely(
                    Collections.singleton(entry.getKey()),
                    factory.buildStreamRequestCommand(uuid, parallelStream, StreamRequestCommand.Type.TERMINAL_KEY, targetSegments, keysToInclude, keysExcluded, includeLoader, operation),
                    rpc.getDefaultRpcOptions(true));
        }
        return uuid;
    }

    /**
     * Starts a rehash aware key tracking terminal operation ({@code TERMINAL_KEY_REHASH})
     * on the remote members.
     * <p>
     * In the sequential case each target is invoked in turn. An unsuccessful
     * response, or an exception whose cause chain contains a
     * {@link SuspectException}, is turned into a completed response for that
     * target with all of its segments reported as missing. Any other exception
     * is logged and rethrown to the caller.
     *
     * @return the id generated for this request, whether or not anything was sent
     */
    @Override
    public <R2> UUID remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, KeyTrackingTerminalOperation<K, ?, R2> operation, ClusterStreamManager.ResultsCallback<Map<K, R2>> callback) {
        Map<Address, Set<Integer>> targets = determineTargets(ch, segments);
        UUID uuid = UUID.randomUUID();

        if (targets.isEmpty()) {
            return uuid;
        }

        log.tracef("Performing remote rehash key aware operations %s for id %s", targets, uuid);
        currentlyRunning.put(uuid, new RequestTracker<Map<K, R2>>(callback, targets, null));

        if (parallelDistribution) {
            submitAsyncTasks(uuid, targets, keysToExclude, parallelStream, keysToInclude, includeLoader, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, operation);
            return uuid;
        }

        for (Map.Entry<Address, Set<Integer>> entry : targets.entrySet()) {
            Address dest = entry.getKey();
            Set<Integer> targetSegments = entry.getValue();
            try {
                Set<K> keysExcluded = determineExcludedKeys(keysToExclude, targetSegments);
                log.tracef("Submitting task to %s for %s excluding keys %s", dest, uuid, keysExcluded);
                Response response = rpc.invokeRemotely(
                        Collections.singleton(dest),
                        factory.buildStreamRequestCommand(uuid, parallelStream, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, targetSegments, keysToInclude, keysExcluded, includeLoader, operation),
                        rpc.getDefaultRpcOptions(true)).values().iterator().next();
                if (!response.isSuccessful()) {
                    log.tracef("Unsuccessful response for %s from %s - making segments %s suspect", uuid, dest, targetSegments);
                    receiveResponse(uuid, dest, true, targetSegments, null);
                }
            } catch (Exception e) {
                // Look through the whole cause chain for a SuspectException.
                boolean suspected = false;
                for (Throwable cause = e; cause != null; cause = cause.getCause()) {
                    if (cause instanceof SuspectException) {
                        suspected = true;
                        break;
                    }
                }
                if (!suspected) {
                    log.tracef(e, "Encounted exception for %s from %s", uuid, dest);
                    throw e;
                }
                log.tracef("Exception from %s contained a SuspectException, making all segments %s suspect", dest, targetSegments);
                receiveResponse(uuid, dest, true, targetSegments, null);
            }
        }
        return uuid;
    }

    /**
     * Sends one stream request command to every target asynchronously and
     * attaches a completion handler to each invocation.
     * <p>
     * The handler reacts to three outcomes:
     * <ul>
     * <li>an unsuccessful response: the target is reported as complete with all
     * of its segments missing;</li>
     * <li>an exception whose cause chain contains a {@link SuspectException}:
     * handled the same way;</li>
     * <li>any other exception: recorded on the request's tracker, or logged as a
     * warning when the tracker is no longer registered.</li>
     * </ul>
     *
     * @param uuid          id of the request the commands belong to
     * @param targets       segments to request from each destination
     * @param keysToExclude per-segment keys that are left out of the matching command
     * @param type          request type placed into every command
     * @param operation     operation object handed to the command factory
     */
    private void submitAsyncTasks(UUID uuid, Map<Address, Set<Integer>> targets, Map<Integer, Set<K>> keysToExclude, boolean parallelStream, Set<K> keysToInclude, boolean includeLoader, StreamRequestCommand.Type type, Object operation) {
        for (Map.Entry<Address, Set<Integer>> targetInfo : targets.entrySet()) {
            Address dest = targetInfo.getKey();
            Set<Integer> segments = targetInfo.getValue();
            Set<K> keysExcluded = determineExcludedKeys(keysToExclude, segments);
            log.tracef("Submitting async task to %s for %s excluding keys %s", dest, uuid, keysExcluded);
            CompletableFuture<Map<Address, Response>> future = rpc.invokeRemotelyAsync(
                    Collections.singleton(dest),
                    factory.buildStreamRequestCommand(uuid, parallelStream, type, segments, keysToInclude, keysExcluded, includeLoader, operation),
                    rpc.getDefaultRpcOptions(true));
            future.whenComplete((responses, failure) -> {
                if (responses != null) {
                    // Only one destination was addressed, so only the first response is looked at.
                    Response response = responses.values().iterator().next();
                    if (!response.isSuccessful()) {
                        log.tracef("Unsuccessful response for %s from %s - making segments suspect", uuid, dest);
                        receiveResponse(uuid, dest, true, segments, null);
                    }
                    return;
                }
                if (failure == null) {
                    return;
                }
                boolean suspected = false;
                for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
                    if (cause instanceof SuspectException) {
                        suspected = true;
                        break;
                    }
                }
                if (suspected) {
                    log.tracef("Exception contained a SuspectException, making all segments %s suspect", segments);
                    receiveResponse(uuid, dest, true, segments, null);
                    return;
                }
                log.tracef(failure, "Encounted exception for %s from %s", uuid, dest);
                RequestTracker tracker = currentlyRunning.get(uuid);
                if (tracker != null) {
                    markTrackerWithException(tracker, dest, failure, uuid);
                } else {
                    // Pass the throwable as the cause rather than as a format argument:
                    // the message has no placeholder, so it would otherwise be lost.
                    log.warnf(failure, "Unhandled remote stream exception encountered");
                }
            });
        }
    }

    /**
     * Records a failure on the given tracker and, when appropriate, wakes every
     * thread waiting on the tracker's completion condition.
     * <p>
     * Waiters are signalled when {@code dest} is {@code null}, or when treating
     * {@code dest} as having delivered its last result leaves no outstanding
     * responses on the tracker.
     *
     * @param tracker tracker of the affected request
     * @param dest    member the failure is attributed to, or {@code null}
     * @param e       failure to store on the tracker
     * @param uuid    request id, used only for logging; may be {@code null}
     */
    protected static void markTrackerWithException(RequestTracker<?> tracker, Address dest, Throwable e, UUID uuid) {
        log.tracef("Marking tracker to have exception");
        tracker.throwable = e;

        boolean wakeWaiters = dest == null || tracker.lastResult(dest, null);
        if (!wakeWaiters) {
            return;
        }

        if (uuid == null) {
            log.trace("Tracker completed due to outside cause, waking sleepers! ");
        } else {
            log.tracef("Tracker %s completed with exception, waking sleepers!", uuid);
        }

        Lock lock = tracker.completionLock;
        lock.lock();
        try {
            tracker.completionCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Collects the excluded keys of every segment in {@code segmentsToUse}.
     *
     * @param keysToExclude excluded keys per segment; segments without an entry contribute nothing
     * @param segmentsToUse segments whose excluded keys are wanted
     * @return the union of the excluded keys of the given segments; the shared
     *         empty set when {@code keysToExclude} has no entries at all
     */
    private Set<K> determineExcludedKeys(Map<Integer, Set<K>> keysToExclude, Set<Integer> segmentsToUse) {
        if (keysToExclude.isEmpty()) {
            return Collections.emptySet();
        }
        return segmentsToUse.stream()
                .map(keysToExclude::get)
                .filter(keysForSegment -> keysForSegment != null)
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Works out which remote members to contact and which segments to ask each for.
     * <p>
     * Every member of the hash other than the local address is considered, using
     * its primary segments. When {@code segments} is non-null the member's set is
     * narrowed to that selection, and a member whose set was emptied by the
     * narrowing is skipped.
     * <p>
     * NOTE(review): the narrowing calls {@code retainAll} directly on the set
     * returned by {@code getPrimarySegmentsForOwner}; this assumes the hash hands
     * out a fresh, mutable set on each call - confirm.
     *
     * @param ch       consistent hash supplying members and primary segment ownership
     * @param segments segments of interest, or {@code null} for no restriction
     * @return the segments to request, keyed by destination
     */
    private Map<Address, Set<Integer>> determineTargets(ConsistentHash ch, Set<Integer> segments) {
        Map<Address, Set<Integer>> targets = new ConcurrentHashMap<>();
        for (Address member : ch.getMembers()) {
            if (member.equals(localAddress)) {
                continue;
            }
            Set<Integer> owned = ch.getPrimarySegmentsForOwner(member);
            if (segments != null) {
                boolean narrowed = owned.retainAll(segments);
                if (narrowed && owned.isEmpty()) {
                    continue;
                }
            }
            targets.put(member, owned);
        }
        return targets;
    }

    /**
     * Reports whether the given request is no longer tracked by this manager.
     *
     * @param id request id to look up
     * @return {@code true} when no tracker is registered under {@code id}
     */
    @Override
    public boolean isComplete(UUID id) {
        boolean stillTracked = currentlyRunning.containsKey(id);
        return !stillTracked;
    }

    /**
     * Blocks until the given request is no longer tracked, a failure has been
     * recorded on its tracker, or the timeout elapses.
     *
     * @param id   request id to wait for
     * @param time maximum time to wait; must be positive
     * @param unit unit of {@code time}
     * @return {@code true} when the request was found to be no longer tracked,
     *         {@code false} when the wait timed out first
     * @throws IllegalArgumentException when {@code time} is not positive
     * @throws InterruptedException     when the thread is interrupted while waiting
     * @throws RuntimeException         the failure recorded on the tracker when it is unchecked
     * @throws CacheException           wrapping the recorded failure otherwise
     */
    @Override
    public boolean awaitCompletion(UUID id, long time, TimeUnit unit) throws InterruptedException {
        if (time <= 0L) {
            throw new IllegalArgumentException("Time must be greater than 0");
        }
        log.tracef("Awaiting completion of %s", id);
        boolean completed = false;
        Throwable throwable = null;
        // Compare via subtraction so a wrapped-around deadline is still handled correctly.
        long target = System.nanoTime() + unit.toNanos(time);
        while (target - System.nanoTime() > 0L) {
            RequestTracker<?> tracker = currentlyRunning.get(id);
            if (tracker == null) {
                completed = true;
                break;
            }
            throwable = tracker.throwable;
            if (throwable != null) {
                break;
            }
            tracker.completionLock.lock();
            try {
                // Re-check under the lock so a removal that happened in between is not missed.
                if (!currentlyRunning.containsKey(id)) {
                    completed = true;
                    throwable = tracker.throwable;
                    break;
                }
                // A failure may have been recorded and signalled between the unlocked
                // check above and taking the lock. Without this second look that
                // signal is missed and the caller would sleep for the whole timeout.
                throwable = tracker.throwable;
                if (throwable != null) {
                    break;
                }
                if (!tracker.completionCondition.await(target - System.nanoTime(), TimeUnit.NANOSECONDS)) {
                    // Timed out; still surface a failure that was recorded meanwhile.
                    throwable = tracker.throwable;
                    completed = false;
                    break;
                }
            } finally {
                tracker.completionLock.unlock();
            }
        }
        log.tracef("Returning back to caller due to %s being completed: %s", id, completed);
        if (throwable != null) {
            if (throwable instanceof RuntimeException) {
                throw (RuntimeException) throwable;
            }
            throw new CacheException(throwable);
        }
        return completed;
    }

    /**
     * Stops tracking the given request and wakes every thread waiting for it.
     * Does nothing when no tracker is registered under {@code id}.
     *
     * @param id request id to forget
     */
    @Override
    public void forgetOperation(UUID id) {
        RequestTracker<?> removed = currentlyRunning.remove(id);
        if (removed == null) {
            return;
        }
        Lock lock = removed.completionLock;
        lock.lock();
        try {
            removed.completionCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Feeds a response from a remote member into the tracker of the given request.
     * <p>
     * Responses from members the tracker is not (or no longer) waiting on are
     * ignored. Otherwise any reported missing segments are recorded first, and
     * the response is then delivered as either an intermediate or a final
     * result. When the final result leaves no outstanding members, the request
     * is removed from the running set and waiting threads are signalled.
     *
     * @param id              request id the response belongs to
     * @param origin          member that sent the response
     * @param complete        whether this is the member's final response
     * @param missingSegments segments the member reported as missing
     * @param response        payload handed to the tracker
     * @return {@code true} when the request is tracked and this response did not
     *         complete it; {@code false} when it completed the request or the
     *         request is not tracked
     */
    @Override
    public <R1> boolean receiveResponse(UUID id, Address origin, boolean complete, Set<Integer> missingSegments, R1 response) {
        log.tracef("Received response from %s with a completed response %s for id %s with %s suspected segments.", origin, complete, id, missingSegments);
        RequestTracker tracker = currentlyRunning.get(id);
        if (tracker == null) {
            log.tracef("Ignoring response as we already received a completed response for %s from %s", id, origin);
            return false;
        }

        boolean finished = false;
        synchronized (tracker) {
            if (tracker.awaitingResponse.containsKey(origin)) {
                if (!missingSegments.isEmpty()) {
                    tracker.missingSegments(missingSegments);
                }
                if (complete) {
                    finished = tracker.lastResult(origin, response);
                } else {
                    tracker.intermediateResults(origin, response);
                }
            }
        }

        if (!finished) {
            return true;
        }

        log.tracef("Marking %s as completed!", id);
        tracker.completionLock.lock();
        try {
            // Remove while holding the lock so waiters re-checking under it see the removal.
            currentlyRunning.remove(id);
            tracker.completionCondition.signalAll();
        } finally {
            tracker.completionLock.unlock();
        }
        return false;
    }

    /**
     * Bookkeeping for one in-flight remote stream request: which members still
     * owe a response, which segments have been reported missing, the callback to
     * notify, and the lock/condition used to wake threads waiting for completion.
     *
     * @param <R> type of the results delivered to the callback
     */
    static class RequestTracker<R> {
        /** Receives intermediate results, completions and lost-segment notifications. */
        final ClusterStreamManager.ResultsCallback<R> callback;
        /** Members that have not yet delivered their final result, with the segments requested from each. */
        final Map<Address, Set<Integer>> awaitingResponse;
        /** Guards {@link #completionCondition}. */
        final Lock completionLock = new ReentrantLock();
        /** Signalled when waiters of this request should wake up. */
        final Condition completionCondition = completionLock.newCondition();
        /** Optional test that ends the request early; may be {@code null}. */
        final Predicate<? super R> earlyTerminatePredicate;
        /** Segments reported missing so far; {@code null} until the first report. Owned by this tracker. */
        Set<Integer> missingSegments;
        /** Failure recorded for this request, if any. */
        volatile Throwable throwable;

        /**
         * @param callback                callback notified of results and lost segments
         * @param awaitingResponse        members to wait for and their segments; kept by reference
         * @param earlyTerminatePredicate optional early-termination test, may be {@code null}
         */
        RequestTracker(ClusterStreamManager.ResultsCallback<R> callback, Map<Address, Set<Integer>> awaitingResponse, Predicate<? super R> earlyTerminatePredicate) {
            this.callback = callback;
            this.awaitingResponse = awaitingResponse;
            this.earlyTerminatePredicate = earlyTerminatePredicate;
        }

        /**
         * Forwards an intermediate result to the callback.
         *
         * @param origin             member that produced the result
         * @param intermediateResult the result to forward
         */
        public void intermediateResults(Address origin, R intermediateResult) {
            callback.onIntermediateResult(origin, intermediateResult);
        }

        /**
         * Handles the final result of a member.
         * <p>
         * The segments requested from the member, minus any segments reported
         * missing so far, are passed to the callback as completed. The member is
         * then removed from the outstanding set, or the whole set is cleared when
         * the early-termination predicate accepts the result.
         *
         * @param origin member that produced the result
         * @param result the final result, may be {@code null}
         * @return {@code true} when no member is outstanding afterwards
         */
        public boolean lastResult(Address origin, R result) {
            Set<Integer> completedSegments = awaitingResponse.get(origin);
            // completedSegments is null when the origin is no longer awaited, which can
            // happen on the exception-marking path that does not check first. Skip the
            // subtraction then instead of failing with a NullPointerException.
            if (missingSegments != null && completedSegments != null) {
                completedSegments.removeAll(missingSegments);
            }
            callback.onCompletion(origin, completedSegments, result);
            synchronized (this) {
                if (earlyTerminatePredicate != null && earlyTerminatePredicate.test(result)) {
                    awaitingResponse.clear();
                } else {
                    awaitingResponse.remove(origin);
                }
                return awaitingResponse.isEmpty();
            }
        }

        /**
         * Records segments reported as missing and notifies the callback.
         *
         * @param segments the newly reported missing segments
         */
        public void missingSegments(Set<Integer> segments) {
            synchronized (this) {
                if (missingSegments == null) {
                    // Copy instead of keeping the caller's set. Callers may pass the very
                    // set stored in awaitingResponse, and lastResult() would then subtract
                    // the set from itself and wipe the recorded missing segments. A copy
                    // also keeps later addAll calls from mutating the caller's set.
                    missingSegments = new HashSet<>(segments);
                } else {
                    missingSegments.addAll(segments);
                }
            }
            callback.onSegmentsLost(segments);
        }
    }
}

