/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.catalyst.transport.netty;

import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.Listeners;
import io.atomix.catalyst.concurrent.Scheduled;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.serializer.SerializationException;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.transport.netty.ByteBufInput;
import io.atomix.catalyst.transport.netty.ByteBufOutput;
import io.atomix.catalyst.transport.netty.NettyOptions;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.reference.ReferenceCounted;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class NettyConnection
implements Connection {
    static final byte REQUEST = 1;
    static final byte RESPONSE = 2;
    static final byte SUCCESS = 3;
    static final byte FAILURE = 4;
    private static final ThreadLocal<ByteBufInput> INPUT = new ThreadLocal<ByteBufInput>(){

        @Override
        protected ByteBufInput initialValue() {
            return new ByteBufInput();
        }
    };
    private static final ThreadLocal<ByteBufOutput> OUTPUT = new ThreadLocal<ByteBufOutput>(){

        @Override
        protected ByteBufOutput initialValue() {
            return new ByteBufOutput();
        }
    };
    private final Channel channel;
    private final ThreadContext context;
    private final Map<Class, HandlerHolder> handlers = new ConcurrentHashMap<Class, HandlerHolder>();
    private final Listeners<Throwable> exceptionListeners = new Listeners();
    private final Listeners<Connection> closeListeners = new Listeners();
    private final long requestTimeout;
    private volatile long requestId;
    private volatile Throwable failure;
    private volatile boolean closed;
    private Scheduled timeout;
    private final Map<Long, ContextualFuture> responseFutures = new ConcurrentSkipListMap<Long, ContextualFuture>();
    private ChannelFuture writeFuture;

    public NettyConnection(Channel channel, ThreadContext context, NettyOptions options) {
        this.channel = channel;
        this.context = context;
        this.requestTimeout = options.requestTimeout();
        this.timeout = context.schedule(Duration.ofMillis(this.requestTimeout / 2L), Duration.ofMillis(this.requestTimeout / 2L), this::timeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleRequest(ByteBuf buffer) {
        long requestId = buffer.readLong();
        try {
            Object request = this.readRequest(buffer);
            HandlerHolder handler = this.handlers.get(request.getClass());
            if (handler != null) {
                handler.context.executor().execute(() -> this.handleRequest(requestId, request, handler));
            } else {
                this.handleRequestFailure(requestId, new SerializationException("unknown message type: " + request.getClass()), this.context);
            }
        }
        catch (SerializationException e) {
            this.handleRequestFailure(requestId, e, this.context);
        }
        finally {
            buffer.release();
        }
    }

    private void handleRequest(long requestId, Object request, HandlerHolder handler) {
        CompletableFuture responseFuture = handler.handler.handle(request);
        responseFuture.whenComplete((response, error) -> {
            ThreadContext context = ThreadContext.currentContext();
            if (context == null) {
                this.context.executor().execute(() -> {
                    if (error == null) {
                        this.handleRequestSuccess(requestId, response, this.context);
                    } else {
                        this.handleRequestFailure(requestId, (Throwable)error, this.context);
                    }
                });
            } else if (error == null) {
                this.handleRequestSuccess(requestId, response, context);
            } else {
                this.handleRequestFailure(requestId, (Throwable)error, context);
            }
        });
    }

    private void handleRequestSuccess(long requestId, Object response, ThreadContext context) {
        ByteBuf buffer = this.channel.alloc().buffer(10).writeByte(2).writeLong(requestId).writeByte(3);
        try {
            this.writeResponse(buffer, response, context);
        }
        catch (SerializationException e) {
            this.handleRequestFailure(requestId, e, context);
            return;
        }
        this.channel.writeAndFlush((Object)buffer, this.channel.voidPromise());
        if (response instanceof ReferenceCounted) {
            ((ReferenceCounted)response).release();
        }
    }

    private void handleRequestFailure(long requestId, Throwable error, ThreadContext context) {
        ByteBuf buffer = this.channel.alloc().buffer(10).writeByte(2).writeLong(requestId).writeByte(4);
        try {
            this.writeError(buffer, error, context);
        }
        catch (SerializationException e) {
            return;
        }
        this.channel.writeAndFlush((Object)buffer, this.channel.voidPromise());
    }

    void handleResponse(ByteBuf response) {
        long requestId = response.readLong();
        byte status = response.readByte();
        switch (status) {
            case 3: {
                try {
                    this.handleResponseSuccess(requestId, this.readResponse(response));
                }
                catch (SerializationException e) {
                    this.handleResponseFailure(requestId, e);
                }
                break;
            }
            case 4: {
                try {
                    this.handleResponseFailure(requestId, this.readError(response));
                    break;
                }
                catch (SerializationException e) {
                    this.handleResponseFailure(requestId, e);
                }
            }
        }
        response.release();
    }

    private void handleResponseSuccess(long requestId, Object response) {
        ContextualFuture future = this.responseFutures.remove(requestId);
        if (future != null) {
            future.context.executor().execute(() -> future.complete(response));
        }
    }

    private void handleResponseFailure(long requestId, Throwable t) {
        ContextualFuture future = this.responseFutures.remove(requestId);
        if (future != null) {
            future.context.executor().execute(() -> future.completeExceptionally(t));
        }
    }

    private ByteBuf writeRequest(ByteBuf buffer, Object request, ThreadContext context) {
        context.serializer().writeObject(request, OUTPUT.get().setByteBuf(buffer));
        if (request instanceof ReferenceCounted) {
            ((ReferenceCounted)request).release();
        }
        return buffer;
    }

    private ByteBuf writeResponse(ByteBuf buffer, Object request, ThreadContext context) {
        context.serializer().writeObject(request, OUTPUT.get().setByteBuf(buffer));
        return buffer;
    }

    private ByteBuf writeError(ByteBuf buffer, Throwable t, ThreadContext context) {
        context.serializer().writeObject(t, OUTPUT.get().setByteBuf(buffer));
        return buffer;
    }

    private Object readRequest(ByteBuf buffer) {
        return this.context.serializer().readObject(INPUT.get().setByteBuf(buffer));
    }

    private Object readResponse(ByteBuf buffer) {
        return this.context.serializer().readObject(INPUT.get().setByteBuf(buffer));
    }

    private Throwable readError(ByteBuf buffer) {
        return (Throwable)this.context.serializer().readObject(INPUT.get().setByteBuf(buffer));
    }

    void handleException(Throwable t) {
        if (this.failure == null) {
            this.failure = t;
            for (ContextualFuture contextualFuture : this.responseFutures.values()) {
                contextualFuture.context.executor().execute(() -> responseFuture.completeExceptionally(t));
            }
            this.responseFutures.clear();
            for (Listener listener : this.exceptionListeners) {
                listener.accept(t);
            }
        }
    }

    void handleClosed() {
        if (!this.closed) {
            this.closed = true;
            for (ContextualFuture contextualFuture : this.responseFutures.values()) {
                contextualFuture.context.executor().execute(() -> responseFuture.completeExceptionally(new ConnectException("connection closed")));
            }
            this.responseFutures.clear();
            for (Listener listener : this.closeListeners) {
                listener.accept(this);
            }
            this.timeout.cancel();
        }
    }

    void timeout() {
        ContextualFuture future;
        long time = System.currentTimeMillis();
        Iterator<Map.Entry<Long, ContextualFuture>> iterator = this.responseFutures.entrySet().iterator();
        while (iterator.hasNext() && (future = iterator.next().getValue()).time + this.requestTimeout < time) {
            iterator.remove();
            future.context.executor().execute(() -> future.completeExceptionally(new TimeoutException("request timed out")));
        }
    }

    @Override
    public <T, U> CompletableFuture<U> send(T request) {
        Assert.notNull(request, "request");
        ThreadContext context = ThreadContext.currentContextOrThrow();
        ContextualFuture future = new ContextualFuture(System.currentTimeMillis(), context);
        long requestId = ++this.requestId;
        ByteBuf buffer = this.channel.alloc().buffer(9).writeByte(1).writeLong(requestId);
        try {
            this.writeRequest(buffer, request, context);
        }
        catch (SerializationException e) {
            future.completeExceptionally(e);
            return future;
        }
        this.responseFutures.put(requestId, future);
        this.writeFuture = this.channel.writeAndFlush((Object)buffer).addListener(channelFuture -> {
            if (channelFuture.isSuccess()) {
                ContextualFuture responseFuture;
                if (this.closed && (responseFuture = this.responseFutures.remove(requestId)) != null) {
                    responseFuture.context.executor().execute(() -> responseFuture.completeExceptionally(new ConnectException("connection closed")));
                }
            } else {
                future.context.executor().execute(() -> future.completeExceptionally(channelFuture.cause()));
            }
        });
        return future;
    }

    @Override
    public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
        Assert.notNull(type, "type");
        this.handlers.put(type, new HandlerHolder(handler, ThreadContext.currentContextOrThrow()));
        return null;
    }

    @Override
    public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
        if (this.failure != null) {
            listener.accept(this.failure);
        }
        return this.exceptionListeners.add(Assert.notNull(listener, "listener"));
    }

    @Override
    public Listener<Connection> closeListener(Consumer<Connection> listener) {
        if (this.closed) {
            listener.accept(this);
        }
        return this.closeListeners.add(Assert.notNull(listener, "listener"));
    }

    @Override
    public CompletableFuture<Void> close() {
        ThreadContext context = ThreadContext.currentContextOrThrow();
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (this.writeFuture != null && !this.writeFuture.isDone()) {
            this.writeFuture.addListener(channelFuture -> this.channel.close().addListener(closeFuture -> {
                if (closeFuture.isSuccess()) {
                    context.executor().execute(() -> future.complete(null));
                } else {
                    context.executor().execute(() -> future.completeExceptionally(closeFuture.cause()));
                }
            }));
        } else {
            this.channel.close().addListener(closeFuture -> {
                if (closeFuture.isSuccess()) {
                    context.executor().execute(() -> future.complete(null));
                } else {
                    context.executor().execute(() -> future.completeExceptionally(closeFuture.cause()));
                }
            });
        }
        return future;
    }

    public int hashCode() {
        return this.channel.hashCode();
    }

    public boolean equals(Object object) {
        return object instanceof NettyConnection && ((NettyConnection)object).channel.equals(this.channel);
    }

    private static class ContextualFuture<T>
    extends CompletableFuture<T> {
        private final long time;
        private final ThreadContext context;

        private ContextualFuture(long time, ThreadContext context) {
            this.time = time;
            this.context = context;
        }
    }

    protected static class HandlerHolder {
        private final MessageHandler handler;
        private final ThreadContext context;

        private HandlerHolder(MessageHandler handler, ThreadContext context) {
            this.handler = handler;
            this.context = context;
        }
    }
}

