/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.IndexInfo;
import org.apache.seatunnel.connectors.seatunnel.easysearch.exception.EasysearchConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.easysearch.exception.EasysearchConnectorException;
import org.apache.seatunnel.connectors.seatunnel.easysearch.serialize.EasysearchRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.easysearch.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchSinkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EasysearchSinkWriter
implements SinkWriter<SeaTunnelRow, EasysearchCommitInfo, EasysearchSinkState> {
    private static final Logger log = LoggerFactory.getLogger(EasysearchSinkWriter.class);
    private static final long DEFAULT_SLEEP_TIME_MS = 200L;
    private final SinkWriter.Context context;
    private final int maxBatchSize;
    private final SeaTunnelRowSerializer seaTunnelRowSerializer;
    private final List<String> requestEzsList;
    private EasysearchClient ezsClient;
    private RetryUtils.RetryMaterial retryMaterial;

    public EasysearchSinkWriter(SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig pluginConfig) {
        this.context = context;
        this.maxBatchSize = (Integer)pluginConfig.get(EasysearchSinkOptions.MAX_BATCH_SIZE);
        IndexInfo indexInfo = new IndexInfo(pluginConfig);
        this.ezsClient = EasysearchClient.createInstance(pluginConfig);
        this.seaTunnelRowSerializer = new EasysearchRowSerializer(indexInfo, seaTunnelRowType);
        this.requestEzsList = new ArrayList<String>(this.maxBatchSize);
        this.retryMaterial = new RetryUtils.RetryMaterial(((Integer)pluginConfig.get(EasysearchSinkOptions.MAX_RETRY_COUNT)).intValue(), true, exception -> true, 200L);
    }

    public void write(SeaTunnelRow element) {
        if (RowKind.UPDATE_BEFORE.equals((Object)element.getRowKind())) {
            return;
        }
        String indexRequestRow = this.seaTunnelRowSerializer.serializeRow(element);
        this.requestEzsList.add(indexRequestRow);
        if (this.requestEzsList.size() >= this.maxBatchSize) {
            this.bulkEzsWithRetry(this.ezsClient, this.requestEzsList);
        }
    }

    public Optional<EasysearchCommitInfo> prepareCommit() {
        this.bulkEzsWithRetry(this.ezsClient, this.requestEzsList);
        return Optional.empty();
    }

    public void abortPrepare() {
    }

    public synchronized void bulkEzsWithRetry(EasysearchClient ezsClient, List<String> requestEzsList) {
        try {
            RetryUtils.retryWithException(() -> {
                if (requestEzsList.size() > 0) {
                    String requestBody = String.join((CharSequence)"\n", requestEzsList) + "\n";
                    BulkResponse bulkResponse = ezsClient.bulk(requestBody);
                    if (bulkResponse.isErrors()) {
                        throw new EasysearchConnectorException((SeaTunnelErrorCode)EasysearchConnectorErrorCode.BULK_RESPONSE_ERROR, "bulk ezs error: " + bulkResponse.getResponse());
                    }
                    return bulkResponse;
                }
                return null;
            }, (RetryUtils.RetryMaterial)this.retryMaterial);
            requestEzsList.clear();
        }
        catch (Exception e) {
            throw new EasysearchConnectorException(EasysearchConnectorErrorCode.SQL_OPERATION_FAILED, "Easysearch execute batch statement error", e);
        }
    }

    public void close() throws IOException {
        this.bulkEzsWithRetry(this.ezsClient, this.requestEzsList);
        this.ezsClient.close();
    }
}

