/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.cdc.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ServerErrorResultHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CDCRequestHandler
extends ChannelInboundHandlerAdapter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CDCRequestHandler.class);
    private final Consumer<List<DataRecordResult.Record>> consumer;
    private final ExceptionHandler exceptionHandler;
    private final ServerErrorResultHandler errorResultHandler;

    public void channelRegistered(ChannelHandlerContext ctx) {
        ClientConnectionContext context = new ClientConnectionContext();
        context.getStatus().set(ClientConnectionStatus.NOT_LOGGED_IN);
        ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent((Object)context);
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(null);
        log.info("Channel inactive, stop CDC client");
        ctx.fireChannelInactive();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        CDCResponse response = (CDCResponse)msg;
        ClientConnectionContext connectionContext = (ClientConnectionContext)ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
        Optional<ResponseFuture> responseFuture = Optional.ofNullable(connectionContext.getResponseFutureMap().get(response.getRequestId()));
        if (response.getStatus() != CDCResponse.Status.SUCCEED) {
            CDCRequest.Type requestType = CDCRequest.Type.UNKNOWN;
            if (responseFuture.isPresent()) {
                ResponseFuture future2 = responseFuture.get();
                future2.setErrorCode(response.getErrorCode());
                future2.setErrorMessage(response.getErrorMessage());
                future2.countDown();
                requestType = future2.getRequestType();
            }
            this.errorResultHandler.handleServerError(ctx, new ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), requestType));
            responseFuture.ifPresent(future -> {
                future.setErrorCode(response.getErrorCode());
                future.setErrorMessage(response.getErrorMessage());
                future.countDown();
            });
            return;
        }
        if (response.hasServerGreetingResult()) {
            ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
            log.info("Received server greeting result, serverVersion={}, protocolVersion={}", (Object)serverGreetingResult.getServerVersion(), (Object)serverGreetingResult.getProtocolVersion());
            return;
        }
        if (ClientConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus().get() && responseFuture.isPresent() && CDCRequest.Type.LOGIN == responseFuture.get().getRequestType()) {
            responseFuture.ifPresent(ResponseFuture::countDown);
            connectionContext.getStatus().set(ClientConnectionStatus.LOGGED_IN);
            return;
        }
        if (response.hasStreamDataResult()) {
            StreamDataResult streamDataResult = response.getStreamDataResult();
            responseFuture.ifPresent(future -> future.setResult(response.getStreamDataResult().getStreamingId()));
            connectionContext.getStreamingIds().add(streamDataResult.getStreamingId());
        } else if (response.hasDataRecordResult()) {
            this.processDataRecords(ctx, response.getDataRecordResult());
        }
        responseFuture.ifPresent(ResponseFuture::countDown);
    }

    private void processDataRecords(ChannelHandlerContext ctx, DataRecordResult result) {
        this.consumer.accept(result.getRecordList());
        ctx.channel().writeAndFlush((Object)CDCRequest.newBuilder().setType(CDCRequest.Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        this.exceptionHandler.handleException(ctx, cause);
    }

    @Generated
    public CDCRequestHandler(Consumer<List<DataRecordResult.Record>> consumer, ExceptionHandler exceptionHandler, ServerErrorResultHandler errorResultHandler) {
        this.consumer = consumer;
        this.exceptionHandler = exceptionHandler;
        this.errorResultHandler = errorResultHandler;
    }
}

