/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.cdc.client;

import com.google.common.hash.Hashing;
import com.google.protobuf.MessageLite;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ServerErrorResultHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CDCClient
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CDCClient.class);
    private final CDCClientConfiguration config;
    private NioEventLoopGroup group;
    private Channel channel;

    public CDCClient(CDCClientConfiguration config) {
        this.validateParameter(config);
        this.config = config;
    }

    private void validateParameter(CDCClientConfiguration parameter) {
        if (null == parameter.getAddress() || parameter.getAddress().isEmpty()) {
            throw new IllegalArgumentException("The address parameter can't be null");
        }
        if (parameter.getPort() <= 0) {
            throw new IllegalArgumentException("The port must be greater than 0");
        }
    }

    public void connect(final Consumer<List<DataRecordResult.Record>> dataConsumer, final ExceptionHandler exceptionHandler, final ServerErrorResultHandler errorResultHandler) {
        Bootstrap bootstrap = new Bootstrap();
        this.group = new NioEventLoopGroup(1);
        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)bootstrap.channel(NioSocketChannel.class)).group((EventLoopGroup)this.group)).option(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT)).option(ChannelOption.SO_REUSEADDR, (Object)true)).handler((ChannelHandler)new ChannelInitializer<NioSocketChannel>(){

            protected void initChannel(NioSocketChannel channel) {
                channel.pipeline().addLast(new ChannelHandler[]{new ProtobufVarint32FrameDecoder()});
                channel.pipeline().addLast(new ChannelHandler[]{new ProtobufDecoder((MessageLite)CDCResponse.getDefaultInstance())});
                channel.pipeline().addLast(new ChannelHandler[]{new ProtobufVarint32LengthFieldPrepender()});
                channel.pipeline().addLast(new ChannelHandler[]{new ProtobufEncoder()});
                channel.pipeline().addLast(new ChannelHandler[]{new CDCRequestHandler(dataConsumer, exceptionHandler, errorResultHandler)});
            }
        });
        this.channel = bootstrap.connect(this.config.getAddress(), this.config.getPort()).sync().channel();
    }

    public void await() throws InterruptedException {
        this.channel.closeFuture().sync();
    }

    public synchronized void login(CDCLoginParameter parameter) {
        this.checkChannelActive();
        ClientConnectionContext connectionContext = (ClientConnectionContext)this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        if (ClientConnectionStatus.LOGGED_IN == connectionContext.getStatus().get()) {
            throw new IllegalStateException("The client is already logged in");
        }
        LoginRequestBody loginRequestBody = LoginRequestBody.newBuilder().setType(LoginRequestBody.LoginType.BASIC).setBasicBody(LoginRequestBody.BasicBody.newBuilder().setUsername(parameter.getUsername()).setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
        String requestId = RequestIdUtils.generateRequestId();
        CDCRequest data = CDCRequest.newBuilder().setType(CDCRequest.Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
        ResponseFuture responseFuture = new ResponseFuture(requestId, CDCRequest.Type.LOGIN);
        connectionContext.getResponseFutureMap().put(requestId, responseFuture);
        this.channel.writeAndFlush((Object)data);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), connectionContext);
        log.info("Login success, username: {}", (Object)parameter.getUsername());
    }

    private void checkChannelActive() {
        if (null == this.channel || !this.channel.isActive()) {
            throw new IllegalStateException("The channel is not active, call the `connect` method first");
        }
    }

    public String startStreaming(StartStreamingParameter parameter) {
        StreamDataRequestBody streamDataRequestBody = StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull()).addAllSourceSchemaTable(parameter.getSchemaTables()).build();
        String requestId = RequestIdUtils.generateRequestId();
        CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(CDCRequest.Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
        ClientConnectionContext connectionContext = (ClientConnectionContext)this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        ResponseFuture responseFuture = new ResponseFuture(requestId, CDCRequest.Type.STREAM_DATA);
        connectionContext.getResponseFutureMap().put(requestId, responseFuture);
        this.channel.writeAndFlush((Object)request);
        String result = responseFuture.waitResponseResult(this.config.getTimeoutMills(), connectionContext).toString();
        log.info("Start streaming success, streaming id: {}", (Object)result);
        return result;
    }

    public void restartStreaming(String streamingId) {
        String requestId = RequestIdUtils.generateRequestId();
        StartStreamingRequestBody body = StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
        CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(CDCRequest.Type.START_STREAMING).setStartStreamingRequestBody(body).build();
        ResponseFuture responseFuture = new ResponseFuture(requestId, CDCRequest.Type.START_STREAMING);
        ClientConnectionContext connectionContext = (ClientConnectionContext)this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        connectionContext.getResponseFutureMap().put(requestId, responseFuture);
        this.channel.writeAndFlush((Object)request);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), connectionContext);
        log.info("Restart streaming success, streaming id: {}", (Object)streamingId);
    }

    public void stopStreaming(String streamingId) {
        String requestId = RequestIdUtils.generateRequestId();
        StopStreamingRequestBody body = StopStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
        CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(CDCRequest.Type.STOP_STREAMING).setStopStreamingRequestBody(body).build();
        ResponseFuture responseFuture = new ResponseFuture(requestId, CDCRequest.Type.STOP_STREAMING);
        ClientConnectionContext connectionContext = (ClientConnectionContext)this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        connectionContext.getResponseFutureMap().put(requestId, responseFuture);
        this.channel.writeAndFlush((Object)request);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), connectionContext);
        connectionContext.getStreamingIds().remove(streamingId);
        log.info("Stop streaming success, streaming id: {}", (Object)streamingId);
    }

    public void dropStreaming(String streamingId) {
        String requestId = RequestIdUtils.generateRequestId();
        DropStreamingRequestBody body = DropStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
        CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(CDCRequest.Type.DROP_STREAMING).setDropStreamingRequestBody(body).build();
        ResponseFuture responseFuture = new ResponseFuture(requestId, CDCRequest.Type.DROP_STREAMING);
        ClientConnectionContext connectionContext = (ClientConnectionContext)this.channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
        connectionContext.getResponseFutureMap().put(requestId, responseFuture);
        this.channel.writeAndFlush((Object)request);
        responseFuture.waitResponseResult(this.config.getTimeoutMills(), connectionContext);
        connectionContext.getStreamingIds().remove(streamingId);
        log.info("Drop streaming success, streaming id: {}", (Object)streamingId);
    }

    @Override
    public void close() {
        if (null != this.channel) {
            this.channel.close().awaitUninterruptibly();
        }
        if (null != this.group) {
            this.group.shutdownGracefully();
        }
    }
}

