/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Signal;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.configuration.ClientIntelligence;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.counter.impl.HotRodCounterEvent;
import org.infinispan.client.hotrod.event.impl.AbstractClientEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.ByteBufCacheUnmarshaller;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.transport.netty.ByteBufUtil;
import org.infinispan.client.hotrod.impl.transport.netty.HintedReplayingDecoder;
import org.infinispan.client.hotrod.impl.transport.netty.OperationChannel;
import org.infinispan.client.hotrod.impl.transport.netty.OperationDispatcher;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.ArrayRingBuffer;
import org.infinispan.commons.util.Util;

public class HeaderDecoder
extends HintedReplayingDecoder<State> {
    private static final Log log = LogFactory.getLog(HeaderDecoder.class);
    /** Handler name constant; presumably the name used when adding this decoder to a pipeline (confirm at call sites). */
    public static final String NAME = "header-decoder";
    private final Configuration configuration;
    private final TimeService timeService;
    private final OperationDispatcher dispatcher;
    /** Listener ids added through {@link #addListener(byte[])}; failed over when the channel closes. */
    private final List<byte[]> listeners = new ArrayList<>();
    private final ByteBufCacheUnmarshaller unmarshaller;
    /** Set once by {@code handleClosing}; while true, {@code registerOperation} rejects new operations. */
    private volatile boolean closing;
    /**
     * Operations that use the configured socket timeout, keyed by message id. A single
     * periodic checker ({@link #CHECK_TIMEOUTS}) expires entries from the head.
     */
    private final ArrayRingBuffer<OperationTimeout> operations = new ArrayRingBuffer<>(32);
    private final Runnable CHECK_TIMEOUTS = this::checkForTimeouts;
    /** Assigned in {@code channelActive}; {@code null} until then. */
    private Channel channel;
    // Decoding state of the message currently being read.
    private HotRodOperation<?> operation;
    private short status;
    private long receivedMessageId;
    private short receivedOpCode;
    /** Next message id handed out by {@code registerOperation}. */
    private long messageOffset;
    private Codec codec;
    /** Non-null while the shared timeout checker is scheduled. */
    private ScheduledFuture<?> scheduledTimeout;
    /**
     * Operations tracked outside the ring buffer: those with their own timeout, client
     * listener registrations, and operations that already timed out but are kept so a
     * late response can still be matched.
     */
    private final Map<Long, HotRodOperation<?>> incomplete = new ConcurrentHashMap<>();
    /** Per-operation timeout tasks for entries of {@link #incomplete} that have their own timer. */
    private final Map<Long, ScheduledFuture<?>> timeouts = new HashMap<>();
    public HeaderDecoder(Configuration configuration, OperationDispatcher dispatcher) {
        super(State.READ_MESSAGE_ID);
        this.configuration = configuration;
        this.timeService = dispatcher.getTimeService();
        this.dispatcher = dispatcher;
        this.unmarshaller = new ByteBufCacheUnmarshaller(configuration.getClassAllowList());
    }

    /**
     * @return the client configuration this decoder was constructed with
     */
    public Configuration getConfiguration() {
        return configuration;
    }

    /**
     * Returns the channel recorded by {@code channelActive}.
     * When a channel is set, callers are expected (asserted) to be on its event loop.
     *
     * @return the channel, or {@code null} if none has been recorded yet
     */
    public Channel getChannel() {
        Channel current = this.channel;
        assert (current == null || current.eventLoop().inEventLoop());
        return current;
    }

    /**
     * Expires every operation at the head of the ring buffer whose deadline has passed,
     * then reschedules itself for the next pending deadline.
     *
     * <p>An expired operation is moved into {@code incomplete} (so a late server response
     * can still be matched to it) and completed with a {@link SocketTimeoutException}.
     * When the buffer is empty, {@code scheduledTimeout} is cleared so that the next
     * {@code registerOperation} call starts a new checker.
     */
    private void checkForTimeouts() {
        long now = this.timeService.time();
        OperationTimeout head;
        while ((head = this.operations.peek()) != null) {
            long remainingNanos = head.timeout - now;
            if (remainingNanos >= 0L) {
                // Head is not due yet: wake up again when it is. Store the new future so
                // the field refers to the pending task rather than the one that just ran.
                log.tracef("Rescheduling timeout checker for ~%d ms", TimeUnit.NANOSECONDS.toMillis(remainingNanos));
                this.scheduledTimeout = this.channel.eventLoop().schedule(this.CHECK_TIMEOUTS, remainingNanos, TimeUnit.NANOSECONDS);
                return;
            }
            // Read the head sequence before polling; it is the message id of the entry being removed.
            long messageId = this.operations.getHeadSequence();
            this.operations.poll();
            this.incomplete.put(messageId, head.op);
            this.dispatcher.handleResponse(head.op, messageId, this.channel, null, new SocketTimeoutException(this + " timed out after " + this.configuration.socketTimeout() + " ms"));
        }
        log.trace("No operations left, not scheduling timeout checker");
        this.scheduledTimeout = null;
    }

    /**
     * This decoder keeps per-channel decoding state, so it always reports itself as not sharable.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isSharable() {
        return false;
    }

    /**
     * Assigns the next message id to {@code operation} and starts tracking its timeout.
     *
     * <p>Operations with their own positive timeout, and client listener registrations,
     * are stored in {@code incomplete} with a dedicated timer. All others go into the
     * ring buffer and share one periodic timeout checker.
     *
     * @param operation the operation about to be sent on this channel
     * @return the message id assigned to the operation
     * @throws HotRodClientException if the decoder is closing; the operation is also
     *                               completed with that exception before it is thrown
     */
    public long registerOperation(HotRodOperation<?> operation) {
        log.tracef("Decoder is %s Channel is %s", this, this.channel);
        assert (this.channel.eventLoop().inEventLoop());
        final long messageId = this.messageOffset++;
        if (log.isTraceEnabled()) {
            log.tracef("Registering id %d for operation %s(%08X) on %s", messageId, operation, System.identityHashCode(operation), this.channel);
        }
        if (this.closing) {
            HotRodClientException rejected = Log.HOTROD.noMoreOperationsAllowed();
            this.dispatcher.handleResponse(operation, messageId, this.channel, null, rejected);
            throw rejected;
        }
        boolean hasOwnTimer = operation.timeout() > 0L || operation.isInstanceOf(AddClientListenerOperation.class);
        if (!hasOwnTimer) {
            // Shared path: deadline is now + socket timeout, checked by the single checker task.
            long startNanos = this.timeService.time();
            int socketTimeout = this.configuration.socketTimeout();
            long deadlineNanos = startNanos + TimeUnit.MILLISECONDS.toNanos(socketTimeout);
            this.operations.set(messageId, new OperationTimeout(operation, deadlineNanos));
            if (this.scheduledTimeout == null) {
                log.tracef("Scheduling timeout checker for %d ms", socketTimeout);
                this.scheduledTimeout = this.channel.eventLoop().schedule(this.CHECK_TIMEOUTS, (long) socketTimeout, TimeUnit.MILLISECONDS);
            }
            return messageId;
        }
        Long boxedId = messageId;
        HotRodOperation<?> previous = this.incomplete.put(boxedId, operation);
        assert (previous == null);
        this.scheduleTimeout(operation, boxedId);
        return messageId;
    }

    /**
     * Schedules a dedicated timeout task for one operation and records it in {@code timeouts}.
     *
     * <p>The delay is the operation's own timeout when positive, otherwise the configured
     * socket timeout. When the task fires it removes itself from {@code timeouts} and
     * completes the operation with a {@link SocketTimeoutException}. The operation stays
     * in {@code incomplete}.
     *
     * @param op            operation to time out
     * @param messageIdLong message id of the operation, used as the key in {@code timeouts}
     */
    private void scheduleTimeout(HotRodOperation<?> op, Long messageIdLong) {
        long opTimeout = op.timeout();
        long timeout = opTimeout > 0L ? opTimeout : (long) this.configuration.socketTimeout();
        log.tracef("Scheduling timeout for %d ms", timeout);
        ScheduledFuture<?> future = this.channel.eventLoop().schedule(() -> {
            this.timeouts.remove(messageIdLong);
            // Report the delay actually used, which may be the operation's own timeout
            // rather than the configured socket timeout.
            this.dispatcher.handleResponse(op, (long) messageIdLong, this.channel, null, new SocketTimeoutException(this + " timed out after " + timeout + " ms"));
        }, timeout, TimeUnit.MILLISECONDS);
        this.timeouts.put(messageIdLong, future);
    }

    /**
     * Restarts the dedicated timeout of a client listener registration.
     * If no timeout task is currently recorded for {@code messageId}, nothing is rescheduled.
     *
     * @param op        the operation; asserted to be an {@code AddClientListenerOperation}
     * @param messageId message id the operation was registered under
     */
    public void refreshTimeout(HotRodOperation<?> op, long messageId) {
        assert (op.isInstanceOf(AddClientListenerOperation.class));
        Long key = messageId;
        ScheduledFuture<?> pending = this.timeouts.remove(key);
        if (pending != null) {
            log.tracef("Refreshing timeout with id %d for op %s", messageId, op);
            pending.cancel(false);
            this.scheduleTimeout(op, key);
        } else {
            log.tracef("Unable to refresh timeout for messageID %d", messageId);
        }
    }

    /**
     * Records the now-active channel and, if no codec has been set yet, takes the codec
     * of the configured protocol version before passing the event on.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel activeChannel = ctx.channel();
        this.channel = activeChannel;
        log.tracef("Decoder %s has Channel %s active", this, activeChannel);
        if (this.codec == null) {
            this.codec = this.configuration.version().getCodec();
        }
        super.channelActive(ctx);
    }

    /**
     * Stops tracking the operation registered under {@code messageId} and returns it.
     * The ring buffer is consulted first. Otherwise the operation is taken from
     * {@code incomplete} and any dedicated timeout task for it is cancelled.
     *
     * @param messageId id of the message whose response arrived
     * @return the operation, or {@code null} if the id is not tracked
     */
    private HotRodOperation<?> removeOperation(long messageId) {
        OperationTimeout tracked = this.operations.remove(messageId);
        if (tracked == null) {
            HotRodOperation<?> op = this.incomplete.remove(messageId);
            ScheduledFuture<?> pendingTimeout = this.timeouts.remove(messageId);
            if (pendingTimeout != null) {
                pendingTimeout.cancel(false);
            }
            return op;
        }
        return tracked.op;
    }

    /**
     * Decodes one step of a server message, driven by the current decoder state.
     *
     * <p>A regular response walks through READ_MESSAGE_ID, READ_STATUS, READ_TOPOLOGY and
     * READ_PAYLOAD, falling through from one case to the next and checkpointing after each
     * step. Event op codes branch to READ_CACHE_EVENT or READ_COUNTER_EVENT instead.
     * Any {@link Exception} resets the decoder to READ_MESSAGE_ID before being rethrown.
     * {@link Signal} is always rethrown untouched.
     *
     * @param ctx handler context of the channel being read
     * @param in  inbound bytes
     * @param out unused; results are delivered through the dispatcher
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            switch (this.state()) {
                case READ_MESSAGE_ID: {
                    this.operation = null;
                    this.receivedMessageId = this.codec.readMessageId(in);
                    this.receivedOpCode = in.readUnsignedByte();
                    switch (this.receivedOpCode) {
                        // 96-99 (0x60-0x63) are decoded as cache events.
                        case 96:
                        case 97:
                        case 98:
                        case 99: {
                            // Id 0 means the event is not tied to a registered operation. Otherwise
                            // the operation stays registered (get, not remove).
                            this.operation = this.receivedMessageId == 0L ? null : this.incomplete.get(this.receivedMessageId);
                            if (this.operation != null && !this.operation.isInstanceOf(AddClientListenerOperation.class)) {
                                throw Log.HOTROD.operationIsNotAddClientListener(this.receivedMessageId, this.operation.toString());
                            }
                            if (log.isTraceEnabled()) {
                                log.tracef("Received event for request %d, %s", this.receivedMessageId, this.operation);
                            }
                            this.checkpoint(State.READ_CACHE_EVENT);
                            return;
                        }
                        // 102 (0x66) is decoded as a counter event.
                        case 102: {
                            this.checkpoint(State.READ_COUNTER_EVENT);
                            return;
                        }
                    }
                    if (this.receivedMessageId >= 0L) {
                        this.operation = this.removeOperation(this.receivedMessageId);
                        if (this.operation == null) {
                            throw Log.HOTROD.unknownMessageId(this.receivedMessageId);
                        }
                    }
                    if (log.isTraceEnabled()) {
                        log.tracef("Received response for request %d, %s", this.receivedMessageId, this.operation);
                    }
                    this.checkpoint(State.READ_STATUS);
                }
                // fall through
                case READ_STATUS: {
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding header for message %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    this.status = in.readUnsignedByte();
                    this.checkpoint(State.READ_TOPOLOGY);
                }
                // fall through
                case READ_TOPOLOGY: {
                    short topologyChangeByte = in.readUnsignedByte();
                    if (topologyChangeByte == 1) {
                        // NOTE(review): dereferences operation, which is only null here when the
                        // message id was negative - confirm that cannot happen on the wire.
                        this.readNewTopologyAndHash(in, this.operation.getCacheName());
                    }
                    this.checkpoint(State.READ_PAYLOAD);
                }
                // fall through
                case READ_PAYLOAD: {
                    if (this.operation == null || this.receivedOpCode != this.operation.responseOpCode()) {
                        String cacheName = this.operation == null ? "" : this.operation.getCacheName();
                        short responseCode = this.operation == null ? (short) -1 : (short) this.operation.responseOpCode();
                        // 80 (0x50): let the codec inspect the status for a server-reported error first.
                        if (this.receivedOpCode == 80) {
                            this.codec.checkForErrorsInResponseStatus(in, cacheName, this.receivedMessageId, this.status, this.channel.remoteAddress());
                        }
                        throw Log.HOTROD.invalidResponse(cacheName, responseCode, this.receivedOpCode);
                    }
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding payload for message %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    try {
                        this.unmarshaller.setDataFormat(this.operation.getDataFormat());
                        Object resp = this.operation.createResponse(in, this.status, this, this.codec, this.unmarshaller);
                        this.dispatcher.handleResponse(this.operation, this.receivedMessageId, ctx.channel(), resp, null);
                    }
                    catch (Signal signal) {
                        throw signal;
                    }
                    catch (Throwable t) {
                        // A failure while building the response fails the operation, not the channel.
                        this.dispatcher.handleResponse(this.operation, this.receivedMessageId, ctx.channel(), null, t);
                    }
                    this.checkpoint(State.READ_MESSAGE_ID);
                    break;
                }
                case READ_CACHE_EVENT: {
                    AbstractClientEvent cacheEvent;
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding cache event %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    try {
                        cacheEvent = this.codec.readCacheEvent(in, this.receivedMessageId, this.dispatcher.getClientListenerNotifier()::getCacheDataFormat, this.receivedOpCode, this.configuration.getClassAllowList(), ctx.channel().remoteAddress());
                    }
                    catch (Signal signal) {
                        throw signal;
                    }
                    catch (Throwable t) {
                        log.unableToReadEventFromServer(t, ctx.channel().remoteAddress());
                        throw t;
                    }
                    // An event for a listener registration restarts that registration's timeout.
                    if (this.operation != null && this.operation.isInstanceOf(AddClientListenerOperation.class)) {
                        this.refreshTimeout(this.operation, this.receivedMessageId);
                    }
                    this.invokeEvent(cacheEvent.getListenerId(), cacheEvent);
                    this.checkpoint(State.READ_MESSAGE_ID);
                    break;
                }
                case READ_COUNTER_EVENT: {
                    HotRodCounterEvent counterEvent;
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding counter event %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    try {
                        counterEvent = this.codec.readCounterEvent(in);
                    }
                    catch (Signal signal) {
                        throw signal;
                    }
                    catch (Throwable t) {
                        Log.HOTROD.unableToReadEventFromServer(t, ctx.channel().remoteAddress());
                        throw t;
                    }
                    this.invokeEvent(counterEvent.getListenerId(), counterEvent);
                    this.checkpoint(State.READ_MESSAGE_ID);
                    break;
                }
            }
        }
        catch (Exception e) {
            // Start cleanly at the next message after a failure.
            this.checkpoint(State.READ_MESSAGE_ID);
            throw e;
        }
    }

    /**
     * Reads a topology update from {@code buf} and hands it to the dispatcher.
     *
     * <p>The topology id and member list are always read. Hash function version, segment
     * count and per-segment owners are read only when the dispatcher's client intelligence
     * is hash-distribution aware. Otherwise the version is reported as -1 and the owners
     * as {@code null}. With a hash function version of 0 the owner rows are left unset.
     *
     * @param buf       buffer positioned at the topology id
     * @param cacheName cache the topology applies to
     */
    private void readNewTopologyAndHash(ByteBuf buf, String cacheName) {
        int newTopologyId = ByteBufUtil.readVInt(buf);
        InetSocketAddress[] members = this.readTopology(buf);
        short hashFunctionVersion = -1;
        SocketAddress[][] segmentOwners = null;
        boolean hashAware = this.dispatcher.getClientIntelligence().getValue() == ClientIntelligence.HASH_DISTRIBUTION_AWARE.getValue();
        if (hashAware) {
            hashFunctionVersion = buf.readUnsignedByte();
            int numSegments = ByteBufUtil.readVInt(buf);
            segmentOwners = new SocketAddress[numSegments][];
            if (hashFunctionVersion > 0) {
                for (int segment = 0; segment < numSegments; ++segment) {
                    int ownerCount = buf.readUnsignedByte();
                    SocketAddress[] owners = new SocketAddress[ownerCount];
                    for (int k = 0; k < ownerCount; ++k) {
                        // Owners are encoded as indexes into the member list read above.
                        owners[k] = members[ByteBufUtil.readVInt(buf)];
                    }
                    segmentOwners[segment] = owners;
                }
            }
        }
        this.dispatcher.updateTopology(cacheName, this.operation, newTopologyId, members, segmentOwners, hashFunctionVersion);
    }

    /**
     * Reads the cluster member list: a variable-length count followed by that many
     * (host string, unsigned short port) pairs.
     *
     * @param buf buffer positioned at the member count
     * @return unresolved socket addresses in the order they were read
     */
    private InetSocketAddress[] readTopology(ByteBuf buf) {
        int clusterSize = ByteBufUtil.readVInt(buf);
        InetSocketAddress[] members = new InetSocketAddress[clusterSize];
        int index = 0;
        while (index < clusterSize) {
            String host = ByteBufUtil.readString(buf);
            int port = buf.readUnsignedShort();
            members[index] = InetSocketAddress.createUnresolved(host, port);
            ++index;
        }
        return members;
    }

    /**
     * Switches the codec used by this decoder and by the channel's {@code OperationChannel}.
     * Only has an effect when the configured protocol version is {@code PROTOCOL_VERSION_AUTO}.
     * Must be called on the channel's event loop (asserted).
     *
     * <p>A {@code null} argument never replaces the decoder's current codec: {@code decode}
     * dereferences the codec unconditionally, so clearing it would break every later read.
     *
     * @param codec the codec to use from now on
     */
    public void setCodec(Codec codec) {
        assert (this.channel.eventLoop().inEventLoop());
        if (this.configuration.version() != ProtocolVersion.PROTOCOL_VERSION_AUTO) {
            return;
        }
        // The check was inverted before: it stored the argument only when it was null.
        if (codec != null) {
            this.codec = codec;
        }
        ((OperationChannel) this.channel.attr(OperationChannel.OPERATION_CHANNEL_ATTRIBUTE_KEY).get()).setCodec(codec);
    }

    /**
     * Delivers an event to the client listener notifier. A failure in the listener is
     * logged and not propagated, so it does not interrupt decoding.
     *
     * @param listenerId id of the listener the event is addressed to
     * @param cacheEvent the decoded cache or counter event
     */
    private void invokeEvent(byte[] listenerId, Object cacheEvent) {
        try {
            this.dispatcher.getClientListenerNotifier().invokeEvent(listenerId, cacheEvent);
        }
        catch (Exception listenerFailure) {
            Log.HOTROD.unexpectedErrorConsumingEvent(cacheEvent, listenerFailure);
        }
    }

    /**
     * Routes a pipeline exception. If an operation is currently being decoded and is not
     * yet done, only that operation is failed. Otherwise the error cannot be attributed
     * to an operation, so all tracked operations are failed and the channel is closed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        HotRodOperation<?> current = this.operation;
        if (current != null && !current.asCompletableFuture().isDone()) {
            // Clear the field before dispatching.
            this.operation = null;
            this.dispatcher.handleResponse(current, this.receivedMessageId, ctx.channel(), null, cause);
            return;
        }
        TransportException transportException = log.errorFromUnknownOperation(ctx.channel(), cause, ctx.channel().remoteAddress());
        if (log.isTraceEnabled()) {
            log.tracef(transportException, "Requesting %s close due to exception", ctx.channel());
        }
        this.handleClosing(ctx, transportException);
        ctx.close();
    }

    /**
     * Fails all tracked operations with a connection-closed error when the channel goes
     * inactive, then passes the event on.
     *
     * <p>The address is taken from {@code ctx.channel()} rather than the {@code channel}
     * field: the field is only set in {@code channelActive}, and other methods of this
     * class allow for it being {@code null}.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel closed = ctx.channel();
        // NOTE(review): the remote address is passed for both arguments, as before - confirm this is intended.
        TransportException transportException = log.connectionClosed(closed.remoteAddress(), closed.remoteAddress());
        this.handleClosing(ctx, transportException);
        super.channelInactive(ctx);
    }

    /**
     * Asks the client listener notifier to fail over every listener added to this decoder.
     */
    void failoverClientListeners() {
        for (byte[] registeredId : this.listeners) {
            this.dispatcher.getClientListenerNotifier().failoverClientListener(registeredId);
        }
    }

    /**
     * Shuts the decoder down after a channel failure or close. Only the first call has
     * any effect.
     *
     * <p>Marks the decoder as closing, reports the channel failure to the dispatcher, and
     * completes every tracked operation (ring buffer and {@code incomplete}) with
     * {@code t}. Dedicated timeout tasks are cancelled. Client listeners are failed over
     * before {@code incomplete} is cleared.
     *
     * @param ctx handler context of the failing channel
     * @param t   the error every pending operation is completed with
     */
    private void handleClosing(ChannelHandlerContext ctx, Throwable t) {
        if (this.closing) {
            return;
        }
        assert (this.channel == null || this.channel.eventLoop().inEventLoop());
        this.closing = true;
        this.dispatcher.handleChannelFailure(ctx.channel(), t);
        this.operations.forEach((opTimeout, id) -> this.failOperation(opTimeout.op, id, ctx, t));
        this.operations.clear();
        for (Map.Entry<Long, HotRodOperation<?>> entry : this.incomplete.entrySet()) {
            this.failOperation(entry.getValue(), entry.getKey(), ctx, t);
            ScheduledFuture<?> pendingTimeout = this.timeouts.remove(entry.getKey());
            if (pendingTimeout != null) {
                pendingTimeout.cancel(false);
            }
        }
        this.failoverClientListeners();
        this.incomplete.clear();
    }

    /**
     * Completes one operation with {@code cause}. A failure while doing so is logged and
     * not propagated, so the remaining operations are still completed.
     */
    private void failOperation(HotRodOperation<?> op, long messageId, ChannelHandlerContext ctx, Throwable cause) {
        try {
            this.dispatcher.handleResponse(op, messageId, ctx.channel(), null, cause);
        }
        catch (Throwable innerT) {
            // Log the failure that actually happened here, not the close cause.
            Log.HOTROD.errorf(innerT, "Failed to complete %s", op);
        }
    }

    /**
     * Public pass-through to the superclass checkpoint, so code outside this class can
     * mark the current read position. Presumably used by operations while they decode a
     * response (confirm at call sites).
     */
    @Override
    public void checkpoint() {
        super.checkpoint();
    }

    /**
     * Builds a snapshot of every operation this decoder currently tracks, from both the
     * ring buffer and the {@code incomplete} map.
     *
     * @return a new mutable map from message id to operation; changes to it do not affect the decoder
     */
    public Map<Long, HotRodOperation<?>> registeredOperationsById() {
        Map<Long, HotRodOperation<?>> snapshot = new HashMap<>();
        this.operations.forEach((opTimeout, id) -> snapshot.put(id, opTimeout.op));
        snapshot.putAll(this.incomplete);
        return snapshot;
    }

    /**
     * Remembers a client listener id so it can be failed over when the channel closes.
     *
     * @param listenerId id of the listener registered through this channel
     */
    public void addListener(byte[] listenerId) {
        if (log.isTraceEnabled()) {
            int decoderHash = this.hashCode();
            log.tracef("Decoder %08X adding listener %s", decoderHash, Util.printArray(listenerId));
        }
        this.listeners.add(listenerId);
    }

    /**
     * Forgets every stored listener id whose bytes equal {@code listenerId}.
     *
     * @param listenerId id of the listener to remove
     */
    public void removeListener(byte[] listenerId) {
        boolean removed = this.listeners.removeIf(candidate -> Arrays.equals(candidate, listenerId));
        if (!log.isTraceEnabled()) {
            return;
        }
        log.tracef("Decoder %08X removed? %s listener %s", this.hashCode(), Boolean.toString(removed), Util.printArray(listenerId));
    }

    /**
     * Decoding steps of this decoder. The first four are the steps of a regular response
     * in the order they are read. The last two are the event paths.
     * Do not reorder the constants without checking every switch over this enum.
     */
    enum State {
        /** Waiting for the message id and op code of the next message. */
        READ_MESSAGE_ID,
        /** Reading the status byte of a response. */
        READ_STATUS,
        /** Reading the topology-change marker and, if present, the new topology. */
        READ_TOPOLOGY,
        /** Reading the operation-specific response payload. */
        READ_PAYLOAD,
        /** Reading a cache event. */
        READ_CACHE_EVENT,
        /** Reading a counter event. */
        READ_COUNTER_EVENT
    }

    /**
     * Ring buffer entry pairing an operation with its deadline.
     *
     * @param op      the pending operation
     * @param timeout deadline on the decoder's time service clock, in nanoseconds
     */
    record OperationTimeout(HotRodOperation<?> op, long timeout) {
    }
}

