/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.impl.spi.impl;

import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.client.impl.spi.impl.ClientInvocationServiceImpl;
import com.hazelcast.client.impl.spi.impl.listener.ClientListenerServiceImpl;
import com.hazelcast.client.properties.ClientProperty;
import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
import com.hazelcast.internal.util.ConcurrencyDetection;
import com.hazelcast.internal.util.HashUtil;
import com.hazelcast.internal.util.MutableInteger;
import com.hazelcast.internal.util.ThreadAffinity;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.BusySpinIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.internal.util.concurrent.MPSCQueue;
import com.hazelcast.internal.util.executor.HazelcastManagedThread;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Supplies the {@link Consumer} that handles incoming {@link ClientMessage}s
 * (responses and events) on the client side.
 *
 * <p>One of three handlers is selected at construction time:
 * <ul>
 *   <li>{@link SyncResponseHandler} when the response thread count is 0:
 *       every message is processed on the calling thread;</li>
 *   <li>{@link DynamicResponseHandler} when response threads are configured as
 *       dynamic: messages are queued to a response thread only while
 *       concurrency is detected, otherwise processed on the calling thread;</li>
 *   <li>{@link AsyncResponseHandler} otherwise: every message is queued to a
 *       response thread.</li>
 * </ul>
 */
public class ClientResponseHandlerSupplier
implements Supplier<Consumer<ClientMessage>> {
    private static final HazelcastProperty IDLE_STRATEGY = new HazelcastProperty("hazelcast.client.responsequeue.idlestrategy", "block");

    /** Per-thread counter used to spread messages over the response threads. */
    private static final ThreadLocal<MutableInteger> INT_HOLDER = ThreadLocal.withInitial(MutableInteger::new);

    // Parameters of the default "backoff" idle strategy.
    private static final long IDLE_MAX_SPINS = 20L;
    private static final long IDLE_MAX_YIELDS = 50L;
    private static final long IDLE_MIN_PARK_NS = TimeUnit.NANOSECONDS.toNanos(1L);
    private static final long IDLE_MAX_PARK_NS = TimeUnit.MICROSECONDS.toNanos(100L);

    /**
     * Header flag bit; a message carrying it is dispatched to the listener
     * service as an event instead of being matched to an invocation.
     * NOTE(review): presumably mirrors the protocol's event flag constant - confirm.
     */
    private static final int EVENT_FLAG = 128;

    /**
     * Message type that is treated as an error response and converted into an exception.
     * NOTE(review): presumably mirrors the protocol's exception message type - confirm.
     */
    private static final int EXCEPTION_MESSAGE_TYPE = 0;

    private final ClientInvocationServiceImpl invocationService;
    private final ResponseThread[] responseThreads;
    private final HazelcastClientInstanceImpl client;
    private final ILogger logger;
    private final Consumer<ClientMessage> responseHandler;
    private final boolean responseThreadsDynamic;
    private final ConcurrencyDetection concurrencyDetection;
    private final ThreadAffinity threadAffinity = ThreadAffinity.newSystemThreadAffinity("hazelcast.client.response.thread.affinity");

    /**
     * Creates the supplier, its response threads (not yet started) and the
     * handler matching the configuration.
     *
     * @param invocationService    the invocation service whose client, logger and invocations are used
     * @param concurrencyDetection consulted by the dynamic handler to decide whether to offload
     * @throws IllegalArgumentException if the effective response thread count is negative
     */
    public ClientResponseHandlerSupplier(ClientInvocationServiceImpl invocationService, ConcurrencyDetection concurrencyDetection) {
        this.invocationService = invocationService;
        this.concurrencyDetection = concurrencyDetection;
        this.client = invocationService.client;
        this.logger = invocationService.invocationLogger;

        HazelcastProperties properties = this.client.getProperties();
        int responseThreadCount = properties.getInteger(ClientProperty.RESPONSE_THREAD_COUNT);
        if (this.threadAffinity.isEnabled()) {
            // An enabled thread affinity overrides the configured thread count.
            responseThreadCount = this.threadAffinity.getThreadCount();
        }
        if (responseThreadCount < 0) {
            throw new IllegalArgumentException(ClientProperty.RESPONSE_THREAD_COUNT.getName() + " can't be smaller than 0");
        }

        this.responseThreadsDynamic = properties.getBoolean(ClientProperty.RESPONSE_THREAD_DYNAMIC);
        this.logger.info("Running with " + responseThreadCount + " response threads, dynamic=" + this.responseThreadsDynamic);

        this.responseThreads = new ResponseThread[responseThreadCount];
        for (int i = 0; i < this.responseThreads.length; ++i) {
            ResponseThread thread = new ResponseThread(this.client.getName() + ".responsethread-" + i + "-");
            thread.setThreadAffinity(this.threadAffinity);
            this.responseThreads[i] = thread;
        }

        if (responseThreadCount == 0) {
            this.responseHandler = new SyncResponseHandler();
        } else if (this.responseThreadsDynamic) {
            this.responseHandler = new DynamicResponseHandler();
        } else {
            this.responseHandler = new AsyncResponseHandler();
        }
    }

    /**
     * Starts all response threads, unless they are dynamic; in that case they
     * are started lazily by {@link DynamicResponseHandler} when first needed.
     */
    public void start() {
        if (this.responseThreadsDynamic) {
            return;
        }
        for (ResponseThread responseThread : this.responseThreads) {
            responseThread.start();
        }
    }

    /** Interrupts every response thread. */
    public void shutdown() {
        for (ResponseThread responseThread : this.responseThreads) {
            responseThread.interrupt();
        }
    }

    /**
     * @return the handler chosen at construction time
     */
    @Override
    public Consumer<ClientMessage> get() {
        return this.responseHandler;
    }

    /**
     * Handles a single message and logs (rather than propagates) any
     * {@link Exception} so that the calling or response thread keeps running.
     */
    private void process(ClientMessage response) {
        try {
            this.handleResponse(response);
        }
        catch (Exception e) {
            this.logger.severe("Failed to process response: " + response + " on responseThread: " + Thread.currentThread().getName(), e);
        }
    }

    /**
     * Dispatches a message: events go to the listener service, everything else
     * is matched to its pending invocation by correlation id and completed
     * either exceptionally or normally.
     */
    private void handleResponse(ClientMessage message) {
        if (ClientMessage.isFlagSet(message.getHeaderFlags(), EVENT_FLAG)) {
            ClientListenerServiceImpl listenerService = (ClientListenerServiceImpl)this.client.getListenerService();
            listenerService.handleEventMessageOnCallingThread(message);
            return;
        }

        long correlationId = message.getCorrelationId();
        ClientInvocation invocation = this.invocationService.getInvocation(correlationId);
        if (invocation == null) {
            this.logger.warning("No call for callId: " + correlationId + ", response: " + message);
            return;
        }

        if (message.getMessageType() == EXCEPTION_MESSAGE_TYPE) {
            invocation.notifyException(correlationId, this.client.getClientExceptionFactory().createException(message));
        } else {
            invocation.notify(message);
        }
    }

    /**
     * Picks the response thread for the next message. With a single thread it
     * is always that thread; otherwise the calling thread's own counter is
     * incremented and hashed onto the thread array.
     */
    private ResponseThread nextResponseThread() {
        if (this.responseThreads.length == 1) {
            return this.responseThreads[0];
        }
        int index = HashUtil.hashToIndex(INT_HOLDER.get().getAndInc(), this.responseThreads.length);
        return this.responseThreads[index];
    }

    /**
     * Builds the idle strategy configured by the given property.
     *
     * <p>Recognised values: {@code "block"}, {@code "busyspin"}, {@code "backoff"}
     * and any string starting with {@code "backoff,"} (parsed by
     * {@link BackoffIdleStrategy#createBackoffIdleStrategy(String)}).
     *
     * @param properties the properties to read the value from
     * @param property   the property holding the idle strategy name
     * @return the idle strategy, or {@code null} for {@code "block"}
     * @throws IllegalStateException if the value is missing or not recognised
     */
    public static IdleStrategy getIdleStrategy(HazelcastProperties properties, HazelcastProperty property) {
        String idleStrategyString = properties.getString(property);
        if ("block".equals(idleStrategyString)) {
            return null;
        }
        if ("busyspin".equals(idleStrategyString)) {
            return new BusySpinIdleStrategy();
        }
        if ("backoff".equals(idleStrategyString)) {
            return new BackoffIdleStrategy(IDLE_MAX_SPINS, IDLE_MAX_YIELDS, IDLE_MIN_PARK_NS, IDLE_MAX_PARK_NS);
        }
        // Null-safe: an unset property without a default must end in the
        // descriptive IllegalStateException below, not a NullPointerException.
        if (idleStrategyString != null && idleStrategyString.startsWith("backoff,")) {
            return BackoffIdleStrategy.createBackoffIdleStrategy(idleStrategyString);
        }
        throw new IllegalStateException("Unrecognized " + property.getName() + " value=" + idleStrategyString);
    }

    /**
     * Thread that drains its own queue of messages and processes them until
     * the invocation service reports shutdown.
     */
    private class ResponseThread
    extends HazelcastManagedThread {
        private final BlockingQueue<ClientMessage> responseQueue;
        /** Guards {@link #ensureStarted()} so the thread is started at most once through it. */
        private final AtomicBoolean started = new AtomicBoolean();

        ResponseThread(String name) {
            super(name);
            this.setContextClassLoader(ClientResponseHandlerSupplier.this.client.getClientConfig().getClassLoader());
            IdleStrategy idleStrategy = ClientResponseHandlerSupplier.getIdleStrategy(ClientResponseHandlerSupplier.this.client.getProperties(), IDLE_STRATEGY);
            this.responseQueue = new MPSCQueue<ClientMessage>(this, idleStrategy);
        }

        @Override
        public void executeRun() {
            try {
                this.doRun();
            }
            catch (OutOfMemoryError e) {
                OutOfMemoryErrorDispatcher.onOutOfMemory(e);
            }
            catch (Throwable t) {
                ClientResponseHandlerSupplier.this.invocationService.invocationLogger.severe(t);
            }
        }

        /** Takes messages from the queue one by one and processes them. */
        private void doRun() {
            while (!ClientResponseHandlerSupplier.this.invocationService.isShutdown()) {
                ClientMessage response;
                try {
                    response = this.responseQueue.take();
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: shutdown() interrupts this thread, so
                    // loop back and let the shutdown check decide whether to exit.
                    continue;
                }
                ClientResponseHandlerSupplier.this.process(response);
            }
        }

        /** Adds a message to this thread's queue. */
        private void queue(ClientMessage message) {
            this.responseQueue.add(message);
        }

        /** Starts this thread on the first call; later calls are no-ops. */
        @SuppressFBWarnings(value={"IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD"}, justification="The thread.start method is the one we want to call")
        private void ensureStarted() {
            // Cheap read first so the common already-started path avoids the CAS.
            if (!this.started.get() && this.started.compareAndSet(false, true)) {
                this.start();
            }
        }
    }

    /** Processes every message on the calling thread. */
    class SyncResponseHandler
    implements Consumer<ClientMessage> {
        SyncResponseHandler() {
        }

        @Override
        public void accept(ClientMessage message) {
            ClientResponseHandlerSupplier.this.process(message);
        }
    }

    /**
     * Offloads a message to a (lazily started) response thread while
     * concurrency is detected; otherwise processes it on the calling thread.
     */
    class DynamicResponseHandler
    implements Consumer<ClientMessage> {
        DynamicResponseHandler() {
        }

        @Override
        public void accept(ClientMessage message) {
            if (!ClientResponseHandlerSupplier.this.concurrencyDetection.isDetected()) {
                ClientResponseHandlerSupplier.this.process(message);
                return;
            }
            ResponseThread responseThread = ClientResponseHandlerSupplier.this.nextResponseThread();
            responseThread.queue(message);
            responseThread.ensureStarted();
        }
    }

    /** Queues every message to a response thread. */
    class AsyncResponseHandler
    implements Consumer<ClientMessage> {
        AsyncResponseHandler() {
        }

        @Override
        public void accept(ClientMessage message) {
            ClientResponseHandlerSupplier.this.nextResponseThread().queue(message);
        }
    }
}

