/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.spark.sql.write;

import com.starrocks.connector.spark.sql.conf.WriteStarRocksConfig;
import com.starrocks.connector.spark.sql.schema.CsvRowStringConverter;
import com.starrocks.connector.spark.sql.schema.JSONRowStringConverter;
import com.starrocks.connector.spark.sql.schema.RowStringConverter;
import com.starrocks.connector.spark.sql.write.StarRocksWriterCommitMessage;
import com.starrocks.connector.spark.util.EnvUtils;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
import java.io.IOException;
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarRocksDataWriter
implements DataWriter<InternalRow>,
Serializable {
    private static final Logger log = LoggerFactory.getLogger(StarRocksDataWriter.class);
    private final WriteStarRocksConfig config;
    private final int partitionId;
    private final long taskId;
    private final long epochId;
    private final RowStringConverter converter;
    private final StreamLoadManager manager;

    public StarRocksDataWriter(WriteStarRocksConfig config, StructType schema, int partitionId, long taskId, long epochId) {
        this.config = config;
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.epochId = epochId;
        if ("csv".equalsIgnoreCase(config.getFormat())) {
            this.converter = new CsvRowStringConverter(schema, config.getColumnSeparator(), config.getTimeZone());
        } else if ("json".equalsIgnoreCase(config.getFormat())) {
            this.converter = new JSONRowStringConverter(schema, config.getStreamLoadColumnNames(), config.getTimeZone());
        } else {
            throw new RuntimeException("Unsupported format " + config.getFormat());
        }
        this.manager = new StreamLoadManagerV2(config.toStreamLoadProperties(), true);
    }

    public void open() {
        this.manager.init();
        log.info("Open data writer for partition: {}, task: {}, epoch: {}, {}", new Object[]{this.partitionId, this.taskId, this.epochId, EnvUtils.getGitInformation()});
    }

    public void write(InternalRow internalRow) throws IOException {
        String data = this.converter.fromRow(internalRow);
        this.manager.write(null, this.config.getDatabase(), this.config.getTable(), data);
        log.debug("partitionId: {}, taskId: {}, epochId: {}, receive raw row: {}", new Object[]{this.partitionId, this.taskId, this.epochId, internalRow});
        log.debug("partitionId: {}, taskId: {}, epochId: {}, receive converted row: {}", new Object[]{this.partitionId, this.taskId, this.epochId, data});
    }

    public WriterCommitMessage commit() throws IOException {
        log.info("partitionId: {}, taskId: {}, epochId: {} commit", new Object[]{this.partitionId, this.taskId, this.epochId});
        try {
            this.manager.flush();
            return new StarRocksWriterCommitMessage(this.partitionId, this.taskId, this.epochId, null);
        }
        catch (Exception e) {
            String errMsg = String.format("Failed to commit, partitionId: %s, taskId: %s, epochId: %s", this.partitionId, this.taskId, this.epochId);
            log.error("{}", (Object)errMsg, (Object)e);
            throw new IOException(errMsg, e);
        }
    }

    public void abort() throws IOException {
        log.info("partitionId: {}, taskId: {}, epochId: {} abort", new Object[]{this.partitionId, this.taskId, this.epochId});
        StreamLoadSnapshot snapshot = this.manager.snapshot();
        try {
            boolean success = this.manager.abort(snapshot);
            if (success) {
                return;
            }
            throw new IOException("abort not successful");
        }
        catch (Exception e) {
            String errMsg = String.format("Failed to abort, partitionId: %s, taskId: %s, epochId: %s", this.partitionId, this.taskId, this.epochId);
            log.error("{}", (Object)errMsg, (Object)e);
            throw new IOException(errMsg, e);
        }
    }

    public void close() throws IOException {
        log.info("partitionId: {}, taskId: {}, epochId: {} close", new Object[]{this.partitionId, this.taskId, this.epochId});
        this.manager.close();
    }
}

