/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.table.write.impl.batch;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.arrow.ArrowWriter;
import com.aliyun.odps.table.arrow.ArrowWriterFactory;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.WriterOptions;
import com.aliyun.odps.table.enviroment.ExecutionEnvironment;
import com.aliyun.odps.table.metrics.Metrics;
import com.aliyun.odps.table.metrics.count.BytesCount;
import com.aliyun.odps.table.metrics.count.RecordCount;
import com.aliyun.odps.table.utils.SchemaUtils;
import com.aliyun.odps.table.write.BatchWriter;
import com.aliyun.odps.table.write.WriterAttemptId;
import com.aliyun.odps.table.write.WriterCommitMessage;
import com.aliyun.odps.table.write.impl.batch.WriterCommitMessageImpl;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

public class ArrowWriterImpl
implements BatchWriter<VectorSchemaRoot> {
    private boolean isClosed;
    private final long blockNumber;
    private final WriterOptions writerOptions;
    private final Schema arrowSchema;
    private final String sessionId;
    private final TableIdentifier identifier;
    private final WriterAttemptId attemptId;
    private Connection connection;
    private ArrowWriter batchWriter;
    private WriterCommitMessage commitMessage;
    private Metrics metrics;
    private BytesCount bytesCount;
    private RecordCount recordCount;

    public ArrowWriterImpl(String sessionId, TableIdentifier identifier, DataSchema schema, long blockNumber, WriterAttemptId attemptId, WriterOptions writerOptions, ArrowOptions arrowOptions) {
        this.sessionId = sessionId;
        this.identifier = identifier;
        this.blockNumber = blockNumber;
        this.attemptId = attemptId;
        this.writerOptions = writerOptions;
        this.arrowSchema = SchemaUtils.toArrowSchema(schema.getColumns(), arrowOptions);
        this.isClosed = false;
        this.initMetrics();
    }

    @Override
    public VectorSchemaRoot newElement() {
        return VectorSchemaRoot.create((Schema)this.arrowSchema, (BufferAllocator)this.writerOptions.getBufferAllocator());
    }

    @Override
    public void write(VectorSchemaRoot root) throws IOException {
        if (this.isClosed) {
            throw new IOException("Arrow writer is closed");
        }
        if (this.batchWriter == null) {
            this.openWriterConnection(this.sessionId, this.identifier, this.blockNumber, this.attemptId);
            this.batchWriter = ArrowWriterFactory.getRecordBatchWriter(this.connection.getOutputStream(), this.writerOptions);
        }
        try {
            this.batchWriter.writeBatch(root);
            this.recordCount.inc(root.getRowCount());
            this.bytesCount.setValue(this.batchWriter.bytesWritten());
        }
        catch (IOException e) {
            Response response = this.connection.getResponse();
            if (response != null && !response.isOK()) {
                TunnelException exception = new TunnelException(response.getHeader("x-odps-request-id"), this.connection.getInputStream(), Integer.valueOf(response.getStatus()));
                throw new IOException(exception.getMessage(), (Throwable)exception);
            }
            throw new IOException("ArrowHttpOutputStream Serialize Exception", e);
        }
    }

    @Override
    public void abort() throws IOException {
        this.disconnect();
    }

    @Override
    @Nullable
    public WriterCommitMessage commit() throws IOException {
        this.close();
        return this.commitMessage;
    }

    @Override
    public void close() throws IOException {
        if (!this.isClosed) {
            try {
                if (this.batchWriter != null) {
                    this.batchWriter.close();
                    Response response = this.connection.getResponse();
                    if (!response.isOK()) {
                        TunnelException exception = new TunnelException(response.getHeader("x-odps-request-id"), this.connection.getInputStream(), Integer.valueOf(response.getStatus()));
                        throw new IOException(exception.getMessage(), (Throwable)exception);
                    }
                    this.commitMessage = new WriterCommitMessageImpl(this.blockNumber, this.loadResultFromJson(this.connection.getInputStream()));
                }
            }
            finally {
                this.disconnect();
                this.isClosed = true;
            }
        }
    }

    @Override
    public Metrics currentMetricsValues() {
        return this.metrics;
    }

    private void initMetrics() {
        this.bytesCount = new BytesCount();
        this.recordCount = new RecordCount();
        this.metrics = new Metrics();
        this.metrics.register(this.bytesCount);
        this.metrics.register(this.recordCount);
    }

    private void openWriterConnection(String sessionId, TableIdentifier identifier, long blockNumber, WriterAttemptId attemptId) throws IOException {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("Transfer-Encoding", "chunked");
        headers.put("Content-Type", "application/octet-stream");
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("block_number", Long.toString(blockNumber));
        params.put("attempt_number", Integer.toString(attemptId.getAttemptNumber()));
        params.put("data_format_type", this.writerOptions.getDataFormat().getType().toString());
        params.put("data_format_version", this.writerOptions.getDataFormat().getVersion().toString());
        if (this.writerOptions.getSettings() != null && this.writerOptions.getSettings().getQuotaName().isPresent()) {
            params.put("quotaName", this.writerOptions.getSettings().getQuotaName().get());
        }
        String resource = ResourceBuilder.buildTableSessionDataResource((String)"v1", (String)identifier.getProject(), (String)identifier.getSchema(), (String)identifier.getTable(), (String)sessionId);
        try {
            RestClient restClient = ExecutionEnvironment.create(this.writerOptions.getSettings()).createHttpClient(identifier.getProject());
            restClient.setChunkSize(this.writerOptions.getChunkSize());
            this.connection = restClient.connect(resource, "POST", params, headers);
        }
        catch (OdpsException | IOException e) {
            this.disconnect();
            throw new IOException(e.getMessage(), e);
        }
    }

    private String loadResultFromJson(InputStream is) throws IOException {
        String result = "";
        try {
            String json = IOUtils.readStreamAsString((InputStream)is);
            JsonObject tree = new JsonParser().parse(json).getAsJsonObject();
            if (tree.has("CommitMessage")) {
                result = tree.get("CommitMessage").getAsString();
            }
        }
        catch (Exception e) {
            throw new IOException("Parse writer commit response failed", e);
        }
        finally {
            if (is != null) {
                is.close();
            }
        }
        return result;
    }

    private void disconnect() throws IOException {
        if (this.connection != null) {
            this.connection.disconnect();
        }
    }
}

