/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.network.netty;

import com.antgroup.geaflow.shuffle.api.pipeline.channel.ChannelId;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.SequenceSliceReader;
import com.antgroup.geaflow.shuffle.network.netty.SliceOutputChannelHandler;
import com.antgroup.geaflow.shuffle.network.protocol.BatchRequest;
import com.antgroup.geaflow.shuffle.network.protocol.CancelRequest;
import com.antgroup.geaflow.shuffle.network.protocol.CloseRequest;
import com.antgroup.geaflow.shuffle.network.protocol.ErrorResponse;
import com.antgroup.geaflow.shuffle.network.protocol.NettyMessage;
import com.antgroup.geaflow.shuffle.network.protocol.SliceRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SliceRequestServerHandler
extends SimpleChannelInboundHandler<NettyMessage> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SliceRequestServerHandler.class);
    private final SliceOutputChannelHandler outboundQueue;

    public SliceRequestServerHandler(SliceOutputChannelHandler outboundQueue) {
        this.outboundQueue = outboundQueue;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }

    protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
        try {
            Class<?> msgClazz = msg.getClass();
            if (msgClazz == SliceRequest.class) {
                SliceRequest request = (SliceRequest)msg;
                try {
                    SequenceSliceReader reader2 = new SequenceSliceReader(request.getReceiverId(), this.outboundQueue);
                    reader2.createSliceReader(request.getSliceId(), request.getStartBatchId());
                    this.outboundQueue.notifyReaderCreated(reader2);
                }
                catch (Throwable notFound) {
                    this.respondWithError(ctx, notFound, request.getReceiverId());
                }
            } else if (msgClazz == CancelRequest.class) {
                CancelRequest request = (CancelRequest)msg;
                this.outboundQueue.cancel(request.receiverId());
            } else if (msgClazz == CloseRequest.class) {
                this.outboundQueue.close();
            } else if (msgClazz == BatchRequest.class) {
                BatchRequest request = (BatchRequest)msg;
                this.outboundQueue.updateRequestedBatchId(request.receiverId(), reader -> reader.requestBatch(request.getNextBatchId()));
            } else {
                LOGGER.warn("Received unexpected client request: {}", (Object)msg);
                this.respondWithError(ctx, new IllegalArgumentException("unknown request:" + msg));
            }
        }
        catch (Throwable t) {
            this.respondWithError(ctx, t);
        }
    }

    private void respondWithError(ChannelHandlerContext ctx, Throwable error) {
        ctx.writeAndFlush((Object)new ErrorResponse(error));
    }

    private void respondWithError(ChannelHandlerContext ctx, Throwable error, ChannelId sourceId) {
        ctx.writeAndFlush((Object)new ErrorResponse(sourceId, error));
    }
}

