/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.partition.MigrationInfo;
import com.hazelcast.spi.AbstractOperation;
import com.hazelcast.spi.Notifier;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.PartitionAwareOperation;
import com.hazelcast.spi.WaitNotifyKey;
import com.hazelcast.spi.WaitNotifyService;
import com.hazelcast.spi.WaitSupport;
import com.hazelcast.spi.exception.CallTimeoutException;
import com.hazelcast.spi.exception.PartitionMigratingException;
import com.hazelcast.spi.exception.RetryableException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.util.Clock;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.executor.SingleExecutorThreadFactory;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

class WaitNotifyServiceImpl
implements WaitNotifyService {
    private final ConcurrentMap<WaitNotifyKey, Queue<WaitingOp>> mapWaitingOps = new ConcurrentHashMap<WaitNotifyKey, Queue<WaitingOp>>(100);
    private final DelayQueue delayQueue = new DelayQueue();
    private final ExecutorService expirationService;
    private final Future expirationTask;
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final ConstructorFunction<WaitNotifyKey, Queue<WaitingOp>> waitQueueConstructor = new ConstructorFunction<WaitNotifyKey, Queue<WaitingOp>>(){

        @Override
        public Queue<WaitingOp> createNew(WaitNotifyKey key) {
            return new ConcurrentLinkedQueue<WaitingOp>();
        }
    };

    public WaitNotifyServiceImpl(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
        Node node = nodeEngine.getNode();
        this.logger = node.getLogger(WaitNotifyService.class.getName());
        String threadNamePrefix = node.getThreadNamePrefix("wait-notify");
        this.expirationService = Executors.newSingleThreadExecutor(new SingleExecutorThreadFactory(node.threadGroup, node.getConfigClassLoader(), threadNamePrefix));
        this.expirationTask = this.expirationService.submit(new ExpirationTask());
    }

    private void invalidate(WaitingOp waitingOp) throws Exception {
        this.nodeEngine.getOperationService().executeOperation(waitingOp);
    }

    @Override
    public void await(WaitSupport waitSupport) {
        WaitNotifyKey key = waitSupport.getWaitKey();
        Queue<WaitingOp> q = ConcurrencyUtil.getOrPutIfAbsent(this.mapWaitingOps, key, this.waitQueueConstructor);
        long timeout = waitSupport.getWaitTimeout();
        WaitingOp waitingOp = new WaitingOp(q, waitSupport);
        waitingOp.setNodeEngine(this.nodeEngine);
        q.offer(waitingOp);
        if (timeout > -1L && timeout < 1500L) {
            this.delayQueue.offer(waitingOp);
        }
    }

    @Override
    public void notify(Notifier notifier) {
        WaitNotifyKey key = notifier.getNotifiedKey();
        Queue q = (Queue)this.mapWaitingOps.get(key);
        if (q == null) {
            return;
        }
        WaitingOp waitingOp = (WaitingOp)q.peek();
        while (waitingOp != null) {
            Operation op = waitingOp.getOperation();
            if (notifier == op) {
                throw new IllegalStateException("Found cyclic wait-notify! -> " + notifier);
            }
            if (waitingOp.isValid()) {
                if (waitingOp.isExpired()) {
                    waitingOp.onExpire();
                } else {
                    if (waitingOp.shouldWait()) {
                        return;
                    }
                    this.nodeEngine.operationService.runOperationOnCallingThread(op);
                }
                waitingOp.setValid(false);
            }
            q.poll();
            waitingOp = (WaitingOp)q.peek();
        }
    }

    void onMemberLeft(MemberImpl leftMember) {
        this.invalidateWaitingOps(leftMember.getUuid());
    }

    void onClientDisconnected(String clientUuid) {
        this.invalidateWaitingOps(clientUuid);
    }

    private void invalidateWaitingOps(String callerUuid) {
        for (Queue q : this.mapWaitingOps.values()) {
            for (WaitingOp waitingOp : q) {
                Operation op;
                if (!waitingOp.isValid() || !callerUuid.equals((op = waitingOp.getOperation()).getCallerUuid())) continue;
                waitingOp.setValid(false);
            }
        }
    }

    void onPartitionMigrate(Address thisAddress, MigrationInfo migrationInfo) {
        if (thisAddress.equals(migrationInfo.getSource())) {
            int partitionId = migrationInfo.getPartitionId();
            for (Queue q : this.mapWaitingOps.values()) {
                Iterator it = q.iterator();
                while (it.hasNext()) {
                    Operation op;
                    if (Thread.interrupted()) {
                        return;
                    }
                    WaitingOp waitingOp = (WaitingOp)it.next();
                    if (!waitingOp.isValid() || partitionId != (op = waitingOp.getOperation()).getPartitionId()) continue;
                    waitingOp.setValid(false);
                    PartitionMigratingException pme = new PartitionMigratingException(thisAddress, partitionId, op.getClass().getName(), op.getServiceName());
                    op.getResponseHandler().sendResponse(pme);
                    it.remove();
                }
            }
        }
    }

    public void cancelWaitingOps(String serviceName, Object objectId, Throwable cause) {
        for (Queue q : this.mapWaitingOps.values()) {
            for (WaitingOp waitingOp : q) {
                WaitNotifyKey wnk;
                if (!waitingOp.isValid() || !serviceName.equals((wnk = waitingOp.waitSupport.getWaitKey()).getServiceName()) || !objectId.equals(wnk.getObjectName())) continue;
                waitingOp.cancel(cause);
            }
        }
    }

    void shutdown() {
        this.logger.finest("Stopping tasks...");
        this.expirationTask.cancel(true);
        this.expirationService.shutdown();
        HazelcastInstanceNotActiveException response = new HazelcastInstanceNotActiveException();
        Address thisAddress = this.nodeEngine.getThisAddress();
        for (Queue q : this.mapWaitingOps.values()) {
            for (WaitingOp waitingOp : q) {
                Operation op;
                if (!waitingOp.isValid() || !thisAddress.equals((op = waitingOp.getOperation()).getCallerAddress())) continue;
                try {
                    op.getResponseHandler().sendResponse(response);
                }
                catch (Exception e) {
                    this.logger.finest("While sending HazelcastInstanceNotActiveException response...", e);
                }
            }
            q.clear();
        }
        this.mapWaitingOps.clear();
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("WaitNotifyService{");
        sb.append("delayQueue=" + this.delayQueue.size());
        sb.append(" \n[");
        for (Queue ScheduledOps : this.mapWaitingOps.values()) {
            sb.append("\t");
            sb.append(ScheduledOps.size() + ", ");
        }
        sb.append("]\n}");
        return sb.toString();
    }

    private class ExpirationTask
    implements Runnable {
        private ExpirationTask() {
        }

        @Override
        public void run() {
            block3: while (!Thread.interrupted()) {
                try {
                    long waitTime = 1000L;
                    while (waitTime > 0L) {
                        long end;
                        long begin = System.currentTimeMillis();
                        WaitingOp waitingOp = (WaitingOp)WaitNotifyServiceImpl.this.delayQueue.poll(waitTime, TimeUnit.MILLISECONDS);
                        if (waitingOp != null && waitingOp.isValid()) {
                            WaitNotifyServiceImpl.this.invalidate(waitingOp);
                        }
                        if ((waitTime -= (end = System.currentTimeMillis()) - begin) <= 1000L) continue;
                        waitTime = 1000L;
                    }
                    Iterator i$ = WaitNotifyServiceImpl.this.mapWaitingOps.values().iterator();
                    block5: while (true) {
                        if (!i$.hasNext()) continue block3;
                        Queue q = (Queue)i$.next();
                        Iterator i$2 = q.iterator();
                        while (true) {
                            if (!i$2.hasNext()) continue block5;
                            WaitingOp waitingOp = (WaitingOp)i$2.next();
                            if (Thread.interrupted()) {
                                return;
                            }
                            if (!waitingOp.isValid() || !waitingOp.needsInvalidation()) continue;
                            WaitNotifyServiceImpl.this.invalidate(waitingOp);
                        }
                        break;
                    }
                }
                catch (InterruptedException e) {
                    return;
                }
                catch (Throwable t) {
                    WaitNotifyServiceImpl.this.logger.warning(t);
                    continue;
                }
                break;
            }
            return;
        }
    }

    static class WaitingOp
    extends AbstractOperation
    implements Delayed,
    PartitionAwareOperation {
        final Queue<WaitingOp> queue;
        final Operation op;
        final WaitSupport waitSupport;
        final long expirationTime;
        volatile boolean valid = true;
        volatile Throwable error = null;

        WaitingOp(Queue<WaitingOp> queue, WaitSupport waitSupport) {
            this.op = (Operation)((Object)waitSupport);
            this.waitSupport = waitSupport;
            this.queue = queue;
            this.expirationTime = this.getExpirationTime(waitSupport);
            this.setPartitionId(this.op.getPartitionId());
        }

        private long getExpirationTime(WaitSupport waitSupport) {
            long waitTimeout = waitSupport.getWaitTimeout();
            if (waitTimeout < 0L) {
                return -1L;
            }
            long expirationTime = Clock.currentTimeMillis() + waitTimeout;
            if (expirationTime < 0L) {
                return -1L;
            }
            return expirationTime;
        }

        public Operation getOperation() {
            return this.op;
        }

        public void setValid(boolean valid) {
            this.valid = valid;
        }

        public boolean isValid() {
            return this.valid;
        }

        public boolean needsInvalidation() {
            return this.isExpired() || this.isCancelled() || this.isCallTimedOut();
        }

        public boolean isExpired() {
            return this.expirationTime > 0L && Clock.currentTimeMillis() >= this.expirationTime;
        }

        public boolean isCancelled() {
            return this.error != null;
        }

        public boolean isCallTimedOut() {
            NodeEngineImpl nodeEngine = (NodeEngineImpl)this.getNodeEngine();
            if (nodeEngine.operationService.isCallTimedOut(this.op)) {
                this.cancel(new CallTimeoutException(this.op.getClass().getName(), this.op.getInvocationTime(), this.op.getCallTimeout()));
                return true;
            }
            return false;
        }

        public boolean shouldWait() {
            return this.waitSupport.shouldWait();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expirationTime - Clock.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other == this) {
                return 0;
            }
            long d = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
            return d == 0L ? 0 : (d < 0L ? -1 : 1);
        }

        @Override
        public void run() throws Exception {
            if (this.valid) {
                boolean expired = this.isExpired();
                boolean cancelled = this.isCancelled();
                if ((expired || cancelled) && this.queue.remove(this)) {
                    this.valid = false;
                    if (expired) {
                        this.waitSupport.onWaitExpire();
                    } else {
                        this.op.getResponseHandler().sendResponse(this.error);
                    }
                }
            }
        }

        public int hashCode() {
            assert (false) : "hashCode not designed";
            return 42;
        }

        public boolean equals(Object obj) {
            return super.equals(obj);
        }

        @Override
        public void logError(Throwable e) {
            ILogger logger = this.getLogger();
            if (e instanceof RetryableException) {
                logger.warning("Op: " + this.op + ", " + e.getClass().getName() + ": " + e.getMessage());
            } else if (e instanceof OutOfMemoryError) {
                try {
                    logger.log(Level.SEVERE, e.getMessage(), e);
                }
                catch (Throwable throwable) {}
            } else {
                logger.severe("Op: " + this.op + ", Error: " + e.getMessage(), e);
            }
        }

        @Override
        public boolean returnsResponse() {
            return false;
        }

        @Override
        public String getServiceName() {
            return this.op.getServiceName();
        }

        public void onExpire() {
            this.waitSupport.onWaitExpire();
        }

        public void cancel(Throwable t) {
            this.error = t;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("WaitingOp");
            sb.append("{op=").append(this.op);
            sb.append(", expirationTime=").append(this.expirationTime);
            sb.append(", valid=").append(this.valid);
            sb.append('}');
            return sb.toString();
        }
    }
}

