/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.network.netty;

import com.antgroup.geaflow.shuffle.api.pipeline.channel.RemoteInputChannel;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.network.ConnectionId;
import com.antgroup.geaflow.shuffle.network.netty.SliceRequestClientFactory;
import com.antgroup.geaflow.shuffle.network.netty.SliceRequestClientHandler;
import com.antgroup.geaflow.shuffle.network.protocol.BatchRequest;
import com.antgroup.geaflow.shuffle.network.protocol.CloseRequest;
import com.antgroup.geaflow.shuffle.network.protocol.SliceRequest;
import com.antgroup.geaflow.shuffle.util.AtomicReferenceCounter;
import com.antgroup.geaflow.shuffle.util.TransportException;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SliceRequestClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(SliceRequestClient.class);
    private final Channel tcpChannel;
    private final ConnectionId connectionId;
    private final SliceRequestClientHandler clientHandler;
    private final SliceRequestClientFactory clientFactory;
    private final AtomicReferenceCounter closeReferenceCounter = new AtomicReferenceCounter();

    public SliceRequestClient(Channel tcpChannel, SliceRequestClientHandler clientHandler, ConnectionId connectionId, SliceRequestClientFactory clientFactory) {
        this.tcpChannel = (Channel)Preconditions.checkNotNull((Object)tcpChannel);
        this.clientHandler = (SliceRequestClientHandler)((Object)Preconditions.checkNotNull((Object)((Object)clientHandler)));
        this.connectionId = (ConnectionId)Preconditions.checkNotNull((Object)connectionId);
        this.clientFactory = (SliceRequestClientFactory)Preconditions.checkNotNull((Object)clientFactory);
    }

    public boolean disposeIfNotUsed() {
        return this.closeReferenceCounter.disposeIfNotUsed();
    }

    boolean incrementReferenceCounter() {
        return this.closeReferenceCounter.increment();
    }

    public void requestSlice(SliceId sliceId, final RemoteInputChannel inputChannel, int delayMs, long startBatchId) throws IOException {
        this.checkNotClosed();
        this.clientHandler.addInputChannel(inputChannel);
        final SliceRequest request = new SliceRequest(sliceId, startBatchId, inputChannel.getInputChannelId());
        final ChannelFutureListener listener = new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    SliceRequestClient.this.clientHandler.removeInputChannel(inputChannel);
                    SocketAddress remoteAddr = future.channel().remoteAddress();
                    inputChannel.onError(new TransportException(String.format("Sending the request to '%s' failed.", remoteAddr), future.channel().localAddress(), future.cause()));
                }
            }
        };
        if (delayMs == 0) {
            ChannelFuture f = this.tcpChannel.writeAndFlush((Object)request);
            f.addListener((GenericFutureListener)listener);
        } else {
            final ChannelFuture[] f = new ChannelFuture[1];
            this.tcpChannel.eventLoop().schedule(new Runnable(){

                @Override
                public void run() {
                    f[0] = SliceRequestClient.this.tcpChannel.writeAndFlush((Object)request);
                    f[0].addListener((GenericFutureListener)listener);
                }
            }, (long)delayMs, TimeUnit.MILLISECONDS);
        }
    }

    public void requestNextBatch(long batchId, final RemoteInputChannel inputChannel) throws IOException {
        this.checkNotClosed();
        BatchRequest request = new BatchRequest(batchId, inputChannel.getInputChannelId());
        ChannelFutureListener listener = new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    SocketAddress remoteAddr = future.channel().remoteAddress();
                    inputChannel.onError(new TransportException(String.format("Sending the batch request to '%s' failed.", remoteAddr), future.channel().localAddress(), future.cause()));
                }
            }
        };
        ChannelFuture f = this.tcpChannel.writeAndFlush((Object)request);
        f.addListener((GenericFutureListener)listener);
    }

    public void close(RemoteInputChannel inputChannel) throws IOException {
        this.clientHandler.removeInputChannel(inputChannel);
        if (this.closeReferenceCounter.decrement()) {
            this.tcpChannel.writeAndFlush((Object)new CloseRequest()).addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE);
            this.clientFactory.destroyRequestClient(this.connectionId, this);
        } else {
            LOGGER.warn("cancel slice consumption of {}", (Object)inputChannel.getInputSliceId());
            this.clientHandler.cancelRequest(inputChannel.getInputChannelId());
        }
    }

    private void checkNotClosed() throws IOException {
        if (this.closeReferenceCounter.isDisposed()) {
            SocketAddress localAddr = this.tcpChannel.localAddress();
            SocketAddress remoteAddr = this.tcpChannel.remoteAddress();
            throw new TransportException(String.format("Channel to '%s' closed.", remoteAddr), localAddr);
        }
    }
}

