/*
 * Decompiled with CFR 0.152.
 */
package herddb.network.netty;

import herddb.network.Channel;
import herddb.network.SendResultCallback;
import herddb.proto.Pdu;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

public abstract class AbstractChannel
extends Channel {
    private static final Logger LOGGER = Logger.getLogger(AbstractChannel.class.getName());
    public static final String ADDRESS_JVM_LOCAL = "jvm-local";
    private static final AtomicLong idGenerator = new AtomicLong();
    private final ConcurrentHashMap<Long, Channel.PduCallback> callbacks = new ConcurrentHashMap();
    private final ConcurrentHashMap<Long, Long> pendingReplyMessagesDeadline = new ConcurrentHashMap();
    private final ExecutorService callbackexecutor;
    protected boolean ioErrors = false;
    private final long id = idGenerator.incrementAndGet();
    private final String remoteAddress;
    private AtomicBoolean closed = new AtomicBoolean(false);

    public AbstractChannel(String name, String remoteAddress, ExecutorService callbackexecutor) {
        super(name);
        this.callbackexecutor = callbackexecutor;
        this.remoteAddress = remoteAddress;
    }

    protected final long pendingCallbacks() {
        return this.callbacks.size();
    }

    public final long getId() {
        return this.id;
    }

    public final void pduReceived(Pdu message) {
        if (message.isRequest()) {
            this.handlePduRequest(message);
        } else {
            this.processPduResponse(message);
        }
    }

    public final void directProcessPdu(Pdu message) {
        if (message.isRequest()) {
            this.processRequest(message);
        } else {
            this.processPduResponse(message);
        }
    }

    private void processRequest(Pdu request) {
        try {
            this.messagesReceiver.requestReceived(request, this);
        }
        catch (Throwable t) {
            LOGGER.log(Level.SEVERE, this + ": error " + t, t);
            this.close();
        }
    }

    private void handlePduRequest(Pdu request) {
        this.submitCallback(() -> this.processRequest(request));
    }

    private void processPduResponse(Pdu pdu) {
        long replyMessageId = pdu.messageId;
        if (replyMessageId < 0L) {
            LOGGER.log(Level.SEVERE, "{0}: received response without replyId: type {1}", new Object[]{this, pdu.messageId});
            pdu.close();
            return;
        }
        Channel.PduCallback callback = this.callbacks.remove(replyMessageId);
        this.pendingReplyMessagesDeadline.remove(replyMessageId);
        if (callback != null) {
            this.submitCallback(() -> callback.responseReceived(pdu, null));
        }
    }

    @Override
    public final void sendReplyMessage(long inAnswerTo, ByteBuf message) {
        if (!this.isValid()) {
            LOGGER.log(Level.SEVERE, this + " channel not active, discarding reply message " + message);
            return;
        }
        this.sendOneWayMessage(message, new SendResultCallback(){

            @Override
            public void messageSent(Throwable error) {
                if (error != null) {
                    LOGGER.log(Level.SEVERE, this + " error:" + error, error);
                }
            }
        });
    }

    private void processPendingReplyMessagesDeadline() {
        ArrayList messagesWithNoReply = new ArrayList();
        long now = System.currentTimeMillis();
        this.pendingReplyMessagesDeadline.forEach((messageId, deadline) -> {
            if (deadline < now) {
                messagesWithNoReply.add(messageId);
            }
        });
        if (messagesWithNoReply.isEmpty()) {
            return;
        }
        LOGGER.log(Level.SEVERE, "{0} found {1} without reply, channel will be closed", new Object[]{this, messagesWithNoReply});
        this.ioErrors = true;
        for (Long messageId2 : messagesWithNoReply) {
            Channel.PduCallback callback = this.callbacks.remove(messageId2);
            if (callback == null) continue;
            this.submitCallback(() -> callback.responseReceived(null, new IOException(this + " reply timeout expired, channel will be closed")));
        }
        this.close();
    }

    @Override
    public final void sendRequestWithAsyncReply(long id, final ByteBuf message, long timeout, final Channel.PduCallback callback) {
        if (!this.isValid()) {
            callback.responseReceived(null, new Exception(this + " connection is not active"));
            return;
        }
        this.pendingReplyMessagesDeadline.put(id, System.currentTimeMillis() + timeout);
        this.callbacks.put(id, callback);
        this.sendOneWayMessage(message, new SendResultCallback(){

            @Override
            public void messageSent(Throwable error) {
                if (error != null) {
                    LOGGER.log(Level.SEVERE, this + ": error while sending reply message to " + message, error);
                    callback.responseReceived(null, new Exception(this + ": error while sending reply message to " + message, error));
                }
            }
        });
    }

    protected abstract String describeSocket();

    protected abstract void doClose();

    @Override
    public final void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        LOGGER.log(Level.FINE, "{0}: closing", this);
        String socketDescription = this.describeSocket();
        this.doClose();
        this.failPendingMessages(socketDescription);
    }

    @Override
    public final boolean isClosed() {
        return this.closed.get();
    }

    private void failPendingMessages(String socketDescription) {
        this.callbacks.forEach((key, callback) -> {
            this.pendingReplyMessagesDeadline.remove(key);
            LOGGER.log(Level.SEVERE, "{0} message {1} was not replied callback:{2}", new Object[]{this, key, callback});
            this.submitCallback(() -> callback.responseReceived(null, new IOException("comunication channel is closed. Cannot wait for pending messages, socket=" + socketDescription)));
        });
        this.pendingReplyMessagesDeadline.clear();
        this.callbacks.clear();
    }

    final void exceptionCaught(Throwable cause) {
        LOGGER.log(Level.SEVERE, this + " io-error " + cause, cause);
        this.ioErrors = true;
    }

    final void channelClosed() {
        this.failPendingMessages(this.describeSocket());
        this.submitCallback(() -> {
            if (this.messagesReceiver != null) {
                this.messagesReceiver.channelClosed(this);
            }
        });
    }

    private void submitCallback(Runnable runnable) {
        try {
            this.callbackexecutor.submit(runnable);
        }
        catch (RejectedExecutionException stopped) {
            LOGGER.log(Level.SEVERE, this + " rejected runnable " + runnable + ":" + stopped);
            try {
                runnable.run();
            }
            catch (Throwable error) {
                LOGGER.log(Level.SEVERE, this + " error on rejected runnable " + runnable + ":" + error);
            }
        }
    }

    @Override
    public final String getRemoteAddress() {
        return this.remoteAddress;
    }

    @Override
    public final void channelIdle() {
        LOGGER.log(Level.FINEST, "{0} channelIdle", this);
        this.processPendingReplyMessagesDeadline();
    }
}

