/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.table.read.impl.batch;

import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.arrow.ArrowReader;
import com.aliyun.odps.table.arrow.ArrowReaderFactory;
import com.aliyun.odps.table.configuration.CompressionCodec;
import com.aliyun.odps.table.configuration.ReaderOptions;
import com.aliyun.odps.table.enviroment.ExecutionEnvironment;
import com.aliyun.odps.table.metrics.Metrics;
import com.aliyun.odps.table.metrics.count.BytesCount;
import com.aliyun.odps.table.metrics.count.RecordCount;
import com.aliyun.odps.table.read.SplitReader;
import com.aliyun.odps.table.read.split.InputSplit;
import com.aliyun.odps.table.read.split.InputSplitWithIndex;
import com.aliyun.odps.table.read.split.InputSplitWithRowRange;
import com.aliyun.odps.table.utils.HttpUtils;
import com.aliyun.odps.tunnel.TunnelException;
import java.io.IOException;
import java.util.Map;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitArrowReaderImpl
implements SplitReader<VectorSchemaRoot> {
    private static final Logger logger = LoggerFactory.getLogger((String)SplitArrowReaderImpl.class.getName());
    private final ArrowReader reader;
    private Connection connection;
    private boolean isClosed;
    private Metrics metrics;
    private BytesCount bytesCount;
    private RecordCount recordCount;
    private String requestId;

    public SplitArrowReaderImpl(TableIdentifier identifier, InputSplit split, ReaderOptions options) throws IOException {
        this.openReaderConnection(identifier, split, options);
        this.initMetrics();
        this.isClosed = false;
        this.reader = ArrowReaderFactory.getRecordBatchReader(this.connection.getInputStream(), options);
    }

    @Override
    public boolean hasNext() throws IOException {
        try {
            return this.reader.nextBatch();
        }
        catch (IOException e) {
            logger.error("Get next record batch failed, requestId=" + this.requestId, (Throwable)e);
            throw e;
        }
    }

    @Override
    public VectorSchemaRoot get() {
        VectorSchemaRoot root = this.reader.getCurrentValue();
        this.recordCount.inc(root.getRowCount());
        this.bytesCount.setValue(this.reader.bytesRead());
        return root;
    }

    @Override
    public void close() throws IOException {
        if (!this.isClosed) {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.connection != null) {
                this.connection.disconnect();
            }
            this.isClosed = true;
        }
    }

    @Override
    public Metrics currentMetricsValues() {
        return this.metrics;
    }

    private void initMetrics() {
        this.bytesCount = new BytesCount();
        this.recordCount = new RecordCount();
        this.metrics = new Metrics();
        this.metrics.register(this.bytesCount);
        this.metrics.register(this.recordCount);
    }

    private void openReaderConnection(TableIdentifier identifier, InputSplit split, ReaderOptions options) throws IOException {
        RestClient restClient = ExecutionEnvironment.create(options.getSettings()).createHttpClient(identifier.getProject());
        Map<String, String> headers = HttpUtils.createCommonHeader(options.getSettings());
        if (options.getCompressionCodec().equals(CompressionCodec.ZSTD)) {
            headers.put("Accept-Encoding", CompressionCodec.ZSTD.toString());
        } else if (options.getCompressionCodec().equals(CompressionCodec.LZ4_FRAME)) {
            headers.put("Accept-Encoding", CompressionCodec.LZ4_FRAME.toString());
        }
        Map<String, String> params = HttpUtils.createCommonParams(options.getSettings());
        params.put("session_id", split.getSessionId());
        if (split instanceof InputSplitWithRowRange) {
            InputSplitWithRowRange rowRangeInputSplit = (InputSplitWithRowRange)split;
            params.put("row_index", String.valueOf(rowRangeInputSplit.getRowRange().getStartIndex()));
            params.put("row_count", String.valueOf(rowRangeInputSplit.getRowRange().getNumRecord()));
        } else if (split instanceof InputSplitWithIndex) {
            InputSplitWithIndex indexedInputSplit = (InputSplitWithIndex)split;
            params.put("split_index", String.valueOf(indexedInputSplit.getSplitIndex()));
        } else {
            throw new UnsupportedOperationException("Unsupported split type: " + split);
        }
        params.put("max_batch_rows", String.valueOf(options.getBatchRowCount()));
        if (options.getBatchRawSize() != 0L) {
            params.put("max_batch_raw_size", String.valueOf(options.getBatchRawSize()));
        }
        params.put("data_format_type", options.getDataFormat().getType().toString());
        params.put("data_format_version", options.getDataFormat().getVersion().toString());
        try {
            String resource = ResourceBuilder.buildTableDataResource((String)"v1", (String)identifier.getProject(), (String)identifier.getSchema(), (String)identifier.getTable());
            Connection conn = restClient.connect(resource, "GET", params, headers);
            Response resp = conn.getResponse();
            if (!resp.isOK()) {
                TunnelException err = new TunnelException(conn.getInputStream());
                err.setRequestId(resp.getHeader("x-odps-request-id"));
                throw err;
            }
            this.connection = conn;
            this.requestId = resp.getHeader("x-odps-request-id");
        }
        catch (Exception e) {
            if (this.connection != null) {
                this.connection.disconnect();
            }
            logger.error("Open split reader failed", (Throwable)e);
            throw new IOException(e.getMessage(), e);
        }
    }
}

