/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.network.netty;

import com.antgroup.geaflow.shuffle.api.pipeline.buffer.OutBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeChannelBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.ChannelId;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.SequenceSliceReader;
import com.antgroup.geaflow.shuffle.network.protocol.ErrorResponse;
import com.antgroup.geaflow.shuffle.network.protocol.SliceResponse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SliceOutputChannelHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(SliceOutputChannelHandler.class);
    private final ArrayDeque<SequenceSliceReader> availableReaders = new ArrayDeque();
    private final ConcurrentMap<ChannelId, SequenceSliceReader> allReaders = new ConcurrentHashMap<ChannelId, SequenceSliceReader>();
    private boolean fatalError;
    private ChannelHandlerContext ctx;

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        super.channelRegistered(ctx);
    }

    public void notifyNonEmpty(SequenceSliceReader reader) {
        this.ctx.executor().execute(() -> this.ctx.pipeline().fireUserEventTriggered((Object)reader));
    }

    private void enqueueReader(SequenceSliceReader reader) throws Exception {
        if (reader.isRegistered() || !reader.hasNext()) {
            return;
        }
        boolean triggerWrite = this.availableReaders.isEmpty();
        this.addAvailableReader(reader);
        if (triggerWrite) {
            this.writeAndFlushNextMessageIfPossible(this.ctx.channel());
        }
    }

    public void notifyReaderCreated(SequenceSliceReader reader) {
        this.allReaders.put(reader.getReceiverId(), reader);
    }

    public void cancel(ChannelId receiverId) {
        this.ctx.pipeline().fireUserEventTriggered((Object)receiverId);
    }

    public void close() throws IOException {
        if (this.ctx != null) {
            this.ctx.channel().close();
        }
        for (SequenceSliceReader reader : this.allReaders.values()) {
            this.releaseReader(reader);
        }
        this.allReaders.clear();
    }

    void updateRequestedBatchId(ChannelId receiverId, Consumer<SequenceSliceReader> operation) throws Exception {
        if (this.fatalError) {
            return;
        }
        SequenceSliceReader reader = (SequenceSliceReader)this.allReaders.get(receiverId);
        if (reader == null) {
            throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
        }
        operation.accept(reader);
        this.enqueueReader(reader);
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof SequenceSliceReader) {
            this.enqueueReader((SequenceSliceReader)msg);
        } else if (msg.getClass() == ChannelId.class) {
            ChannelId toCancel = (ChannelId)msg;
            this.availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
            SequenceSliceReader toRelease = (SequenceSliceReader)this.allReaders.remove(toCancel);
            if (toRelease != null) {
                this.releaseReader(toRelease);
            }
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        this.writeAndFlushNextMessageIfPossible(ctx.channel());
    }

    private void writeAndFlushNextMessageIfPossible(Channel channel) throws IOException {
        if (this.fatalError || !channel.isWritable()) {
            return;
        }
        SequenceSliceReader reader = null;
        try {
            PipeChannelBuffer next;
            do {
                if ((reader = this.pollAvailableReader()) != null) continue;
                return;
            } while ((next = reader.next()) == null);
            if (next.moreAvailable()) {
                this.addAvailableReader(reader);
            }
            SliceResponse msg = new SliceResponse(next.getBuffer(), reader.getSequenceNumber(), reader.getReceiverId());
            channel.writeAndFlush((Object)msg).addListener((GenericFutureListener)new WriteNextMessageIfPossibleListener(next.getBuffer()));
            return;
        }
        catch (Throwable t) {
            LOGGER.error("fetch {} failed: {}", (Object)reader, (Object)t.getMessage());
            throw new IOException(t.getMessage(), t);
        }
    }

    private void addAvailableReader(SequenceSliceReader reader) {
        this.availableReaders.add(reader);
        reader.setRegistered(true);
    }

    private SequenceSliceReader pollAvailableReader() {
        SequenceSliceReader reader = this.availableReaders.poll();
        if (reader != null) {
            reader.setRegistered(false);
        }
        return reader;
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.warn("channel inactive and release resource...");
        this.releaseAllResources();
        ctx.fireChannelInactive();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.handleException(ctx.channel(), cause);
    }

    private void handleException(Channel channel, Throwable cause) throws IOException {
        LOGGER.error("Encountered error while consuming slices", cause);
        this.fatalError = true;
        this.releaseAllResources();
        if (channel.isActive()) {
            channel.writeAndFlush((Object)new ErrorResponse(cause)).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
        }
    }

    private void releaseAllResources() throws IOException {
        for (SequenceSliceReader reader : this.allReaders.values()) {
            this.releaseReader(reader);
        }
        this.availableReaders.clear();
        this.allReaders.clear();
    }

    private void releaseReader(SequenceSliceReader reader) throws IOException {
        reader.setRegistered(false);
        reader.releaseAllResources();
    }

    private class WriteNextMessageIfPossibleListener
    implements ChannelFutureListener {
        private final OutBuffer buffer;

        public WriteNextMessageIfPossibleListener(PipeBuffer pipeBuffer) {
            this.buffer = pipeBuffer.getBuffer();
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            try {
                if (this.buffer != null && this.buffer.isDisposable()) {
                    this.buffer.release();
                }
                if (future.isSuccess()) {
                    SliceOutputChannelHandler.this.writeAndFlushNextMessageIfPossible(future.channel());
                } else if (future.cause() != null) {
                    SliceOutputChannelHandler.this.handleException(future.channel(), future.cause());
                } else {
                    SliceOutputChannelHandler.this.handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
                }
            }
            catch (Throwable t) {
                SliceOutputChannelHandler.this.handleException(future.channel(), t);
            }
        }
    }
}

