/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.spark.backend;

import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.ConnectedFailedException;
import com.starrocks.connector.spark.exception.StarrocksException;
import com.starrocks.connector.spark.exception.StarrocksInternalException;
import com.starrocks.connector.spark.serialization.Routing;
import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TProtocol;
import com.starrocks.shade.org.apache.thrift.transport.TSocket;
import com.starrocks.shade.org.apache.thrift.transport.TTransport;
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;
import com.starrocks.thrift.TScanBatchResult;
import com.starrocks.thrift.TScanCloseParams;
import com.starrocks.thrift.TScanCloseResult;
import com.starrocks.thrift.TScanNextBatchParams;
import com.starrocks.thrift.TScanOpenParams;
import com.starrocks.thrift.TScanOpenResult;
import com.starrocks.thrift.TStarrocksExternalService;
import com.starrocks.thrift.TStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackendClient {
    private static Logger logger = LoggerFactory.getLogger(BackendClient.class);
    private Routing routing;
    private TStarrocksExternalService.Client client;
    private TTransport transport;
    private boolean isConnected = false;
    private final int retries;
    private final int socketTimeout;
    private final int connectTimeout;

    public BackendClient(Routing routing, Settings settings) throws ConnectedFailedException {
        this.routing = routing;
        this.connectTimeout = settings.getIntegerProperty("starrocks.request.connect.timeout.ms", 30000);
        this.socketTimeout = settings.getIntegerProperty("starrocks.request.read.timeout.ms", 30000);
        this.retries = settings.getIntegerProperty("starrocks.request.retries", 3);
        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", new Object[]{this.connectTimeout, this.socketTimeout, this.retries});
        this.open();
    }

    private void open() throws ConnectedFailedException {
        logger.debug("Open client to StarRocks BE '{}'.", (Object)this.routing);
        TTransportException ex = null;
        for (int attempt = 0; !this.isConnected && attempt < this.retries; ++attempt) {
            logger.debug("Attempt {} to connect {}.", (Object)attempt, (Object)this.routing);
            TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
            try {
                this.transport = new TSocket(this.routing.getHost(), this.routing.getPort(), this.socketTimeout, this.connectTimeout);
                TProtocol protocol = factory.getProtocol(this.transport);
                this.client = new TStarrocksExternalService.Client(protocol);
                logger.trace("Connect status before open transport to {} is '{}'.", (Object)this.routing, (Object)this.isConnected);
                if (!this.transport.isOpen()) {
                    this.transport.open();
                    this.isConnected = true;
                }
            }
            catch (TTransportException e) {
                logger.warn("Connect to StarRocks {} failed.", (Object)this.routing, (Object)e);
                ex = e;
            }
            if (!this.isConnected) continue;
            logger.info("Success connect to {}.", (Object)this.routing);
            break;
        }
        if (!this.isConnected) {
            logger.error("Connect to StarRocks {} failed.", (Object)this.routing);
            throw new ConnectedFailedException(this.routing.toString(), ex);
        }
    }

    private void close() {
        logger.trace("Connect status before close with '{}' is '{}'.", (Object)this.routing, (Object)this.isConnected);
        this.isConnected = false;
        if (null != this.client) {
            this.client = null;
        }
        if (this.transport != null && this.transport.isOpen()) {
            this.transport.close();
            logger.info("Closed a connection to {}.", (Object)this.routing);
        }
    }

    public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedFailedException {
        logger.debug("OpenScanner to '{}', parameter is '{}'.", (Object)this.routing, (Object)openParams);
        if (!this.isConnected) {
            this.open();
        }
        TException ex = null;
        for (int attempt = 0; attempt < this.retries; ++attempt) {
            logger.debug("Attempt {} to openScanner {}.", (Object)attempt, (Object)this.routing);
            try {
                TScanOpenResult result = this.client.open_scanner(openParams);
                if (result == null) {
                    logger.warn("Open scanner result from {} is null.", (Object)this.routing);
                    continue;
                }
                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
                    logger.warn("The status of open scanner result from {} is '{}', error message is: {}.", new Object[]{this.routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs()});
                    continue;
                }
                return result;
            }
            catch (TException e) {
                logger.warn("Open scanner from {} failed.", (Object)this.routing, (Object)e);
                ex = e;
            }
        }
        logger.error("Connect to StarRocks {} failed.", (Object)this.routing);
        throw new ConnectedFailedException(this.routing.toString(), ex);
    }

    public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws StarrocksException {
        logger.debug("GetNext to '{}', parameter is '{}'.", (Object)this.routing, (Object)nextBatchParams);
        if (!this.isConnected) {
            this.open();
        }
        TException ex = null;
        TScanBatchResult result = null;
        for (int attempt = 0; attempt < this.retries; ++attempt) {
            logger.debug("Attempt {} to getNext {}.", (Object)attempt, (Object)this.routing);
            try {
                result = this.client.get_next(nextBatchParams);
                if (result == null) {
                    logger.warn("GetNext result from {} is null.", (Object)this.routing);
                    continue;
                }
                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
                    logger.warn("The status of get next result from {} is '{}', error message is: {}.", new Object[]{this.routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs()});
                    continue;
                }
                return result;
            }
            catch (TException e) {
                logger.warn("Get next from {} failed.", (Object)this.routing, (Object)e);
                ex = e;
            }
        }
        if (result != null && TStatusCode.OK != result.getStatus().getStatus_code()) {
            logger.error("StarRocks server '{}' internal failed, status is '{}', error message is '{}'", new Object[]{this.routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs()});
            throw new StarrocksInternalException(this.routing.toString(), result.getStatus().getStatus_code(), result.getStatus().getError_msgs());
        }
        logger.error("Connect to StarRocks {} failed.", (Object)this.routing);
        throw new ConnectedFailedException(this.routing.toString(), ex);
    }

    public void closeScanner(TScanCloseParams closeParams) {
        logger.debug("CloseScanner to '{}', parameter is '{}'.", (Object)this.routing, (Object)closeParams);
        if (!this.isConnected) {
            try {
                this.open();
            }
            catch (ConnectedFailedException e) {
                logger.warn("Cannot connect to StarRocks BE {} when close scanner.", (Object)this.routing);
                return;
            }
        }
        for (int attempt = 0; attempt < this.retries; ++attempt) {
            logger.debug("Attempt {} to closeScanner {}.", (Object)attempt, (Object)this.routing);
            try {
                TScanCloseResult result = this.client.close_scanner(closeParams);
                if (result == null) {
                    logger.warn("CloseScanner result from {} is null.", (Object)this.routing);
                    continue;
                }
                if (TStatusCode.OK.equals(result.getStatus().getStatus_code())) break;
                logger.warn("The status of get next result from {} is '{}', error message is: {}.", new Object[]{this.routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs()});
                continue;
            }
            catch (TException e) {
                logger.warn("Close scanner from {} failed.", (Object)this.routing, (Object)e);
            }
        }
        logger.info("CloseScanner to StarRocks BE '{}' success.", (Object)this.routing);
        this.close();
    }
}

