/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.starrocks.client.source;

import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TProtocol;
import com.starrocks.shade.org.apache.thrift.transport.TSocket;
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;
import com.starrocks.thrift.TScanBatchResult;
import com.starrocks.thrift.TScanCloseParams;
import com.starrocks.thrift.TScanNextBatchParams;
import com.starrocks.thrift.TScanOpenParams;
import com.starrocks.thrift.TScanOpenResult;
import com.starrocks.thrift.TStarrocksExternalService;
import com.starrocks.thrift.TStatusCode;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader.ArrowToSeatunnelRowReader;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client that scans table data from a single StarRocks BE node through the Thrift
 * {@code TStarrocksExternalService} interface.
 *
 * <p>Typical life cycle: construct the client (which opens the socket), call
 * {@link #openScanner(QueryPartition, SeaTunnelRowType)}, iterate with {@link #hasNext()} /
 * {@link #getNext()}, and finally call {@link #close()}. {@code openScanner} may be called again
 * on the same instance to start a new scan over the same connection.
 *
 * <p>Instances keep mutable scan state (context id, read offset, current row batch) without any
 * synchronization.
 */
public class StarRocksBeReadClient implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(StarRocksBeReadClient.class);
    private static final String DEFAULT_CLUSTER_NAME = "default_cluster";
    private TStarrocksExternalService.Client client;
    private final String ip;
    private final int port;
    private String contextId;
    private int readerOffset = 0;
    private final SourceConfig sourceConfig;
    private SeaTunnelRowType seaTunnelRowType;
    private ArrowToSeatunnelRowReader rowBatch;
    protected AtomicBoolean eos = new AtomicBoolean(false);

    /**
     * Parses the BE address and opens a Thrift socket to it.
     *
     * @param beNodeInfo BE address in {@code host:port} form
     * @param sourceConfig source configuration; its connect timeout is used for both timeout
     *     arguments of the socket
     * @throws StarRocksConnectorException with {@code CREATE_BE_READER_FAILED} if the address is
     *     malformed (wrong number of parts or non-numeric port) or the socket cannot be opened
     */
    public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
        this.sourceConfig = sourceConfig;
        log.debug("Parse StarRocks BE address: '{}'.", beNodeInfo);
        String[] hostPort = beNodeInfo.split(":");
        if (hostPort.length != 2) {
            throw new StarRocksConnectorException(
                    StarRocksConnectorErrorCode.CREATE_BE_READER_FAILED,
                    String.format("Format of StarRocks BE address[%s] is illegal", beNodeInfo));
        }
        this.ip = hostPort[0].trim();
        int parsedPort;
        try {
            parsedPort = Integer.parseInt(hostPort[1].trim());
        } catch (NumberFormatException e) {
            // A non-numeric port is the same kind of error as a malformed address; report it
            // through the connector exception instead of leaking a bare NumberFormatException.
            throw new StarRocksConnectorException(
                    StarRocksConnectorErrorCode.CREATE_BE_READER_FAILED,
                    String.format("Format of StarRocks BE address[%s] is illegal", beNodeInfo),
                    e);
        }
        this.port = parsedPort;
        TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
        TSocket socket =
                new TSocket(
                        this.ip,
                        this.port,
                        sourceConfig.getConnectTimeoutMs(),
                        sourceConfig.getConnectTimeoutMs());
        try {
            socket.open();
        } catch (TTransportException e) {
            socket.close();
            throw new StarRocksConnectorException(
                    StarRocksConnectorErrorCode.CREATE_BE_READER_FAILED,
                    "Failed to open socket",
                    e);
        }
        TProtocol protocol = factory.getProtocol(socket);
        this.client = new TStarrocksExternalService.Client(protocol);
    }

    /**
     * Opens a scanner on the BE for the given partition and resets the local read state.
     *
     * @param partition partition supplying the tablet ids, opaque query plan and table name
     * @param seaTunnelRowType row type used to convert the returned batches into rows
     * @throws StarRocksConnectorException with {@code SCAN_BE_DATA_FAILED} if the RPC fails or the
     *     BE answers with a non-OK status
     */
    public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) {
        Set<Long> tabletIds = partition.getTabletIds();
        TScanOpenParams params = new TScanOpenParams();
        params.setTablet_ids(new ArrayList<Long>(tabletIds));
        params.setOpaqued_query_plan(partition.getQueryPlan());
        params.setCluster(DEFAULT_CLUSTER_NAME);
        params.setDatabase(this.sourceConfig.getDatabase());
        params.setTable(partition.getTable());
        params.setUser(this.sourceConfig.getUsername());
        params.setPasswd(this.sourceConfig.getPassword());
        params.setBatch_size(this.sourceConfig.getBatchRows());
        if (this.sourceConfig.getSourceOptionProps() != null) {
            params.setProperties(this.sourceConfig.getSourceOptionProps());
        }
        // The Thrift field is a short, so clamp the configured value before narrowing.
        short keepAliveMin =
                (short) Math.min(Short.MAX_VALUE, this.sourceConfig.getKeepAliveMin());
        params.setKeep_alive_min(keepAliveMin);
        params.setQuery_timeout(this.sourceConfig.getQueryTimeoutSec());
        params.setMem_limit(this.sourceConfig.getMemLimit());
        log.info("open Scan params.mem_limit {} B", params.getMem_limit());
        log.info("open Scan params.keep-alive-min {} min", params.getKeep_alive_min());
        log.info("open Scan params.batch_size {}", params.getBatch_size());

        TScanOpenResult result;
        try {
            result = this.client.open_scanner(params);
        } catch (TException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new StarRocksConnectorException(
                    StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED, e.getMessage(), e);
        }
        if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
            throw new StarRocksConnectorException(
                    StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED,
                    "Failed to open scanner."
                            + result.getStatus().getStatus_code()
                            + result.getStatus().getError_msgs());
        }

        this.contextId = result.getContext_id();
        log.info(
                "Open scanner for {}:{} with context id {}, and there are {} tablets {}",
                this.ip,
                this.port,
                this.contextId,
                tabletIds.size(),
                tabletIds);
        this.eos.set(false);
        // A previous scan may have been abandoned mid-batch; close that batch before dropping it.
        releaseRowBatch();
        this.readerOffset = 0;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /**
     * Reports whether the scan has more data, fetching the next batch from the BE when the
     * current one is exhausted.
     *
     * @return {@code false} once the BE has signalled end of stream, {@code true} otherwise
     * @throws StarRocksConnectorException with {@code SCAN_BE_DATA_FAILED} if the RPC fails or the
     *     BE answers with a non-OK status
     */
    public boolean hasNext() {
        if (!this.eos.get() && (this.rowBatch == null || !this.rowBatch.hasNext())) {
            if (this.rowBatch != null) {
                // Advance the offset by what was consumed, then drop the batch so a retry after a
                // failed fetch neither counts these rows twice nor closes the batch twice.
                this.readerOffset += this.rowBatch.getReadRowCount();
                releaseRowBatch();
            }
            TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
            nextBatchParams.setContext_id(this.contextId);
            nextBatchParams.setOffset(this.readerOffset);

            TScanBatchResult result;
            try {
                result = this.client.get_next(nextBatchParams);
            } catch (TException e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new StarRocksConnectorException(
                        StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED, e.getMessage(), e);
            }
            if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
                throw new StarRocksConnectorException(
                        StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED,
                        "Failed to get next from be -> ip:["
                                + this.ip
                                + "] "
                                + result.getStatus().getStatus_code()
                                + " msg:"
                                + result.getStatus().getError_msgs());
            }
            this.eos.set(result.isEos());
            if (!this.eos.get()) {
                this.rowBatch =
                        new ArrowToSeatunnelRowReader(result.getRows(), this.seaTunnelRowType)
                                .readArrow();
            }
        }
        return !this.eos.get();
    }

    /**
     * Returns the next row of the current batch. Callers are expected to call {@link #hasNext()}
     * first; there is no current batch before the first fetch or after end of stream.
     *
     * @return the next row read from the current batch
     */
    public SeaTunnelRow getNext() {
        return this.rowBatch.next();
    }

    /**
     * Closes the scanner on the BE, releases the current row batch and closes the underlying
     * Thrift transport. Calling it again after the transport has been closed is a no-op.
     *
     * @throws StarRocksConnectorException with {@code CLOSE_BE_READER_FAILED} if the close RPC
     *     fails; the transport is closed regardless
     */
    public void close() {
        log.info(
                "Close reader for {}:{} with context id {}", this.ip, this.port, this.contextId);
        if (!this.client.getInputProtocol().getTransport().isOpen()) {
            // Already closed; there is no connection left to send the close request on.
            return;
        }
        TScanCloseParams tScanCloseParams = new TScanCloseParams();
        tScanCloseParams.setContext_id(this.contextId);
        try {
            releaseRowBatch();
            this.client.close_scanner(tScanCloseParams);
        } catch (TException e) {
            log.error(
                    "Failed to close reader {}:{} with context id {}",
                    this.ip,
                    this.port,
                    this.contextId,
                    e);
            throw new StarRocksConnectorException(
                    StarRocksConnectorErrorCode.CLOSE_BE_READER_FAILED, e);
        } finally {
            // The socket opened in the constructor must be released even if the RPC failed.
            this.client.getInputProtocol().getTransport().close();
        }
    }

    /** Closes the current row batch, if any, and clears the reference to it. */
    private void releaseRowBatch() {
        if (this.rowBatch != null) {
            ArrowToSeatunnelRowReader batch = this.rowBatch;
            // Clear the field first so a failing close() cannot lead to a second close later.
            this.rowBatch = null;
            batch.close();
        }
    }
}

