/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.sls.sink;

import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.metrics.SimpleGauge;
import com.alibaba.ververica.connectors.common.sink.HasRetryTimeout;
import com.alibaba.ververica.connectors.common.sink.Syncable;
import com.alibaba.ververica.connectors.sls.SLSAccessInfo;
import com.alibaba.ververica.connectors.sls.SLSUtils;
import com.alibaba.ververica.connectors.sls.sink.SLSLogProducerProvider;
import com.alibaba.ververica.connectors.sls.sink.SLSRichOutputFormat;
import com.alibaba.ververica.connectors.sls.sink.SinkRecord;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.shade.google.common.util.concurrent.FutureCallback;
import com.shade.google.common.util.concurrent.Futures;
import com.shade.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output format that sends {@link SinkRecord}s to an SLS logstore through a {@link LogProducer}.
 *
 * <p>Records are handed to the producer asynchronously in {@link #writeRecord(SinkRecord)}; the
 * outcome of every send is observed by a {@link SendFutureCallback} running on a dedicated
 * single-threaded executor. {@link #sync()} blocks until every record handed to the producer has
 * been acknowledged (successfully or not) and surfaces any asynchronous failure.
 *
 * <p>Threading: {@code writeRecord}/{@code sync}/{@code close} are invoked by the caller's thread,
 * while the callback mutates {@code sendError} and {@code committed} from the executor thread.
 */
public class SLSOutputFormat
        extends RichOutputFormat<SinkRecord>
        implements Syncable, HasRetryTimeout {
    private static final long serialVersionUID = -8829623574993898905L;

    // NOTE(review): the logger is created for SLSRichOutputFormat rather than this class. This
    // looks like a copy-paste slip, but it is kept because existing logging configuration may be
    // keyed on that category name — confirm before changing.
    private static final Logger LOG = LoggerFactory.getLogger(SLSRichOutputFormat.class);

    /** Pause between two attempts to hand a record to the producer, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 1000L;

    private boolean failOnError = true;
    private int retryTimeoutMS = 300000;
    private Meter outTps;
    private SimpleGauge latencyGauge;

    /** Number of records successfully handed to the producer. */
    private final AtomicLong sent = new AtomicLong(0L);
    /** Number of records whose send future has completed (success or failure). */
    private final AtomicLong committed = new AtomicLong(0L);

    private transient SendFutureCallback sendCallback;
    private transient ExecutorService executor;
    /** Last recorded send failure; written by the callback thread and read by the caller thread. */
    private transient volatile RuntimeException sendError;
    private SLSAccessInfo accessInfo;
    private Configuration props;
    private transient LogProducer producer;
    private transient SLSLogProducerProvider logProducerProvider;

    /**
     * Creates the output format from the connector configuration.
     *
     * @param configuration connector options; parsed into the {@link SLSAccessInfo} and kept as
     *     the base configuration that {@link #configure(Configuration)} later merges into
     */
    public SLSOutputFormat(Configuration configuration) {
        this.accessInfo = SLSUtils.parseProducerConfig(configuration);
        this.props = configuration;
        this.logProducerProvider = new SLSLogProducerProvider(this.accessInfo, configuration);
    }

    /**
     * Obtains the producer client, creates the callback and its single-threaded executor, and
     * registers the output-rate and send-time metrics on the runtime context.
     *
     * @param taskNumber index of this parallel instance (unused here)
     * @param numTasks total number of parallel instances (unused here)
     */
    public void open(int taskNumber, int numTasks) {
        this.producer = (LogProducer) this.logProducerProvider.getClient();
        this.sendCallback = new SendFutureCallback();
        this.executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.outTps = MetricUtils.registerNumRecordsOutRate(this.getRuntimeContext());
        this.latencyGauge = MetricUtils.registerCurrentSendTime(this.getRuntimeContext());
    }

    /**
     * Hands one record to the producer, retrying on {@link ProducerException} until
     * {@link #getRetryTimeout()} milliseconds have elapsed since the first attempt.
     *
     * <p>When the retry budget is exhausted the failure is thrown if {@code failOnError} is set;
     * otherwise the record is dropped after logging a warning.
     *
     * @param record the record to send; {@code null} is ignored
     * @throws IOException if the calling thread is interrupted while sending or while waiting
     *     between retries
     * @throws RuntimeException wrapping the last {@link ProducerException} when retries are
     *     exhausted and {@code failOnError} is set
     */
    public void writeRecord(SinkRecord record) throws IOException {
        if (record == null) {
            return;
        }
        final long start = System.currentTimeMillis();
        while (true) {
            try {
                ListenableFuture<Result> future = this.producer.send(
                        this.accessInfo.getProjectName(),
                        this.accessInfo.getLogstore(),
                        record.getTopic(),
                        record.getSource(),
                        record.getPartitionKey(),
                        record.getContent());
                Futures.addCallback(future, this.sendCallback, this.executor);
                this.sent.incrementAndGet();
                break;
            } catch (InterruptedException ex) {
                // Retrying with the interrupt flag set would only make send() fail again,
                // so record the failure, restore the flag and abort instead of spinning.
                this.sendError = new RuntimeException(ex);
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while writing record to sls", ex);
            } catch (ProducerException ex) {
                if (System.currentTimeMillis() - start < this.getRetryTimeout()) {
                    if (this.logProducerProvider != null) {
                        // Ask the provider for a replacement client before the next attempt.
                        this.producer = (LogProducer) this.logProducerProvider.getClient(true, true);
                    }
                    LOG.warn("Write record failed, wait {} ms and retry", RETRY_INTERVAL_MS, ex);
                    sleepBeforeRetry();
                    continue;
                }
                RuntimeException failure = new RuntimeException(ex);
                LOG.warn("Error sinking records to SLS", failure);
                if (this.failOnError) {
                    this.sendError = failure;
                    throw failure;
                }
                // Record is dropped. sendError is deliberately left untouched so that an
                // asynchronous failure reported by the callback is not erased here.
                break;
            }
        }
        reportMetrics(start);
    }

    /** Sleeps for the retry interval, converting an interrupt into an {@link IOException}. */
    private void sleepBeforeRetry() throws IOException {
        try {
            Thread.sleep(RETRY_INTERVAL_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting to retry writing record to sls", e);
        }
    }

    /** Reports the elapsed handling time and marks one output event, if metrics are registered. */
    private void reportMetrics(long startMillis) {
        if (this.latencyGauge != null) {
            this.latencyGauge.report(System.currentTimeMillis() - startMillis, 1L);
        }
        if (this.outTps != null) {
            this.outTps.markEvent();
        }
    }

    /**
     * Blocks until every record handed to the producer has been acknowledged by the callback,
     * then surfaces any recorded send failure.
     *
     * @throws IOException if the wait is interrupted, or if a send failure was recorded and
     *     {@code failOnError} is set
     */
    @Override
    public void sync() throws IOException {
        while (this.sent.get() != this.committed.get()) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Fail to write to sls", e);
            }
        }
        RuntimeException failure = this.sendError;
        if (failure != null) {
            LOG.error("Fail to write to sls", failure);
            if (this.failOnError) {
                throw new IOException("Fail to write to sls", failure);
            }
            this.sendError = null;
        }
    }

    /**
     * Closes the producer and stops the callback executor, waiting briefly for it to terminate.
     *
     * @throws IOException declared by the overridden contract; not thrown by this implementation
     */
    public void close() throws IOException {
        LOG.info("closing producer");
        try {
            this.closeProducer();
        } finally {
            // Shut the executor down even if closing the producer fails unexpectedly.
            shutdownExecutor();
        }
        LOG.info("Closing the producer instance.");
    }

    /** Stops the callback executor, escalating to {@code shutdownNow()} if it does not drain. */
    private void shutdownExecutor() {
        if (this.executor == null) {
            return;
        }
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(200L, TimeUnit.MILLISECONDS)) {
                this.executor.shutdownNow();
                if (!this.executor.awaitTermination(200L, TimeUnit.MILLISECONDS)) {
                    LOG.warn("Stopping executor failed");
                }
            }
        } catch (InterruptedException ex) {
            LOG.warn("Await termination interrupted");
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Closes the current producer (if any), clears the reference and closes the provider's client. */
    private synchronized void closeProducer() {
        if (this.producer == null) {
            return;
        }
        try {
            this.producer.close();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while closing producer", e);
        } catch (ProducerException e) {
            LOG.error("Error closing producer", e);
        }
        this.producer = null;
        if (this.logProducerProvider != null) {
            this.logProducerProvider.closeClient();
        }
    }

    /**
     * Merges {@code config} into the stored configuration, re-parses the access info, and
     * replaces the producer provider and producer with fresh instances.
     *
     * @param config additional options to merge into the stored configuration
     */
    public void configure(Configuration config) {
        this.props.addAll(config);
        this.accessInfo = SLSUtils.parseProducerConfig(this.props);
        this.closeProducer();
        // NOTE(review): the provider is built from `config` while the access info comes from the
        // merged `this.props` (the constructor passes the full configuration) — confirm whether
        // the provider should receive `this.props` instead.
        this.logProducerProvider = new SLSLogProducerProvider(this.accessInfo, config);
        this.producer = (LogProducer) this.logProducerProvider.getClient();
    }

    /** Returns the retry budget for {@link #writeRecord(SinkRecord)}, in milliseconds. */
    @Override
    public long getRetryTimeout() {
        return this.retryTimeoutMS;
    }

    @VisibleForTesting
    AtomicLong getCommitted() {
        return this.committed;
    }

    @VisibleForTesting
    RuntimeException getSendError() {
        return this.sendError;
    }

    @Override
    public String toString() {
        return String.format("SLS Sink to %s.%s", this.accessInfo.getProjectName(), this.accessInfo.getLogstore());
    }

    /**
     * Callback attached to every send future. It records failures in {@code sendError} and
     * increments {@code committed} exactly once per completed future so that {@link #sync()}
     * can detect when all outstanding sends have finished.
     */
    @VisibleForTesting
    public final class SendFutureCallback
            implements FutureCallback<Result> {
        @Override
        public void onSuccess(@Nullable Result result) {
            LOG.debug("SLS sink callback result: {}", result);
            if (result != null && !result.isSuccessful()) {
                SLSOutputFormat.this.sendError = new RuntimeException(result.getErrorMessage());
            }
            SLSOutputFormat.this.committed.incrementAndGet();
        }

        @Override
        public void onFailure(Throwable throwable) {
            // Only the first failure is logged and kept; later ones just count as committed.
            if (SLSOutputFormat.this.sendError == null) {
                LOG.error("SLS sink callback error", throwable);
                if (throwable instanceof ResultFailedException) {
                    ResultFailedException exception = (ResultFailedException) throwable;
                    // Plain concatenation is null-safe, unlike calling toString() on the result.
                    SLSOutputFormat.this.sendError = new RuntimeException(
                            "An exception was thrown, result: " + exception.getResult(), throwable);
                } else {
                    SLSOutputFormat.this.sendError = new RuntimeException("An exception was thrown", throwable);
                }
            }
            SLSOutputFormat.this.committed.incrementAndGet();
        }
    }
}

