/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer.async;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import kafka.metrics.KafkaMetricsGroup;
import kafka.metrics.KafkaMetricsGroup$class;
import kafka.producer.KeyedMessage;
import kafka.producer.async.EventHandler;
import kafka.producer.async.IllegalQueueStateException;
import kafka.producer.async.ProducerSendThread$;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import kafka.utils.SystemTime$;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.StringBuilder;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005\u0005c\u0001B\u0001\u0003\u0001%\u0011!\u0003\u0015:pIV\u001cWM]*f]\u0012$\u0006N]3bI*\u00111\u0001B\u0001\u0006CNLhn\u0019\u0006\u0003\u000b\u0019\t\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0002\u000f\u0005)1.\u00194lC\u000e\u0001Qc\u0001\u0006C\u0019N)\u0001aC\n\u001a?A\u0011A\"E\u0007\u0002\u001b)\u0011abD\u0001\u0005Y\u0006twMC\u0001\u0011\u0003\u0011Q\u0017M^1\n\u0005Ii!A\u0002+ie\u0016\fG\r\u0005\u0002\u0015/5\tQC\u0003\u0002\u0017\r\u0005)Q\u000f^5mg&\u0011\u0001$\u0006\u0002\b\u0019><w-\u001b8h!\tQR$D\u0001\u001c\u0015\tab!A\u0004nKR\u0014\u0018nY:\n\u0005yY\"!E&bM.\fW*\u001a;sS\u000e\u001cxI]8vaB\u0011\u0001eI\u0007\u0002C)\t!%A\u0003tG\u0006d\u0017-\u0003\u0002%C\tY1kY1mC>\u0013'.Z2u\u0011!1\u0003A!b\u0001\n\u00039\u0013A\u0003;ie\u0016\fGMT1nKV\t\u0001\u0006\u0005\u0002*Y9\u0011\u0001EK\u0005\u0003W\u0005\na\u0001\u0015:fI\u00164\u0017BA\u0017/\u0005\u0019\u0019FO]5oO*\u00111&\t\u0005\ta\u0001\u0011\t\u0011)A\u0005Q\u0005YA\u000f\u001b:fC\u0012t\u0015-\\3!\u0011!\u0011\u0004A!b\u0001\n\u0003\u0019\u0014!B9vKV,W#\u0001\u001b\u0011\u0007URD(D\u00017\u0015\t9\u0004(\u0001\u0006d_:\u001cWO\u001d:f]RT!!O\b\u0002\tU$\u0018\u000e\\\u0005\u0003wY\u0012QB\u00117pG.LgnZ)vKV,\u0007\u0003B\u001f?\u0001.k\u0011\u0001B\u0005\u0003\u007f\u0011\u0011AbS3zK\u0012lUm]:bO\u0016\u0004\"!\u0011\"\r\u0001\u0011A1\t\u0001C\u0001\u0002\u000b\u0007AIA\u0001L#\t)\u0005\n\u0005\u0002!\r&\u0011q)\t\u0002\b\u001d>$\b.\u001b8h!\t\u0001\u0013*\u0003\u0002KC\t\u0019\u0011I\\=\u0011\u0005\u0005cE\u0001C'\u0001\t\u0003\u0005)\u0019\u0001#\u0003\u0003YC\u0001b\u0014\u0001\u0003\u0002\u0003\u0006I\u0001N\u0001\u0007cV,W/\u001a\u0011\t\u0011E\u0003!Q1A\u0005\u0002I\u000bq\u0001[1oI2,'/F\u0001T!\u0011!V\u000bQ&\u000e\u0003\tI!A\u0016\u0002\u0003\u0019\u00153XM\u001c;IC:$G.\u001a:\t\u0011a\u0003!\u0011!Q\u0001\nM\u000b\u0001\u0002[1oI2,'\u000f\t\u0005\t5\u0002\u0011)\u0019!C\u00017\u0006I\u0011/^3vKRKW.Z\u000b\u00029B\u0011\u0001%X\u0005\u0003=\u0006\u0012A\u0001T8oO\"A\u0001\r\u0001B\u0001B\u0003%A,\u0001\u0006rk\u0016,X\rV5nK\u0002B\u0001B\u0019\u0001\u0003\u0006\u0004%\taY\u0001\nE\u0006$8\r[*ju\u0016,\u0012\u0001\u001a\t\u0003A\u0015L!AZ\u0011\u0003\u0007%sG\u000f\u0003\u0005i\u0001\t\u0005\t\u0015!\u0003e\u0003)\u0011\u0017\r^2i'&TX\r\t\u0005\tU\u0002\u0011)\u0019!C\u0001O\u0005A1\r\\5f]RLE\r\u0003\u0005m\u0001\t\u0005\t\u0015!\u0003)\u0003%\u0019G.[3oi&#\u0007\u0005C\u0003o\u0001\u0011\u0005q.\u0001\u0004=S:LGO\u0010\u000b\baF\u00148\u000f^;w!\u0011!\u0006\u0001Q&\t\u000b\u0019j\u0007\u0019\u0001\u0015\t\u000bIj\u0007\u0019\u0001\u001b\t\u000bEk\u0007\u0019A*\t\u000bik\u0007\u0019\u0001/\t\u000b\tl\u0007\u0019\u00013\t\u000b)l\u0007\u0019\u0001\u0015\t\u000fa\u0004!\u0019!C\u0005s\u0006i1\u000f[;uI><h\u000eT1uG\",\u0012A\u001f\t\u0003kmL!\u0001 \u001c\u0003\u001d\r{WO\u001c;E_^tG*\u0019;dQ\"1a\u0010\u0001Q\u0001\ni\fab\u001d5vi\u0012|wO\u001c'bi\u000eD\u0007\u0005C\u0005\u0002\u0002\u0001\u0011\r\u0011\"\u0003\u0002\u0004\u0005y1\u000f[;uI><hnQ8n[\u0006tG-F\u0001=\u0011\u001d\t9\u0001\u0001Q\u0001\nq\n\u0001c\u001d5vi\u0012|wO\\\"p[6\fg\u000e\u001a\u0011\t\u000f\u0005-\u0001\u0001\"\u0011\u0002\u000e\u0005\u0019!/\u001e8\u0015\u0005\u0005=\u0001c\u0001\u0011\u0002\u0012%\u0019\u00111C\u0011\u0003\tUs\u0017\u000e\u001e\u0005\b\u0003/\u0001A\u0011AA\r\u0003!\u0019\b.\u001e;e_^tWCAA\b\u0011\u001d\ti\u0002\u0001C\u0005\u0003\u001b\tQ\u0002\u001d:pG\u0016\u001c8/\u0012<f]R\u001c\bbBA\u0011\u0001\u0011\u0005\u00111E\u0001\fiJLHk\u001c%b]\u0012dW\r\u0006\u0003\u0002\u0010\u0005\u0015\u0002\u0002CA\u0014\u0003?\u0001\r!!\u000b\u0002\r\u00154XM\u001c;t!\u0015\tY#a\u000f=\u001d\u0011\ti#a\u000e\u000f\t\u0005=\u0012QG\u0007\u0003\u0003cQ1!a\r\t\u0003\u0019a$o\\8u}%\t!%C\u0002\u0002:\u0005\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002>\u0005}\"aA*fc*\u0019\u0011\u0011H\u0011")
public class ProducerSendThread<K, V>
extends Thread
implements Logging,
KafkaMetricsGroup,
ScalaObject {
    private final String threadName;
    private final BlockingQueue<KeyedMessage<K, V>> queue;
    private final EventHandler<K, V> handler;
    private final long queueTime;
    private final int batchSize;
    private final String clientId;
    private final CountDownLatch shutdownLatch;
    private final KeyedMessage<K, V> kafka$producer$async$ProducerSendThread$$shutdownCommand;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    public volatile int bitmap$0;

    public String threadName() {
        return this.threadName;
    }

    public BlockingQueue<KeyedMessage<K, V>> queue() {
        return this.queue;
    }

    public EventHandler<K, V> handler() {
        return this.handler;
    }

    public long queueTime() {
        return this.queueTime;
    }

    public int batchSize() {
        return this.batchSize;
    }

    public String clientId() {
        return this.clientId;
    }

    private CountDownLatch shutdownLatch() {
        return this.shutdownLatch;
    }

    public final KeyedMessage<K, V> kafka$producer$async$ProducerSendThread$$shutdownCommand() {
        return this.kafka$producer$async$ProducerSendThread$$shutdownCommand;
    }

    @Override
    public void run() {
        try {
            this.processEvents();
            this.shutdownLatch().countDown();
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new $anonfun$run$1(this), (Function0<Throwable>)new $anonfun$run$2(this, throwable));
        }
        return;
        {
            finally {
                this.shutdownLatch().countDown();
            }
        }
    }

    public void shutdown() {
        this.info((Function0<String>)new $anonfun$shutdown$1(this));
        this.queue().put(this.kafka$producer$async$ProducerSendThread$$shutdownCommand());
        this.shutdownLatch().await();
        this.info((Function0<String>)new $anonfun$shutdown$2(this));
    }

    private void processEvents() {
        LongRef lastSend$1 = new LongRef(SystemTime$.MODULE$.milliseconds());
        ObjectRef events$1 = new ObjectRef((Object)new ArrayBuffer());
        BooleanRef full$1 = new BooleanRef(false);
        package$.MODULE$.Stream().continually((Function0)new $anonfun$processEvents$1(this, lastSend$1)).takeWhile((Function1)new $anonfun$processEvents$2(this)).foreach((Function1)new $anonfun$processEvents$3(this, lastSend$1, events$1, full$1));
        this.tryToHandle((Seq<KeyedMessage<K, V>>)((ArrayBuffer)events$1.elem));
        if (this.queue().size() > 0) {
            throw new IllegalQueueStateException(Predef$.MODULE$.augmentString("Invalid queue state! After queue shutdown, %d remaining items in the queue").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.queue().size())})));
        }
    }

    public void tryToHandle(Seq<KeyedMessage<K, V>> events) {
        int size$1 = events.size();
        try {
            this.debug((Function0<String>)new $anonfun$tryToHandle$1(this, size$1));
            if (size$1 > 0) {
                this.handler().handle(events);
            }
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new $anonfun$tryToHandle$2(this, size$1), (Function0<Throwable>)new $anonfun$tryToHandle$3(this, throwable));
        }
    }

    public ProducerSendThread(String threadName, BlockingQueue<KeyedMessage<K, V>> queue, EventHandler<K, V> handler, long queueTime, int batchSize, String clientId) {
        this.threadName = threadName;
        this.queue = queue;
        this.handler = handler;
        this.queueTime = queueTime;
        this.batchSize = batchSize;
        this.clientId = clientId;
        super(threadName);
        Logging$class.$init$(this);
        KafkaMetricsGroup$class.$init$(this);
        this.shutdownLatch = new CountDownLatch(1);
        this.kafka$producer$async$ProducerSendThread$$shutdownCommand = new KeyedMessage<Object, Object>("shutdown", null, null);
        this.newGauge(new StringBuilder().append((Object)clientId).append((Object)"-ProducerQueueSize").toString(), new $anon$1(this));
    }
}

