/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.table.write.impl.batch;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.table.DataFormat;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.SessionStatus;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.DynamicPartitionOptions;
import com.aliyun.odps.table.configuration.WriterOptions;
import com.aliyun.odps.table.distribution.Distribution;
import com.aliyun.odps.table.distribution.Distributions;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.table.enviroment.ExecutionEnvironment;
import com.aliyun.odps.table.order.NullOrdering;
import com.aliyun.odps.table.order.SortDirection;
import com.aliyun.odps.table.order.SortOrder;
import com.aliyun.odps.table.utils.HttpUtils;
import com.aliyun.odps.table.utils.Preconditions;
import com.aliyun.odps.table.utils.SchemaUtils;
import com.aliyun.odps.table.utils.SessionUtils;
import com.aliyun.odps.table.write.BatchWriter;
import com.aliyun.odps.table.write.TableWriteCapabilities;
import com.aliyun.odps.table.write.WriterAttemptId;
import com.aliyun.odps.table.write.WriterCommitMessage;
import com.aliyun.odps.table.write.impl.batch.ArrowWriterImpl;
import com.aliyun.odps.table.write.impl.batch.TableBatchWriteSessionBase;
import com.aliyun.odps.table.write.impl.batch.WriterCommitMessageImpl;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableBatchWriteSessionImpl
extends TableBatchWriteSessionBase {
    private static final Logger logger = LoggerFactory.getLogger((String)TableBatchWriteSessionImpl.class.getName());
    private transient ExecutionEnvironment executionEnvironment;
    private transient RestClient restClient;

    public TableBatchWriteSessionImpl(TableIdentifier identifier, PartitionSpec partitionSpec, boolean overwrite, DynamicPartitionOptions dynamicPartitionOptions, ArrowOptions arrowOptions, TableWriteCapabilities capabilities, EnvironmentSettings settings, long maxFieldSize) throws IOException {
        super(identifier, partitionSpec, overwrite, dynamicPartitionOptions, arrowOptions, capabilities, settings, maxFieldSize);
    }

    public TableBatchWriteSessionImpl(TableIdentifier identifier, String sessionId, EnvironmentSettings settings) throws IOException {
        super(identifier, sessionId, settings);
    }

    @Override
    protected void initSession() throws IOException {
        this.ensureInitialized();
        Map<String, String> headers = HttpUtils.createCommonHeader(this.settings);
        headers.put("Content-Type", "application/json");
        Map<String, String> params = HttpUtils.createCommonParams(this.settings);
        params.put("session_type", this.getType().toString());
        try {
            String response;
            Response resp;
            String req = this.generateWriteSessionRequest();
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Write table '%s'.\nSession request:\n%s", this.identifier.toString(), req));
            }
            if ((resp = this.restClient.stringRequest(ResourceBuilder.buildTableSessionResource((String)"v1", (String)this.identifier.getProject(), (String)this.identifier.getSchema(), (String)this.identifier.getTable(), null), "POST", params, headers, req)).isOK()) {
                response = new String(resp.getBody());
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("Write table '%s'.\nSession response:\n%s", this.identifier.toString(), response));
                }
            } else {
                throw new TunnelException(resp.getHeader("x-odps-request-id"), (InputStream)new ByteArrayInputStream(resp.getBody()), Integer.valueOf(resp.getStatus()));
            }
            this.loadResultFromJson(response);
        }
        catch (Exception e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    @Override
    protected String reloadSession() throws IOException {
        this.ensureInitialized();
        Preconditions.checkString((String)this.sessionId, (String)"Table write session id");
        Map<String, String> headers = HttpUtils.createCommonHeader(this.settings);
        Map<String, String> params = HttpUtils.createCommonParams(this.settings);
        params.put("session_type", this.getType().toString());
        Connection conn = null;
        try {
            conn = this.restClient.connect(ResourceBuilder.buildTableSessionResource((String)"v1", (String)this.identifier.getProject(), (String)this.identifier.getSchema(), (String)this.identifier.getTable(), (String)this.sessionId), "GET", params, headers);
            Response resp = conn.getResponse();
            if (resp.isOK()) {
                String response = IOUtils.readStreamAsString((InputStream)conn.getInputStream());
                this.loadResultFromJson(response);
                String string = response;
                return string;
            }
            try {
                throw new TunnelException(resp.getHeader("x-odps-request-id"), conn.getInputStream(), Integer.valueOf(resp.getStatus()));
            }
            catch (IOException e) {
                throw new IOException("Failed to reload table write session with endpoint: " + this.restClient.getEndpoint(), e);
            }
            catch (OdpsException e) {
                throw new IOException(e);
            }
        }
        finally {
            if (conn != null) {
                try {
                    conn.disconnect();
                }
                catch (IOException iOException) {}
            }
        }
    }

    @Override
    public BatchWriter<VectorSchemaRoot> createArrowWriter(long blockNumber, WriterAttemptId attemptId, WriterOptions options) throws IOException {
        Preconditions.checkString((String)this.sessionId, (String)"Table write session id");
        Preconditions.checkNotNull((Object)this.identifier, (String)"Table write session id");
        Preconditions.checkLong((Long)blockNumber, (long)0L, (String)"Block number");
        Preconditions.checkNotNull((Object)attemptId, (String)"Attempt id");
        Preconditions.checkNotNull((Object)options, (String)"Writer options");
        if (options.maxBlockNumber().isPresent()) {
            if (blockNumber >= options.maxBlockNumber().get()) {
                throw new IOException("User defined max block writer number: " + options.maxBlockNumber().get());
            }
        } else if (this.maxBlockNumber().isPresent() && blockNumber >= this.maxBlockNumber().get()) {
            throw new IOException("Server max block writer number: " + this.maxBlockNumber().get());
        }
        return new ArrowWriterImpl(this.sessionId, this.identifier, this.requiredSchema, blockNumber, attemptId, options, this.arrowOptions);
    }

    @Override
    public boolean supportsDataFormat(DataFormat dataFormat) {
        if (this.supportDataFormats != null) {
            return this.supportDataFormats.contains(dataFormat);
        }
        return false;
    }

    @Override
    public void commit(WriterCommitMessage[] messages) throws IOException {
        this.ensureInitialized();
        if (messages == null) {
            throw new IllegalArgumentException("Invalid argument: messages.");
        }
        Preconditions.checkString((String)this.sessionId, (String)"Table write session id");
        Map<String, String> headers = HttpUtils.createCommonHeader(this.settings);
        headers.put("Content-Type", "application/json");
        Map<String, String> params = HttpUtils.createCommonParams(this.settings);
        params.put("session_id", this.sessionId);
        try {
            Response resp;
            String commitRequest = this.generateCommitRequest(messages);
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Commit table '%s'.\nSession request:\n%s", this.identifier.toString(), commitRequest));
            }
            if (!(resp = this.restClient.stringRequest(ResourceBuilder.buildTableCommitResource((String)"v1", (String)this.identifier.getProject(), (String)this.identifier.getSchema(), (String)this.identifier.getTable()), "POST", params, headers, commitRequest)).isOK()) {
                throw new TunnelException(resp.getHeader("x-odps-request-id"), (InputStream)new ByteArrayInputStream(resp.getBody()), Integer.valueOf(resp.getStatus()));
            }
            String response = new String(resp.getBody());
            this.loadResultFromJson(response);
            if (this.sessionStatus != SessionStatus.COMMITTED) {
                long asyncIntervalInMills = HttpUtils.getAsyncIntervalInMills(this.settings);
                long asyncTimeoutInMills = (long)HttpUtils.getAsyncTimeoutInSeconds(this.settings) * 1000L;
                long startTime = System.currentTimeMillis();
                while (this.sessionStatus == SessionStatus.NORMAL || this.sessionStatus == SessionStatus.COMMITTING) {
                    Thread.sleep(asyncIntervalInMills);
                    logger.trace(String.format("Async commit table: '%s', session id: %s", this.identifier.toString(), this.sessionId));
                    response = this.reloadSession();
                    if (System.currentTimeMillis() - startTime < asyncTimeoutInMills) continue;
                    throw new IOException(String.format("Commit table write session timeout.\nTable identifier: %s.\nSession status: %s.\nSession id: %s.\nError message: %s.", new Object[]{this.identifier.toString(), this.sessionStatus, this.sessionId, this.errorMessage}));
                }
            }
            if (this.sessionStatus != SessionStatus.COMMITTED) {
                throw new IOException(String.format("Commit table write session failed.\nTable identifier: %s.\nSession status: %s.\nSession id: %s.\nError message: %s.", new Object[]{this.identifier.toString(), this.sessionStatus, this.sessionId, this.errorMessage}));
            }
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Commit table '%s' success.\nSession response:\n%s", this.identifier.toString(), response));
            }
        }
        catch (Exception e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    @Override
    public void cleanup() {
    }

    private void ensureInitialized() {
        if (this.executionEnvironment == null) {
            this.executionEnvironment = ExecutionEnvironment.create(this.settings);
        }
        if (this.restClient == null) {
            this.restClient = this.executionEnvironment.createHttpClient(this.identifier.getProject());
        }
    }

    private String generateWriteSessionRequest() {
        JsonObject request = new JsonObject();
        request.add("PartitionSpec", (JsonElement)new JsonPrimitive(this.targetPartitionSpec.toString(false, true)));
        request.add("Overwrite", (JsonElement)new JsonPrimitive(Boolean.valueOf(this.overwrite)));
        JsonObject jsonArrowOptions = new JsonObject();
        jsonArrowOptions.addProperty("TimestampUnit", this.arrowOptions.getTimestampUnit().toString());
        jsonArrowOptions.addProperty("DatetimeUnit", this.arrowOptions.getDateTimeUnit().toString());
        request.add("ArrowOptions", (JsonElement)jsonArrowOptions);
        JsonObject dynamicOptions = new JsonObject();
        dynamicOptions.addProperty("InvalidStrategy", this.dynamicPartitionOptions.getInvalidStrategy().toString());
        dynamicOptions.addProperty("InvalidLimit", (Number)this.dynamicPartitionOptions.getInvalidLimit());
        dynamicOptions.addProperty("DynamicPartitionLimit", (Number)this.dynamicPartitionOptions.getDynamicPartitionLimit());
        request.add("DynamicPartitionOptions", (JsonElement)dynamicOptions);
        if (this.writeCapabilities.supportHashBuckets() || this.writeCapabilities.supportRangeBuckets()) {
            request.add("SupportWriteCluster", (JsonElement)new JsonPrimitive(Boolean.valueOf(true)));
        } else {
            request.add("SupportWriteCluster", (JsonElement)new JsonPrimitive(Boolean.valueOf(false)));
        }
        request.addProperty("MaxFieldSize", (Number)this.maxFieldSize);
        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
        return gson.toJson((JsonElement)request);
    }

    private String generateCommitRequest(WriterCommitMessage[] messages) {
        JsonObject request = new JsonObject();
        JsonArray messageArray = new JsonArray();
        for (WriterCommitMessage commitMessage : messages) {
            if (commitMessage == null) continue;
            WriterCommitMessageImpl msg = (WriterCommitMessageImpl)commitMessage;
            messageArray.add((JsonElement)new JsonPrimitive(msg.getCommitMessage()));
        }
        request.add("CommitMessages", (JsonElement)messageArray);
        return request.toString();
    }

    private void loadResultFromJson(String json) throws TunnelException {
        try {
            JsonObject tree = new JsonParser().parse(json).getAsJsonObject();
            if (tree.has("SessionId")) {
                this.sessionId = tree.get("SessionId").getAsString();
            }
            if (tree.has("ExpirationTime")) {
                this.expirationTime = tree.get("ExpirationTime").getAsLong();
            }
            if (tree.has("SessionType")) {
                String sessionType = tree.get("SessionType").getAsString();
                if (!this.getType().toString().equals(sessionType.toLowerCase())) {
                    throw new UnsupportedOperationException("Unsupported session type: " + sessionType);
                }
            }
            if (tree.has("SessionStatus")) {
                String status = tree.get("SessionStatus").getAsString().toUpperCase();
                this.sessionStatus = SessionStatus.valueOf(status);
            }
            if (tree.has("Message")) {
                this.errorMessage = tree.get("Message").getAsString();
            }
            if (tree.has("DataSchema")) {
                JsonObject column;
                int i;
                JsonObject dataSchema = tree.get("DataSchema").getAsJsonObject();
                ArrayList<Column> schemaColumns = new ArrayList<Column>();
                ArrayList<String> partitionKeys = new ArrayList<String>();
                if (dataSchema.has("DataColumns")) {
                    JsonArray dataColumns = dataSchema.get("DataColumns").getAsJsonArray();
                    for (i = 0; i < dataColumns.size(); ++i) {
                        column = dataColumns.get(i).getAsJsonObject();
                        schemaColumns.add(SchemaUtils.parseColumn(column));
                    }
                }
                if (dataSchema.has("PartitionColumns")) {
                    JsonArray partitionColumns = dataSchema.get("PartitionColumns").getAsJsonArray();
                    for (i = 0; i < partitionColumns.size(); ++i) {
                        column = partitionColumns.get(i).getAsJsonObject();
                        Column partitionCol = SchemaUtils.parseColumn(column);
                        schemaColumns.add(partitionCol);
                        partitionKeys.add(partitionCol.getName());
                    }
                }
                this.requiredSchema = DataSchema.newBuilder().columns(schemaColumns).partitionBy(partitionKeys).build();
            }
            if (tree.has("SupportedDataFormat")) {
                this.supportDataFormats = new HashSet();
                JsonArray formats = tree.get("SupportedDataFormat").getAsJsonArray();
                formats.forEach(format -> this.supportDataFormats.add(SessionUtils.parseDataFormat(format.getAsJsonObject())));
            }
            if (tree.has("MaxBlockNumber")) {
                this.maxBlockNumber = tree.get("MaxBlockNumber").getAsLong();
            }
            if (tree.has("RequiredOrdering")) {
                JsonArray orders = tree.get("RequiredOrdering").getAsJsonArray();
                ArrayList sortOrders = new ArrayList();
                orders.forEach(order -> sortOrders.add(this.parseOrders(order.getAsJsonObject())));
                this.requiredSortOrders = sortOrders.toArray(new SortOrder[0]);
            }
            if (tree.has("RequiredDistribution")) {
                JsonObject distribution = tree.get("RequiredDistribution").getAsJsonObject();
                Distribution.Type type = Distribution.Type.UNSPECIFIED;
                if (distribution.has("Type")) {
                    type = Distribution.Type.valueOf(distribution.get("Type").getAsString().toUpperCase());
                }
                if (type.equals((Object)Distribution.Type.UNSPECIFIED)) {
                    this.requiredDistribution = Distributions.unspecified();
                } else if (type.equals((Object)Distribution.Type.HASH) || type.equals((Object)Distribution.Type.RANGE)) {
                    ArrayList<String> clusterKeys = new ArrayList<String>();
                    if (distribution.has("ClusterKeys")) {
                        JsonArray keys = distribution.get("ClusterKeys").getAsJsonArray();
                        keys.forEach(key -> clusterKeys.add(key.getAsString()));
                    }
                    int bucketsNumber = -1;
                    if (distribution.has("BucketsNumber")) {
                        bucketsNumber = distribution.get("BucketsNumber").getAsInt();
                    }
                    this.requiredDistribution = Distributions.clustered(clusterKeys, type, bucketsNumber);
                }
            }
        }
        catch (Exception e) {
            throw new TunnelException("Invalid session response: \n" + json, (Throwable)e);
        }
    }

    private SortOrder parseOrders(JsonObject order) {
        Preconditions.checkArgument((boolean)order.has("Name"), (String)"Sort order must has name");
        String name = order.get("Name").getAsString();
        SortDirection sortDirection = order.has("SortDirection") ? SortDirection.valueOf(order.get("SortDirection").getAsString().toUpperCase()) : SortDirection.NONE;
        NullOrdering nullOrdering = order.has("NullOrdering") ? NullOrdering.valueOf(order.get("NullOrdering").getAsString().toUpperCase()) : NullOrdering.ANY;
        return new SortOrder(name, sortDirection, nullOrdering);
    }
}

