/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.catalyst.transport;

import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.transport.ByteBufInput;
import io.atomix.catalyst.transport.ByteBufOutput;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.catalyst.util.ReferenceCounted;
import io.atomix.catalyst.util.concurrent.Scheduled;
import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class NettyConnection
implements Connection {
    static final byte REQUEST = 1;
    static final byte RESPONSE = 2;
    static final byte SUCCESS = 3;
    static final byte FAILURE = 4;
    private static final long REQUEST_TIMEOUT = 500L;
    private static final ThreadLocal<ByteBufInput> INPUT = new ThreadLocal<ByteBufInput>(){

        @Override
        protected ByteBufInput initialValue() {
            return new ByteBufInput();
        }
    };
    private static final ThreadLocal<ByteBufOutput> OUTPUT = new ThreadLocal<ByteBufOutput>(){

        @Override
        protected ByteBufOutput initialValue() {
            return new ByteBufOutput();
        }
    };
    private final Channel channel;
    private final ThreadContext context;
    private final Map<Class, HandlerHolder> handlers = new ConcurrentHashMap<Class, HandlerHolder>();
    private final Listeners<Throwable> exceptionListeners = new Listeners();
    private final Listeners<Connection> closeListeners = new Listeners();
    private volatile long requestId;
    private volatile Throwable failure;
    private volatile boolean closed;
    private Scheduled timeout;
    private final Map<Long, ContextualFuture> responseFutures = new LinkedHashMap<Long, ContextualFuture>(1024);
    private ChannelFuture writeFuture;

    public NettyConnection(Channel channel, ThreadContext context) {
        this.channel = channel;
        this.context = context;
        this.timeout = context.schedule(Duration.ofMillis(250L), Duration.ofMillis(250L), this::timeout);
    }

    void handleRequest(ByteBuf buffer) {
        long requestId = buffer.readLong();
        Object request = this.readRequest(buffer);
        HandlerHolder handler = this.handlers.get(request.getClass());
        if (handler != null) {
            handler.context.executor().execute(() -> this.handleRequest(requestId, request, handler));
        } else {
            this.handleRequestFailure(requestId, new IllegalStateException("unknown message type: " + request.getClass()));
        }
        buffer.release();
    }

    private void handleRequest(long requestId, Object request, HandlerHolder handler) {
        CompletableFuture responseFuture = handler.handler.handle(request);
        responseFuture.whenCompleteAsync((response, error) -> {
            if (error == null) {
                this.handleRequestSuccess(requestId, response);
            } else {
                this.handleRequestFailure(requestId, (Throwable)error);
            }
        }, this.context.executor());
    }

    private void handleRequestSuccess(long requestId, Object response) {
        ByteBuf buffer = this.channel.alloc().buffer(10).writeByte(2).writeLong(requestId).writeByte(3);
        this.channel.writeAndFlush((Object)this.writeResponse(buffer, response), this.channel.voidPromise());
        if (response instanceof ReferenceCounted) {
            ((ReferenceCounted)response).release();
        }
    }

    private void handleRequestFailure(long requestId, Throwable error) {
        ByteBuf buffer = this.channel.alloc().buffer(10).writeByte(2).writeLong(requestId).writeByte(4);
        this.channel.writeAndFlush((Object)this.writeError(buffer, error), this.channel.voidPromise());
    }

    void handleResponse(ByteBuf response) {
        long requestId = response.readLong();
        byte status = response.readByte();
        switch (status) {
            case 3: {
                this.handleResponseSuccess(requestId, this.readResponse(response));
                break;
            }
            case 4: {
                this.handleResponseFailure(requestId, this.readError(response));
            }
        }
        response.release();
    }

    private void handleResponseSuccess(long requestId, Object response) {
        ContextualFuture future = this.responseFutures.remove(requestId);
        if (future != null) {
            future.context.executor().execute(() -> future.complete(response));
        }
    }

    private void handleResponseFailure(long requestId, Throwable t) {
        ContextualFuture future = this.responseFutures.remove(requestId);
        if (future != null) {
            future.context.executor().execute(() -> future.completeExceptionally(t));
        }
    }

    private ByteBuf writeRequest(ByteBuf buffer, Object request) {
        this.context.serializer().writeObject(request, (BufferOutput)OUTPUT.get().setByteBuf(buffer));
        if (request instanceof ReferenceCounted) {
            ((ReferenceCounted)request).release();
        }
        return buffer;
    }

    private ByteBuf writeResponse(ByteBuf buffer, Object request) {
        this.context.serializer().writeObject(request, (BufferOutput)OUTPUT.get().setByteBuf(buffer));
        return buffer;
    }

    private ByteBuf writeError(ByteBuf buffer, Throwable t) {
        this.context.serializer().writeObject((Object)t, (BufferOutput)OUTPUT.get().setByteBuf(buffer));
        return buffer;
    }

    private Object readRequest(ByteBuf buffer) {
        return this.context.serializer().readObject((BufferInput)INPUT.get().setByteBuf(buffer));
    }

    private Object readResponse(ByteBuf buffer) {
        return this.context.serializer().readObject((BufferInput)INPUT.get().setByteBuf(buffer));
    }

    private Throwable readError(ByteBuf buffer) {
        return (Throwable)this.context.serializer().readObject((BufferInput)INPUT.get().setByteBuf(buffer));
    }

    void handleException(Throwable t) {
        if (this.failure == null) {
            this.failure = t;
            for (ContextualFuture responseFuture : this.responseFutures.values()) {
                responseFuture.context.executor().execute(() -> responseFuture.completeExceptionally(t));
            }
            this.responseFutures.clear();
            for (Listener listener : this.exceptionListeners) {
                listener.accept((Object)t);
            }
        }
    }

    void handleClosed() {
        if (!this.closed) {
            this.closed = true;
            for (ContextualFuture responseFuture : this.responseFutures.values()) {
                responseFuture.context.executor().execute(() -> responseFuture.completeExceptionally(new IllegalStateException("connection closed")));
            }
            this.responseFutures.clear();
            for (Listener listener : this.closeListeners) {
                listener.accept((Object)this);
            }
            this.timeout.cancel();
        }
    }

    void timeout() {
        ContextualFuture future;
        long time = System.currentTimeMillis();
        Iterator<Map.Entry<Long, ContextualFuture>> iterator = this.responseFutures.entrySet().iterator();
        while (iterator.hasNext() && (future = iterator.next().getValue()).time + 500L < time) {
            iterator.remove();
            future.context.executor().execute(() -> future.completeExceptionally(new TimeoutException("request timed out")));
        }
    }

    public <T, U> CompletableFuture<U> send(T request) {
        Assert.notNull(request, (String)"request");
        ThreadContext context = ThreadContext.currentContextOrThrow();
        ContextualFuture future = new ContextualFuture(System.currentTimeMillis(), context);
        long requestId = ++this.requestId;
        context.executor().execute(() -> {
            ByteBuf buffer = this.channel.alloc().buffer(9);
            buffer.writeByte(1).writeLong(requestId);
            this.writeFuture = this.channel.writeAndFlush((Object)this.writeRequest(buffer, request)).addListener(channelFuture -> {
                if (channelFuture.isSuccess()) {
                    if (!this.closed) {
                        this.responseFutures.put(requestId, future);
                    } else {
                        future.context.executor().execute(() -> future.completeExceptionally((Throwable)new TransportException("connection closed")));
                    }
                } else {
                    future.context.executor().execute(() -> future.completeExceptionally((Throwable)new TransportException(channelFuture.cause())));
                }
            });
        });
        return future;
    }

    public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
        Assert.notNull(type, (String)"type");
        this.handlers.put(type, new HandlerHolder(handler, ThreadContext.currentContextOrThrow()));
        return null;
    }

    public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
        if (this.failure != null) {
            listener.accept(this.failure);
        }
        return this.exceptionListeners.add((Consumer)Assert.notNull(listener, (String)"listener"));
    }

    public Listener<Connection> closeListener(Consumer<Connection> listener) {
        if (this.closed) {
            listener.accept(this);
        }
        return this.closeListeners.add((Consumer)Assert.notNull(listener, (String)"listener"));
    }

    public CompletableFuture<Void> close() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (this.writeFuture != null && !this.writeFuture.isDone()) {
            this.writeFuture.addListener(channelFuture -> this.channel.close().addListener(closeFuture -> {
                if (closeFuture.isSuccess()) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(closeFuture.cause());
                }
            }));
        } else {
            this.channel.close().addListener(closeFuture -> {
                if (closeFuture.isSuccess()) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(closeFuture.cause());
                }
            });
        }
        return future;
    }

    public int hashCode() {
        return this.channel.hashCode();
    }

    public boolean equals(Object object) {
        return object instanceof NettyConnection && ((NettyConnection)object).channel.equals(this.channel);
    }

    private static class ContextualFuture<T>
    extends CompletableFuture<T> {
        private final long time;
        private final ThreadContext context;

        private ContextualFuture(long time, ThreadContext context) {
            this.time = time;
            this.context = context;
        }
    }

    protected static class HandlerHolder {
        private final MessageHandler handler;
        private final ThreadContext context;

        private HandlerHolder(MessageHandler handler, ThreadContext context) {
            this.handler = handler;
            this.context = context;
        }
    }
}

