/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer;
import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfluxDBSinkWriter
extends AbstractSinkWriter<SeaTunnelRow, Void> {
    private static final Logger log = LoggerFactory.getLogger(InfluxDBSinkWriter.class);
    private final Serializer serializer;
    private InfluxDB influxdb;
    private final SinkConfig sinkConfig;
    private final List<Point> batchList;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;
    private volatile Exception flushException;
    private final Integer batchIntervalMs;

    public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) throws ConnectException {
        this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
        this.batchIntervalMs = this.sinkConfig.getBatchIntervalMs();
        this.serializer = new DefaultSerializer(seaTunnelRowType, this.sinkConfig.getPrecision().getTimeUnit(), this.sinkConfig.getKeyTags(), this.sinkConfig.getKeyTime(), this.sinkConfig.getMeasurement());
        this.batchList = new ArrayList<Point>();
        if (this.batchIntervalMs != null) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build());
            this.scheduledFuture = this.scheduler.scheduleAtFixedRate(() -> {
                try {
                    this.flush();
                }
                catch (IOException e) {
                    this.flushException = e;
                }
            }, this.batchIntervalMs.intValue(), this.batchIntervalMs.intValue(), TimeUnit.MILLISECONDS);
        }
        this.connect();
    }

    public void write(SeaTunnelRow element) throws IOException {
        Point record = this.serializer.serialize(element);
        this.write(record);
    }

    @Override
    public Optional<Void> prepareCommit() {
        this.flush();
        return super.prepareCommit();
    }

    public void close() throws IOException {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
        this.flush();
        if (this.influxdb != null) {
            this.influxdb.close();
            this.influxdb = null;
        }
    }

    public void write(Point record) throws IOException {
        this.checkFlushException();
        this.batchList.add(record);
        if (this.sinkConfig.getBatchSize() > 0 && this.batchList.size() >= this.sinkConfig.getBatchSize()) {
            this.flush();
        }
    }

    public void flush() throws IOException {
        this.checkFlushException();
        if (this.batchList.isEmpty()) {
            return;
        }
        BatchPoints.Builder batchPoints = BatchPoints.database(this.sinkConfig.getDatabase());
        for (int i = 0; i <= this.sinkConfig.getMaxRetries(); ++i) {
            try {
                batchPoints.points(this.batchList);
                this.influxdb.write(batchPoints.build());
                continue;
            }
            catch (Exception e) {
                log.error("Writing records to influxdb failed, retry times = {}", (Object)i, (Object)e);
                if (i >= this.sinkConfig.getMaxRetries()) {
                    throw new InfluxdbConnectorException((SeaTunnelErrorCode)CommonErrorCode.FLUSH_DATA_FAILED, "Writing records to InfluxDB failed.", e);
                }
                try {
                    long backoff = Math.min(this.sinkConfig.getRetryBackoffMultiplierMs() * i, this.sinkConfig.getMaxRetryBackoffMs());
                    Thread.sleep(backoff);
                    continue;
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new InfluxdbConnectorException((SeaTunnelErrorCode)CommonErrorCode.FLUSH_DATA_FAILED, "Unable to flush; interrupted while doing another attempt.", e);
                }
            }
        }
        this.batchList.clear();
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            throw new InfluxdbConnectorException((SeaTunnelErrorCode)CommonErrorCode.FLUSH_DATA_FAILED, "Writing records to InfluxDB failed.", this.flushException);
        }
    }

    public void connect() throws ConnectException {
        if (this.influxdb == null) {
            this.influxdb = InfluxDBClient.getWriteClient(this.sinkConfig);
            String version = this.influxdb.version();
            if (!this.influxdb.ping().isGood()) {
                throw new InfluxdbConnectorException((SeaTunnelErrorCode)InfluxdbConnectorErrorCode.CONNECT_FAILED, String.format("connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}", this.sinkConfig.getUrl()));
            }
            log.info("connect influxdb successful. sever version :{}.", (Object)version);
        }
    }
}

