/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.type.DataType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraSinkWriter
extends AbstractSinkWriter<SeaTunnelRow, Void> {
    private static final Logger log = LoggerFactory.getLogger(CassandraSinkWriter.class);
    private final CassandraConfig cassandraConfig;
    private final SeaTunnelRowType seaTunnelRowType;
    private final ColumnDefinitions tableSchema;
    private final CqlSession session;
    private BatchStatement batchStatement;
    private List<BoundStatement> boundStatementList;
    private List<CompletionStage<AsyncResultSet>> completionStages;
    private final PreparedStatement preparedStatement;
    private final AtomicInteger counter = new AtomicInteger(0);

    public CassandraSinkWriter(CassandraConfig cassandraConfig, SeaTunnelRowType seaTunnelRowType, ColumnDefinitions tableSchema) {
        this.cassandraConfig = cassandraConfig;
        this.seaTunnelRowType = seaTunnelRowType;
        this.tableSchema = tableSchema;
        this.session = (CqlSession)CassandraClient.getCqlSessionBuilder(cassandraConfig.getHost(), cassandraConfig.getKeyspace(), cassandraConfig.getUsername(), cassandraConfig.getPassword(), cassandraConfig.getDatacenter()).build();
        this.batchStatement = BatchStatement.builder(cassandraConfig.getBatchType()).build();
        this.boundStatementList = new ArrayList<BoundStatement>();
        this.completionStages = new ArrayList<CompletionStage<AsyncResultSet>>();
        this.preparedStatement = this.session.prepare(this.initPrepareCQL());
    }

    public void write(SeaTunnelRow row) throws IOException {
        BoundStatement boundStatement = this.preparedStatement.bind(new Object[0]);
        this.addIntoBatch(row, boundStatement);
        if (this.counter.getAndIncrement() >= this.cassandraConfig.getBatchSize()) {
            this.flush();
            this.counter.set(0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() {
        if (this.cassandraConfig.getAsyncWrite().booleanValue()) {
            this.completionStages.forEach(resultStage -> resultStage.whenComplete((resultSet, error) -> {
                if (error != null) {
                    log.error(ExceptionUtils.getMessage((Throwable)error));
                }
            }));
            this.completionStages.clear();
        } else {
            try {
                this.session.execute(this.batchStatement.addAll(this.boundStatementList));
            }
            catch (Exception e) {
                log.error("Batch insert error,Try inserting one by one!");
                for (BoundStatement statement : this.boundStatementList) {
                    this.session.execute(statement);
                }
            }
            finally {
                this.batchStatement.clear();
                this.boundStatementList.clear();
            }
        }
    }

    private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) {
        try {
            for (int i = 0; i < this.cassandraConfig.getFields().size(); ++i) {
                String fieldName = this.cassandraConfig.getFields().get(i);
                DataType dataType = this.tableSchema.get(i).getType();
                Object fieldValue = row.getField(this.seaTunnelRowType.indexOf(fieldName));
                boundStatement = TypeConvertUtil.reconvertAndInject(boundStatement, i, dataType, fieldValue);
            }
            if (this.cassandraConfig.getAsyncWrite().booleanValue()) {
                this.completionStages.add(this.session.executeAsync(boundStatement));
            } else {
                this.boundStatementList.add(boundStatement);
            }
        }
        catch (Exception e) {
            throw new CassandraConnectorException((SeaTunnelErrorCode)CassandraConnectorErrorCode.ADD_BATCH_DATA_FAILED, e);
        }
    }

    private String initPrepareCQL() {
        Object[] placeholder = new String[this.cassandraConfig.getFields().size()];
        Arrays.fill(placeholder, "?");
        return String.format("INSERT INTO %s (%s) VALUES (%s)", this.cassandraConfig.getTable(), String.join((CharSequence)",", this.cassandraConfig.getFields()), String.join((CharSequence)",", (CharSequence[])placeholder));
    }

    public void close() throws IOException {
        this.flush();
        try {
            if (this.session != null) {
                this.session.close();
            }
        }
        catch (Exception e) {
            throw new CassandraConnectorException((SeaTunnelErrorCode)CassandraConnectorErrorCode.CLOSE_CQL_SESSION_FAILED, e);
        }
    }
}

