/*
 * Decompiled with CFR 0.152.
 */
package io.dingodb.net.netty.channel;

import io.dingodb.common.Location;
import io.dingodb.common.codec.PrimitiveCodec;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.concurrent.LinkedRunner;
import io.dingodb.common.util.Parameters;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.MessageListener;
import io.dingodb.net.netty.api.ApiRegistryImpl;
import io.dingodb.net.netty.connection.Connection;
import io.dingodb.net.netty.handler.TagMessageHandler;
import io.dingodb.net.netty.packet.Command;
import io.dingodb.net.netty.packet.Type;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Channel
implements io.dingodb.net.Channel {
    private static final Logger log = LoggerFactory.getLogger(Channel.class);
    private static final long WAIT_THREAD_TIME = TimeUnit.MILLISECONDS.toNanos(2L);
    private static final ApiRegistryImpl API_REGISTRY = ApiRegistryImpl.instance();
    private static final MessageListener EMPTY_MESSAGE_LISTENER = (msg, ch) -> log.warn("Receive message, but listener is empty.");
    private static final Consumer<io.dingodb.net.Channel> EMPTY_CLOSE_LISTENER = ch -> {};
    private int closeRetry = 300;
    protected final long channelId;
    protected final Connection connection;
    protected final Consumer<Long> onClose;
    protected LinkedRunner runner;
    protected Channel.Status status;
    private Consumer<ByteBuffer> directListener = null;
    private MessageListener messageListener = null;
    private Consumer<io.dingodb.net.Channel> closeListener = EMPTY_CLOSE_LISTENER;

    public Channel(long channelId, Connection connection, LinkedRunner runner, Consumer<Long> onClose) {
        this.channelId = channelId;
        this.connection = connection;
        this.onClose = onClose;
        this.status = Channel.Status.ACTIVE;
        this.runner = runner;
    }

    public ByteBuf buffer(Type type2, int capacity) {
        capacity = capacity + 8 + 1;
        return this.connection.alloc().buffer(capacity + 4, capacity + 4).writeInt(capacity).writeLong(this.channelId).writeByte(type2.ordinal());
    }

    @Override
    public synchronized void close() {
        if (this.status == Channel.Status.CLOSE) {
            log.warn("Channel [{}] already close", (Object)this.channelId);
            return;
        }
        this.shutdown();
        try {
            this.sendAsync(this.buffer(Type.COMMAND, 1).writeByte(Command.CLOSE.code()));
        }
        catch (Exception e) {
            log.error("Send close message error.", e);
        }
    }

    public synchronized void shutdown() {
        if (this.status == Channel.Status.CLOSE) {
            return;
        }
        this.status = Channel.Status.CLOSE;
        this.runner.forceFollow(() -> this.onClose.accept(this.channelId));
        this.runner.forceFollow(() -> this.closeListener.accept(this));
        this.runner = null;
    }

    @Override
    public synchronized void setMessageListener(MessageListener listener) {
        this.messageListener = Parameters.cleanNull(listener, EMPTY_MESSAGE_LISTENER);
    }

    @Override
    public synchronized void setCloseListener(Consumer<io.dingodb.net.Channel> listener) {
        if (this.isClosed()) {
            this.runner.forceFollow(() -> this.closeListener.accept(this));
        } else {
            this.closeListener = Parameters.cleanNull(listener, EMPTY_CLOSE_LISTENER);
        }
    }

    @Override
    public Location localLocation() {
        return this.connection.localLocation();
    }

    @Override
    public Location remoteLocation() {
        return this.connection.remoteLocation();
    }

    @Override
    public void send(Message message) {
        this.send(message, false);
    }

    @Override
    public void send(Message message, boolean sync) {
        if (this.isClosed()) {
            throw new RuntimeException("The channel is closed");
        }
        byte[] msg = message.encode();
        if (log.isTraceEnabled()) {
            log.trace("Send message to [{}] on [{}].", (Object)this.remoteLocation().getUrl(), (Object)this.channelId);
        }
        if (sync) {
            try {
                this.send(this.buffer(Type.USER_DEFINE, msg.length).writeBytes(msg));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        try {
            this.sendAsync(this.buffer(Type.USER_DEFINE, msg.length).writeBytes(msg));
        }
        catch (Exception e) {
            log.error("Send message to {} on {} error.", this.remoteLocation().getUrl(), this.channelId, e);
        }
    }

    public void send(ByteBuf content) throws InterruptedException {
        this.connection.send(content);
    }

    public void sendAsync(ByteBuf content) {
        this.connection.sendAsync(content);
    }

    public void receive(ByteBuffer buffer) {
        if (this.status == Channel.Status.ACTIVE && !this.runner.follow(() -> this.processMessage(buffer))) {
            log.error("Channel [{}] concurrent receive.", (Object)this.channelId);
        }
    }

    private void processMessage(ByteBuffer buffer) {
        try {
            switch (Type.values()[buffer.get()]) {
                case USER_DEFINE: {
                    if (this.directListener != null) {
                        this.directListener.accept(buffer);
                        return;
                    }
                    Message message = Message.decode(buffer);
                    if (this.messageListener != null) {
                        this.messageListener.onMessage(message, this);
                    }
                    TagMessageHandler.instance().handler(message, this);
                    break;
                }
                case COMMAND: {
                    this.processCommand(buffer);
                    break;
                }
                case API: {
                    API_REGISTRY.invoke(this, buffer);
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected value: " + (Object)((Object)Type.values()[buffer.get()]));
                }
            }
        }
        catch (Exception e) {
            log.error("Process message failed.", e);
        }
    }

    private void processCommand(ByteBuffer buffer) {
        Command type2 = Command.values()[buffer.get()];
        switch (type2) {
            case PONG: {
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive pong command.", (Object)this.channelId);
                }
                return;
            }
            case ACK: {
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive ack command.", (Object)this.channelId);
                }
                return;
            }
            case PING: {
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive ping command.", (Object)this.channelId);
                }
                this.sendAsync(this.buffer(Type.COMMAND, 1).writeByte(Command.PONG.code()));
                return;
            }
            case CLOSE: {
                if (log.isTraceEnabled()) {
                    log.trace("Channel [{}] receive close command.", (Object)this.channelId);
                }
                this.shutdown();
                Executors.execute(this.channelId + "-channel-close", () -> this.onClose.accept(this.channelId));
                return;
            }
            case ERROR: {
                log.error("Receive error: {}.", (Object)PrimitiveCodec.readString(buffer));
                return;
            }
        }
        throw new IllegalStateException("Unexpected value: " + (Object)((Object)type2));
    }

    public int closeRetry() {
        return this.closeRetry;
    }

    public Consumer<Long> onClose() {
        return this.onClose;
    }

    public LinkedRunner runner() {
        return this.runner;
    }

    public Consumer<ByteBuffer> directListener() {
        return this.directListener;
    }

    public MessageListener messageListener() {
        return this.messageListener;
    }

    public Consumer<io.dingodb.net.Channel> closeListener() {
        return this.closeListener;
    }

    @Override
    public long channelId() {
        return this.channelId;
    }

    public Connection connection() {
        return this.connection;
    }

    @Override
    public Channel.Status status() {
        return this.status;
    }

    public Channel directListener(Consumer<ByteBuffer> directListener) {
        this.directListener = directListener;
        return this;
    }
}

