/*
 * Decompiled with CFR 0.152.
 */
package com.volcengine.service.tls;

import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.producer.BatchLog;
import com.volcengine.model.tls.producer.CallBack;
import com.volcengine.model.tls.producer.ProducerConfig;
import com.volcengine.service.tls.BatchHandler;
import com.volcengine.service.tls.LogDispatcher;
import com.volcengine.service.tls.Mover;
import com.volcengine.service.tls.Producer;
import com.volcengine.service.tls.RetryManager;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ProducerImpl
implements Producer {
    private static final Log LOG = LogFactory.getLog(ProducerImpl.class);
    private ProducerConfig producerConfig;
    private final LogDispatcher dispatcher;
    private static final AtomicInteger INSTANCE_ID = new AtomicInteger(0);
    private final String name;
    private final Semaphore memoryLock;
    private final BatchHandler successHandler;
    private final BatchHandler failHandler;
    private final RetryManager retryManager;
    private final AtomicInteger batchCount = new AtomicInteger(0);
    private final Mover mover;

    public ProducerImpl(ProducerConfig producerConfig) throws LogException {
        this.producerConfig = producerConfig;
        this.name = "TLS-" + INSTANCE_ID.incrementAndGet();
        LinkedBlockingQueue<BatchLog> successQueue = new LinkedBlockingQueue<BatchLog>();
        LinkedBlockingQueue<BatchLog> failureQueue = new LinkedBlockingQueue<BatchLog>();
        this.memoryLock = new Semaphore(producerConfig.getTotalSizeInBytes());
        this.retryManager = new RetryManager();
        this.dispatcher = new LogDispatcher(producerConfig, this.name, successQueue, failureQueue, this.memoryLock, this.batchCount, this.retryManager);
        this.successHandler = new BatchHandler("success batch handler-" + this.name, this.memoryLock, successQueue, this.batchCount);
        this.failHandler = new BatchHandler("fail batch handler-" + this.name, this.memoryLock, failureQueue, this.batchCount);
        this.mover = new Mover(this.name + "-mover", producerConfig, this.dispatcher, this.retryManager, successQueue, failureQueue);
    }

    public static Producer defaultProducer(String endpoint, String region, String accessKey, String accessSecret, String token) throws LogException {
        return new ProducerImpl(new ProducerConfig(endpoint, region, accessKey, accessSecret, token));
    }

    @Override
    public void sendLog(String hashKey, String topicId, String source, String filename, PutLogRequest.Log log, CallBack callBack) throws InterruptedException, LogException {
        if (topicId == null || log == null) {
            throw new LogException("InvalidArgument", String.format("topic id:%s,log:%s", topicId, log), null);
        }
        PutLogRequest.LogGroup logGroup = PutLogRequest.LogGroup.newBuilder().setFileName(filename).setSource(source).addLogs(log).build();
        this.sendLogGroup(hashKey, topicId, source, filename, logGroup, callBack);
    }

    @Override
    public void sendLogGroup(String hashKey, String topicId, String source, String filename, PutLogRequest.LogGroup logGroup, CallBack callBack) throws InterruptedException, LogException {
        if (topicId == null || logGroup == null || logGroup.getLogsList() == null || logGroup.getLogsList().size() == 0) {
            throw new LogException("InvalidArgument", String.format("topic id:%s,log group:%s", topicId, logGroup), null);
        }
        if (logGroup.getLogsList().size() > this.producerConfig.getMaxBatchCount()) {
            throw new LogException("InvalidArgument", String.format("log list size %d is  greater than threshold %d", logGroup.getLogsList().size(), this.producerConfig.getMaxBatchCount()), null);
        }
        this.dispatcher.addBatch(hashKey, topicId, source, filename, logGroup, callBack);
    }

    @Override
    public void resetAccessKeyToken(String accessKey, String secretKey, String securityToken) throws LogException {
        if (StringUtils.isEmpty((CharSequence)accessKey) || StringUtils.isEmpty((CharSequence)secretKey)) {
            throw new LogException("InvalidArgument", String.format("reset producer %s access key failed,accessKey is %s,secretKey is %s, token is %s", this.name, accessKey, secretKey, securityToken), null);
        }
        this.dispatcher.resetAccessKeyToken(accessKey, secretKey, securityToken);
    }

    @Override
    public void start() throws LogException {
        this.producerConfig.validConfig();
        this.dispatcher.start();
        this.successHandler.start();
        this.failHandler.start();
        this.mover.start();
        LOG.info((Object)String.format("producer %s started", this.name));
    }

    @Override
    public void close() throws InterruptedException, LogException {
        this.dispatcher.close();
        this.successHandler.interrupt();
        this.failHandler.interrupt();
        this.mover.close();
        LOG.info((Object)String.format("producer %s closed", this.name));
    }

    @Override
    public void closeNow() throws InterruptedException, LogException {
        this.dispatcher.closeNow();
        this.successHandler.close();
        this.failHandler.close();
        this.mover.close();
        LOG.info((Object)String.format("producer %s closed now", this.name));
    }

    @Override
    public void config(ProducerConfig producerConfig) throws LogException {
        if (producerConfig != null) {
            this.producerConfig = producerConfig;
            producerConfig.validConfig();
            LOG.info((Object)String.format("producer %s configured,config: %s", this.name, producerConfig));
        }
    }
}

