/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.CypherEnum;
import org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
import org.apache.seatunnel.connectors.seatunnel.neo4j.internal.SeaTunnelRowNeo4jValue;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.Neo4jException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Neo4jSinkWriter
implements SinkWriter<SeaTunnelRow, Void, Void> {
    private static final Logger log = LoggerFactory.getLogger(Neo4jSinkWriter.class);
    private final Neo4jSinkQueryInfo neo4jSinkQueryInfo;
    private final transient Driver driver;
    private final transient Session session;
    private final SeaTunnelRowType seaTunnelRowType;
    private final List<SeaTunnelRowNeo4jValue> writeBuffer;
    private final Integer maxBatchSize;

    public Neo4jSinkWriter(Neo4jSinkQueryInfo neo4jSinkQueryInfo, SeaTunnelRowType seaTunnelRowType) {
        this.neo4jSinkQueryInfo = neo4jSinkQueryInfo;
        this.driver = this.neo4jSinkQueryInfo.getDriverBuilder().build();
        this.session = this.driver.session(SessionConfig.forDatabase(neo4jSinkQueryInfo.getDriverBuilder().getDatabase()));
        this.seaTunnelRowType = seaTunnelRowType;
        this.maxBatchSize = Optional.ofNullable(neo4jSinkQueryInfo.getMaxBatchSize()).orElse(0);
        this.writeBuffer = new ArrayList<SeaTunnelRowNeo4jValue>(this.maxBatchSize);
    }

    public void write(SeaTunnelRow element) throws IOException {
        if (this.neo4jSinkQueryInfo.batchMode()) {
            this.writeByBatchSize(element);
        } else {
            this.writeOneByOne(element);
        }
    }

    private void writeOneByOne(SeaTunnelRow element) {
        Map<String, Object> queryParamPosition = this.neo4jSinkQueryInfo.getQueryParamPosition().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> element.getField(((Integer)e.getValue()).intValue())));
        Query query = new Query(this.neo4jSinkQueryInfo.getQuery(), queryParamPosition);
        this.writeByQuery(query);
    }

    private void writeByBatchSize(SeaTunnelRow element) {
        this.writeBuffer.add(new SeaTunnelRowNeo4jValue(this.seaTunnelRowType, element));
        this.tryWriteByBatchSize();
    }

    private void tryWriteByBatchSize() {
        if (!this.writeBuffer.isEmpty() && this.writeBuffer.size() >= this.maxBatchSize) {
            Query query = this.batchQuery();
            this.writeByQuery(query);
            this.writeBuffer.clear();
        }
    }

    private Query batchQuery() {
        try {
            Value batchValues = Values.parameters(CypherEnum.BATCH.getValue(), this.writeBuffer);
            return new Query(this.neo4jSinkQueryInfo.getQuery(), batchValues);
        }
        catch (ClientException e) {
            log.error("Failed to build cypher statement", (Throwable)e);
            throw new Neo4jConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", "Neo4j", PluginType.SINK, e.getMessage()));
        }
    }

    private void writeByQuery(Query query) {
        try {
            this.session.writeTransaction(tx -> {
                tx.run(query);
                return null;
            });
        }
        catch (Neo4jException e) {
            throw new Neo4jConnectorException((SeaTunnelErrorCode)Neo4jConnectorErrorCode.DATE_BASE_ERROR, e.getMessage());
        }
    }

    public Optional<Void> prepareCommit() throws IOException {
        return Optional.empty();
    }

    public void abortPrepare() {
    }

    public void close() throws IOException {
        this.flushWriteBuffer();
        this.session.close();
        this.driver.close();
    }

    private void flushWriteBuffer() {
        if (!this.writeBuffer.isEmpty()) {
            Query query = this.batchQuery();
            this.writeByQuery(query);
            this.writeBuffer.clear();
        }
    }
}

