/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.catalyst.transport;

import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.catalyst.util.ReferenceCounted;
import io.atomix.catalyst.util.concurrent.Futures;
import io.atomix.catalyst.util.concurrent.ThreadContext;
import java.net.ConnectException;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

/**
 * One endpoint of a pair of directly linked connections.
 *
 * <p>Each endpoint holds a reference to its peer (set through {@link #connect(LocalConnection)})
 * and delivers requests by calling the peer's methods directly. Requests are numbered on this
 * connection's own {@link ThreadContext}; every response is handed back to the
 * {@link ThreadContext} that was current when the request was sent, and every handler is invoked
 * on the {@link ThreadContext} that was current when the handler was registered.
 */
public class LocalConnection
implements Connection {
    private final UUID id = UUID.randomUUID();
    private final ThreadContext context;
    private final Set<LocalConnection> connections;
    private LocalConnection connection;
    // Only incremented from sendRequest(), which send() always schedules on this.context.
    private long requestId;
    private final Map<Long, ContextualFuture<?>> futures = new ConcurrentHashMap<>();
    private final Map<Class<?>, HandlerHolder> handlers = new ConcurrentHashMap<>();
    private final Listeners<Throwable> exceptionListeners = new Listeners<>();
    private final Listeners<Connection> closeListeners = new Listeners<>();
    volatile boolean open = true;

    /**
     * Creates an unconnected endpoint.
     *
     * @param context     the context on which outgoing requests are numbered and close listeners run
     * @param connections the set this connection removes itself from when it is closed
     */
    public LocalConnection(ThreadContext context, Set<LocalConnection> connections) {
        this.context = context;
        this.connections = connections;
    }

    /**
     * Sets the peer endpoint. Responses to requests received by this endpoint are routed through
     * the peer, so both endpoints need to be connected to each other for a round trip to work.
     *
     * @param connection the peer endpoint
     * @return this connection
     */
    public LocalConnection connect(LocalConnection connection) {
        this.connection = connection;
        return this;
    }

    /**
     * Sends a request to the peer.
     *
     * @param request the request; must not be null
     * @return a future completed on the caller's current {@link ThreadContext} with the handler's
     *         response, or failed with a {@link ConnectException} when either endpoint is closed
     *         (or no peer has been connected) or the peer has no handler for the request type
     */
    @Override
    public <T, U> CompletableFuture<U> send(T request) {
        if (!this.isLinkOpen()) {
            return Futures.exceptionalFuture(new ConnectException("connection closed"));
        }
        Assert.notNull(request, "request");
        ContextualFuture<U> future = new ContextualFuture<>(ThreadContext.currentContextOrThrow());
        this.context.execute(() -> this.sendRequest(request, future));
        return future;
    }

    /**
     * Returns whether this endpoint and its peer are both open. An endpoint without a peer is
     * reported as not open.
     */
    private boolean isLinkOpen() {
        LocalConnection peer = this.connection;
        return this.open && peer != null && peer.open;
    }

    /**
     * Registers the pending future under a fresh request id and passes the request to the peer,
     * or fails the future if the link was closed in the meantime.
     */
    private void sendRequest(Object request, ContextualFuture<?> future) {
        try {
            if (this.isLinkOpen()) {
                long nextId = ++this.requestId;
                this.futures.put(nextId, future);
                this.connection.handleRequest(nextId, request);
            } else {
                LocalConnection.runOnOwner(future, () -> future.completeExceptionally(new ConnectException("connection closed")));
            }
        } finally {
            // Released on every path, including when dispatching the request throws.
            if (request instanceof ReferenceCounted) {
                ((ReferenceCounted)request).release();
            }
        }
    }

    /**
     * Schedules {@code completion} on the executor of the context that created {@code future}.
     * A rejected submission is dropped, which matches how pending futures are treated on close.
     */
    private static void runOnOwner(ContextualFuture<?> future, Runnable completion) {
        try {
            future.context.executor().execute(completion);
        } catch (RejectedExecutionException ignored) {
            // The owning context no longer accepts tasks, so the completion cannot be delivered
            // on the thread the caller expects. Completing it elsewhere would break that contract.
        }
    }

    /** Completes the pending request {@code requestId} with {@code response}, if still pending. */
    private void handleResponseOk(long requestId, Object response) {
        // The caller chose the future's type parameter; the handler's response is trusted to match.
        @SuppressWarnings("unchecked")
        ContextualFuture<Object> future = (ContextualFuture<Object>)this.futures.remove(requestId);
        if (future != null) {
            LocalConnection.runOnOwner(future, () -> future.complete(response));
        }
    }

    /** Fails the pending request {@code requestId} with {@code error}, if still pending. */
    private void handleResponseError(long requestId, Throwable error) {
        ContextualFuture<?> future = this.futures.remove(requestId);
        if (future != null) {
            LocalConnection.runOnOwner(future, () -> future.completeExceptionally(error));
        }
    }

    /**
     * Receives a request from the peer. The handler is looked up by the request's exact runtime
     * class and invoked on the context it was registered from.
     */
    private void handleRequest(long requestId, Object request) {
        HandlerHolder holder = this.handlers.get(request.getClass());
        if (holder == null) {
            this.connection.handleResponseError(requestId, new ConnectException("no handler registered"));
            return;
        }
        // Handlers are stored by request class, so the registered handler accepts this request.
        @SuppressWarnings("unchecked")
        MessageHandler<Object, Object> handler = (MessageHandler<Object, Object>)holder.handler;
        try {
            holder.context.executor().execute(() -> this.invokeHandler(requestId, request, handler));
        } catch (RejectedExecutionException e) {
            // The handler's context has shut down; report it to the requester as a closed connection.
            this.connection.handleResponseError(requestId, new ConnectException("connection closed"));
        }
    }

    /** Runs {@code handler} for one request and routes its outcome back through the peer. */
    private void invokeHandler(long requestId, Object request, MessageHandler<Object, Object> handler) {
        LocalConnection requester = this.connection;
        if (!this.isLinkOpen()) {
            requester.handleResponseError(requestId, new ConnectException("connection closed"));
            return;
        }
        try {
            handler.handle(request).whenComplete((response, error) -> {
                if (!this.isLinkOpen()) {
                    requester.handleResponseError(requestId, new ConnectException("connection closed"));
                } else if (error == null) {
                    requester.handleResponseOk(requestId, response);
                } else {
                    requester.handleResponseError(requestId, error);
                }
            });
        } catch (RuntimeException e) {
            // The handler threw synchronously or returned null. Answer the requester so its future
            // does not stay pending until close, then rethrow so the handler's executor still
            // sees the failure.
            requester.handleResponseError(requestId, e);
            throw e;
        }
    }

    /**
     * Registers, replaces or removes the handler for a request type.
     *
     * @param type    the exact request class to handle; must not be null
     * @param handler the handler to invoke on the caller's current {@link ThreadContext}, or
     *                {@code null} to remove the handler registered for {@code type}
     * @return this connection
     */
    @Override
    public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
        Assert.notNull(type, "type");
        if (handler == null) {
            this.handlers.remove(type);
        } else {
            this.handlers.put(type, new HandlerHolder(handler, ThreadContext.currentContextOrThrow()));
        }
        return this;
    }

    /**
     * Registers an exception listener.
     *
     * @param listener the listener; must not be null
     * @return a handle for the registered listener
     */
    @Override
    public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
        Assert.notNull(listener, "listener");
        return this.exceptionListeners.add(listener);
    }

    /**
     * Registers a listener that is called with this connection when it is closed.
     *
     * @param listener the listener; must not be null
     * @return a handle for the registered listener
     */
    @Override
    public Listener<Connection> closeListener(Consumer<Connection> listener) {
        Assert.notNull(listener, "listener");
        return this.closeListeners.add(listener);
    }

    /**
     * Closes this endpoint and, if one is connected, its peer. Closing an already closed
     * connection does nothing.
     *
     * @return an already completed future when the connection was closed before this call,
     *         otherwise a future completed via the caller's current {@link ThreadContext}
     */
    @Override
    public CompletableFuture<Void> close() {
        if (!this.open) {
            return CompletableFuture.completedFuture(null);
        }
        this.doClose();
        LocalConnection peer = this.connection;
        if (peer != null) {
            peer.doClose();
        }
        return ThreadContext.currentContextOrThrow().execute(() -> null);
    }

    /**
     * Marks this endpoint closed, removes it from the owning set, fails every pending request
     * and schedules the close listeners on this connection's context.
     */
    private void doClose() {
        this.open = false;
        this.connections.remove(this);
        for (ContextualFuture<?> future : this.futures.values()) {
            LocalConnection.runOnOwner(future, () -> future.completeExceptionally(new ConnectException("connection closed")));
        }
        this.futures.clear();
        for (Consumer<Connection> closeListener : this.closeListeners) {
            try {
                this.context.executor().execute(() -> closeListener.accept(this));
            } catch (RejectedExecutionException ignored) {
                // This connection's context has already shut down; notification is best effort.
            }
        }
    }

    /** Hashes on the random id assigned at construction. */
    @Override
    public int hashCode() {
        return this.id.hashCode();
    }

    /** Two connections are equal when they carry the same id. */
    @Override
    public boolean equals(Object object) {
        return object instanceof LocalConnection && ((LocalConnection)object).id.equals(this.id);
    }

    /** A registered handler together with the context it must be invoked on. */
    protected static class HandlerHolder {
        private final MessageHandler<?, ?> handler;
        private final ThreadContext context;

        private HandlerHolder(MessageHandler<?, ?> handler, ThreadContext context) {
            this.handler = handler;
            this.context = context;
        }
    }

    /** A future that remembers the context on which it has to be completed. */
    private static class ContextualFuture<T>
    extends CompletableFuture<T> {
        private final ThreadContext context;

        private ContextualFuture(ThreadContext context) {
            this.context = context;
        }
    }
}

