/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.table.read.impl.batch;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.table.DataFormat;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.SessionStatus;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ReaderOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.table.enviroment.ExecutionEnvironment;
import com.aliyun.odps.table.optimizer.predicate.Predicate;
import com.aliyun.odps.table.read.SplitReader;
import com.aliyun.odps.table.read.impl.batch.SplitArrowReaderImpl;
import com.aliyun.odps.table.read.impl.batch.SplitRecordReaderImpl;
import com.aliyun.odps.table.read.impl.batch.TableBatchReadSessionBase;
import com.aliyun.odps.table.read.split.InputSplit;
import com.aliyun.odps.table.read.split.impl.IndexedInputSplitAssigner;
import com.aliyun.odps.table.read.split.impl.RowRangeInputSplitAssigner;
import com.aliyun.odps.table.utils.HttpUtils;
import com.aliyun.odps.table.utils.Preconditions;
import com.aliyun.odps.table.utils.SchemaUtils;
import com.aliyun.odps.table.utils.SessionUtils;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableBatchReadSessionImpl
extends TableBatchReadSessionBase {
    private static final Logger logger = LoggerFactory.getLogger((String)TableBatchReadSessionImpl.class.getName());
    private transient RestClient restClient;

    public TableBatchReadSessionImpl(TableIdentifier identifier, String sessionId, EnvironmentSettings settings) throws IOException {
        super(identifier, sessionId, settings);
    }

    public TableBatchReadSessionImpl(TableIdentifier identifier, List<PartitionSpec> requiredPartitions, List<String> requiredDataColumns, List<String> requiredPartitionColumns, List<Integer> bucketIds, SplitOptions splitOptions, ArrowOptions arrowOptions, EnvironmentSettings settings, Predicate filterPredicate) throws IOException {
        super(identifier, requiredPartitions, requiredDataColumns, requiredPartitionColumns, bucketIds, splitOptions, arrowOptions, settings, filterPredicate);
    }

    @Override
    public SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException {
        Preconditions.checkNotNull((Object)this.identifier, (String)"Table read identifier");
        Preconditions.checkNotNull((Object)split, (String)"Input split");
        Preconditions.checkNotNull((Object)options, (String)"Reader options");
        return new SplitArrowReaderImpl(this.identifier, split, options);
    }

    @Override
    public SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException {
        SplitReader<VectorSchemaRoot> arrowReader = this.createArrowReader(split, options);
        return new SplitRecordReaderImpl(arrowReader, this.readSchema, options);
    }

    @Override
    public boolean supportsDataFormat(DataFormat dataFormat) {
        if (this.supportDataFormats != null) {
            return this.supportDataFormats.contains(dataFormat);
        }
        return false;
    }

    @Override
    protected void planInputSplits() throws IOException {
        this.ensureClientInitialized();
        Map<String, String> headers = HttpUtils.createCommonHeader();
        headers.put("Content-Type", "application/json");
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("session_type", this.getType().toString());
        if (this.settings != null && this.settings.getQuotaName().isPresent()) {
            params.put("quotaName", this.settings.getQuotaName().get());
        }
        try {
            String request = this.generateReadSessionRequest();
            logger.debug(String.format("Read table '%s'.\nSession request:\n%s", this.identifier.toString(), request));
            Response resp = this.restClient.stringRequest(ResourceBuilder.buildTableSessionResource((String)"v1", (String)this.identifier.getProject(), (String)this.identifier.getSchema(), (String)this.identifier.getTable(), null), "POST", params, headers, request);
            if (!resp.isOK()) {
                throw new TunnelException(resp.getHeader("x-odps-request-id"), (InputStream)new ByteArrayInputStream(resp.getBody()), Integer.valueOf(resp.getStatus()));
            }
            String response = new String(resp.getBody());
            this.loadResultFromJson(response);
            if (this.sessionStatus != SessionStatus.NORMAL) {
                long asyncIntervalInMills = HttpUtils.getAsyncIntervalInMills(this.settings);
                long asyncTimeoutInMills = (long)HttpUtils.getAsyncTimeoutInSeconds(this.settings) * 1000L;
                long startTime = System.currentTimeMillis();
                while (this.sessionStatus == SessionStatus.INIT) {
                    Thread.sleep(asyncIntervalInMills);
                    logger.trace(String.format("Async read table: '%s', session id: %s", this.identifier.toString(), this.sessionId));
                    response = this.reloadInputSplits();
                    if (System.currentTimeMillis() - startTime < asyncTimeoutInMills) continue;
                    throw new IOException(String.format("Create table read session timeout.\nTable identifier: %s.\nSession status: %s.\nSession id: %s.\nError message: %s.", new Object[]{this.identifier.toString(), this.sessionStatus, this.sessionId, this.errorMessage}));
                }
            }
            if (this.sessionStatus != SessionStatus.NORMAL) {
                throw new IOException(String.format("Create table read session failed.\nTable identifier: %s.\nSession status: %s.\nSession id: %s.\nError message: %s.", new Object[]{this.identifier.toString(), this.sessionStatus, this.sessionId, this.errorMessage}));
            }
            logger.debug(String.format("Read table '%s'.\nSession response:\n%s", this.identifier.toString(), response));
        }
        catch (Exception e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    @Override
    protected String reloadInputSplits() throws IOException {
        this.ensureClientInitialized();
        Preconditions.checkString((String)this.sessionId, (String)"Table read session id");
        Map<String, String> headers = HttpUtils.createCommonHeader();
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("session_type", this.getType().toString());
        if (this.settings != null && this.settings.getQuotaName().isPresent()) {
            params.put("quotaName", this.settings.getQuotaName().get());
        }
        Connection conn = null;
        try {
            conn = this.restClient.connect(ResourceBuilder.buildTableSessionResource((String)"v1", (String)this.identifier.getProject(), (String)this.identifier.getSchema(), (String)this.identifier.getTable(), (String)this.sessionId), "GET", params, headers);
            Response resp = conn.getResponse();
            if (resp.isOK()) {
                String response = IOUtils.readStreamAsString((InputStream)conn.getInputStream());
                this.loadResultFromJson(response);
                String string = response;
                return string;
            }
            try {
                throw new TunnelException(resp.getHeader("x-odps-request-id"), conn.getInputStream(), Integer.valueOf(resp.getStatus()));
            }
            catch (IOException e) {
                throw new IOException("Failed to reload table read session with endpoint: " + this.restClient.getEndpoint(), e);
            }
            catch (OdpsException e) {
                throw new IOException(e);
            }
        }
        finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    private void ensureClientInitialized() {
        if (this.restClient == null) {
            this.restClient = ExecutionEnvironment.create(this.settings).createHttpClient(this.identifier.getProject());
        }
    }

    private String generateReadSessionRequest() {
        JsonObject request = new JsonObject();
        JsonArray dataColumns = new JsonArray();
        this.requiredDataColumns.stream().map(JsonPrimitive::new).forEach(arg_0 -> ((JsonArray)dataColumns).add(arg_0));
        request.add("RequiredDataColumns", (JsonElement)dataColumns);
        JsonArray partitionColumns = new JsonArray();
        this.requiredPartitionColumns.stream().map(JsonPrimitive::new).forEach(arg_0 -> ((JsonArray)partitionColumns).add(arg_0));
        request.add("RequiredPartitionColumns", (JsonElement)partitionColumns);
        JsonArray partitionFilters = new JsonArray();
        this.requiredPartitions.stream().map(partitionSpec -> partitionSpec.toString(false, true)).map(JsonPrimitive::new).forEach(arg_0 -> ((JsonArray)partitionFilters).add(arg_0));
        request.add("RequiredPartitions", (JsonElement)partitionFilters);
        JsonArray bucketIds = new JsonArray();
        this.requiredBucketIds.stream().map(JsonPrimitive::new).forEach(arg_0 -> ((JsonArray)bucketIds).add(arg_0));
        request.add("RequiredBucketIds", (JsonElement)bucketIds);
        JsonObject jsonSplitOptions = new JsonObject();
        jsonSplitOptions.addProperty("SplitMode", this.splitOptions.getSplitMode().toString());
        jsonSplitOptions.addProperty("SplitNumber", (Number)this.splitOptions.getSplitNumber());
        jsonSplitOptions.addProperty("CrossPartition", Boolean.valueOf(this.splitOptions.isCrossPartition()));
        request.add("SplitOptions", (JsonElement)jsonSplitOptions);
        request.addProperty("SplitMaxFileNum", (Number)this.splitOptions.getSplitMaxFileNum());
        JsonObject jsonArrowOptions = new JsonObject();
        jsonArrowOptions.addProperty("TimestampUnit", this.arrowOptions.getTimestampUnit().toString());
        jsonArrowOptions.addProperty("DatetimeUnit", this.arrowOptions.getDateTimeUnit().toString());
        request.add("ArrowOptions", (JsonElement)jsonArrowOptions);
        request.add("FilterPredicate", (JsonElement)new JsonPrimitive(this.filterPredicate.toString()));
        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
        return gson.toJson((JsonElement)request);
    }

    private void loadResultFromJson(String json) throws TunnelException {
        try {
            int splitsCount;
            long recordCount;
            JsonObject tree = new JsonParser().parse(json).getAsJsonObject();
            if (tree.has("SessionId")) {
                this.sessionId = tree.get("SessionId").getAsString();
            }
            if (tree.has("ExpirationTime")) {
                this.expirationTime = tree.get("ExpirationTime").getAsLong();
            }
            if (tree.has("SessionType")) {
                String sessionType = tree.get("SessionType").getAsString();
                if (!this.getType().toString().equals(sessionType.toLowerCase())) {
                    throw new UnsupportedOperationException("Unsupported session type: " + sessionType);
                }
            }
            if (tree.has("SessionStatus")) {
                String status = tree.get("SessionStatus").getAsString().toUpperCase();
                this.sessionStatus = SessionStatus.valueOf(status);
            }
            if (tree.has("Message")) {
                this.errorMessage = tree.get("Message").getAsString();
            }
            if (tree.has("DataSchema")) {
                JsonObject column;
                int i;
                JsonObject dataSchema = tree.get("DataSchema").getAsJsonObject();
                ArrayList<Column> schemaColumns = new ArrayList<Column>();
                ArrayList<String> partitionKeys = new ArrayList<String>();
                if (dataSchema.has("DataColumns")) {
                    JsonArray dataColumns = dataSchema.get("DataColumns").getAsJsonArray();
                    for (i = 0; i < dataColumns.size(); ++i) {
                        column = dataColumns.get(i).getAsJsonObject();
                        schemaColumns.add(SchemaUtils.parseColumn(column));
                    }
                }
                if (dataSchema.has("PartitionColumns")) {
                    JsonArray partitionColumns = dataSchema.get("PartitionColumns").getAsJsonArray();
                    for (i = 0; i < partitionColumns.size(); ++i) {
                        column = partitionColumns.get(i).getAsJsonObject();
                        Column partitionCol = SchemaUtils.parseColumn(column);
                        schemaColumns.add(partitionCol);
                        partitionKeys.add(partitionCol.getName());
                    }
                }
                this.readSchema = DataSchema.newBuilder().columns(schemaColumns).partitionBy(partitionKeys).build();
            }
            if (tree.has("SupportedDataFormat")) {
                this.supportDataFormats = new HashSet();
                JsonArray formats = tree.get("SupportedDataFormat").getAsJsonArray();
                formats.forEach(format -> this.supportDataFormats.add(SessionUtils.parseDataFormat(format.getAsJsonObject())));
            }
            if (tree.has("RecordCount") && (recordCount = tree.get("RecordCount").getAsLong()) >= 0L && this.splitOptions.getSplitMode().equals((Object)SplitOptions.SplitMode.ROW_OFFSET)) {
                this.inputSplitAssigner = new RowRangeInputSplitAssigner(this.sessionId, recordCount);
            }
            if (tree.has("SplitsCount") && (splitsCount = tree.get("SplitsCount").getAsInt()) >= 0 && !this.splitOptions.getSplitMode().equals((Object)SplitOptions.SplitMode.BUCKET)) {
                this.inputSplitAssigner = new IndexedInputSplitAssigner(this.sessionId, splitsCount);
            }
        }
        catch (Exception e) {
            throw new TunnelException("Invalid session response: \n" + json, (Throwable)e);
        }
    }
}

