/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.flume;

import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.spark.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.flume.FlumeBatchFetcher;
import org.apache.spark.streaming.flume.FlumeConnection;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol;
import org.apache.spark.streaming.receiver.Receiver;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.spark-project.guava.util.concurrent.ThreadFactoryBuilder;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005Ma!B\u0001\u0003\u0001\u0011a!\u0001\u0006$mk6,\u0007k\u001c7mS:<'+Z2fSZ,'O\u0003\u0002\u0004\t\u0005)a\r\\;nK*\u0011QAB\u0001\ngR\u0014X-Y7j]\u001eT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\n\u0004\u000159\u0002c\u0001\b\u0012'5\tqB\u0003\u0002\u0011\t\u0005A!/Z2fSZ,'/\u0003\u0002\u0013\u001f\tA!+Z2fSZ,'\u000f\u0005\u0002\u0015+5\t!!\u0003\u0002\u0017\u0005\ty1\u000b]1sW\u001acW/\\3Fm\u0016tG\u000f\u0005\u0002\u001935\ta!\u0003\u0002\u001b\r\t9Aj\\4hS:<\u0007\u0002\u0003\u000f\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0010\u0002\u0013\u0005$GM]3tg\u0016\u001c8\u0001\u0001\t\u0004?%bcB\u0001\u0011'\u001d\t\tC%D\u0001#\u0015\t\u0019S$\u0001\u0004=e>|GOP\u0005\u0002K\u0005)1oY1mC&\u0011q\u0005K\u0001\ba\u0006\u001c7.Y4f\u0015\u0005)\u0013B\u0001\u0016,\u0005\r\u0019V-\u001d\u0006\u0003O!\u0002\"!\f\u001a\u000e\u00039R!a\f\u0019\u0002\u00079,GOC\u00012\u0003\u0011Q\u0017M^1\n\u0005Mr#!E%oKR\u001cvnY6fi\u0006#GM]3tg\"AQ\u0007\u0001B\u0001B\u0003%a'\u0001\u0007nCb\u0014\u0015\r^2i'&TX\r\u0005\u00028q5\t\u0001&\u0003\u0002:Q\t\u0019\u0011J\u001c;\t\u0011m\u0002!\u0011!Q\u0001\nY\n1\u0002]1sC2dW\r\\5t[\"IQ\b\u0001B\u0001B\u0003%a\bR\u0001\rgR|'/Y4f\u0019\u00164X\r\u001c\t\u0003\u007f\tk\u0011\u0001\u0011\u0006\u0003\u0003\u001a\tqa\u001d;pe\u0006<W-\u0003\u0002D\u0001\na1\u000b^8sC\u001e,G*\u001a<fY&\u0011Q(\u0005\u0005\u0006\r\u0002!\taR\u0001\u0007y%t\u0017\u000e\u001e \u0015\u000b!K%j\u0013'\u0011\u0005Q\u0001\u0001\"\u0002\u000fF\u0001\u0004q\u0002\"B\u001bF\u0001\u00041\u0004\"B\u001eF\u0001\u00041\u0004\"B\u001fF\u0001\u0004q\u0004\u0002\u0003(\u0001\u0011\u000b\u0007I\u0011A(\u0002-\rD\u0017M\u001c8fY\u001a\u000b7\r^8ss\u0016CXmY;u_J,\u0012\u0001\u0015\t\u0003#Zk\u0011A\u0015\u0006\u0003'R\u000b!bY8oGV\u0014(/\u001a8u\u0015\t)\u0006'\u0001\u0003vi&d\u0017BA,S\u0005=)\u00050Z2vi>\u00148+\u001a:wS\u000e,\u0007\u0002C-\u0001\u0011\u0003\u0005\u000b\u0015\u0002)\u0002/\rD\u0017M\u001c8fY\u001a\u000b7\r^8ss\u0016CXmY;u_J\u0004\u0003\u0002C.\u0001\u0011\u000b\u0007I\u0011\u0001/\u0002\u001d\rD\u0017M\u001c8fY\u001a\u000b7\r^8ssV\tQ\f\u0005\u0002_S6\tqL\u0003\u0002aC\u0006\u0019a.[8\u000b\u0005\t\u001c\u0017AB:pG.,GO\u0003\u0002eK\u000691\r[1o]\u0016d'B\u00014h\u0003\u0015qW\r\u001e;z\u0015\tA'\"A\u0003kE>\u001c8/\u0003\u0002k?\nib*[8DY&,g\u000e^*pG.,Go\u00115b]:,GNR1di>\u0014\u0018\u0010\u0003\u0005m\u0001!\u0005\t\u0015)\u0003^\u0003=\u0019\u0007.\u00198oK24\u0015m\u0019;pef\u0004\u0003\u0002\u00038\u0001\u0011\u000b\u0007I\u0011A(\u0002!I,7-Z5wKJ,\u00050Z2vi>\u0014\b\u0002\u00039\u0001\u0011\u0003\u0005\u000b\u0015\u0002)\u0002#I,7-Z5wKJ,\u00050Z2vi>\u0014\b\u0005\u0003\u0005s\u0001!\u0015\r\u0011\"\u0003t\u0003-\u0019wN\u001c8fGRLwN\\:\u0016\u0003Q\u00042!U;x\u0013\t1(KA\nMS:\\W\r\u001a\"m_\u000e\\\u0017N\\4Rk\u0016,X\r\u0005\u0002\u0015q&\u0011\u0011P\u0001\u0002\u0010\r2,X.Z\"p]:,7\r^5p]\"A1\u0010\u0001E\u0001B\u0003&A/\u0001\u0007d_:tWm\u0019;j_:\u001c\b\u0005C\u0003~\u0001\u0011\u0005c0A\u0004p]N#\u0018M\u001d;\u0015\u0003}\u00042aNA\u0001\u0013\r\t\u0019\u0001\u000b\u0002\u0005+:LG\u000f\u0003\u0004\u0002\b\u0001!\tE`\u0001\u0007_:\u001cFo\u001c9\t\u000f\u0005-\u0001\u0001\"\u0001\u0003g\u0006qq-\u001a;D_:tWm\u0019;j_:\u001c\b\u0002CA\b\u0001\u0011\u0005!!!\u0005\u0002\u001f\u001d,G/T1y\u0005\u0006$8\r[*ju\u0016,\u0012A\u000e")
public class FlumePollingReceiver
extends Receiver<SparkFlumeEvent>
implements Logging {
    private final Seq<InetSocketAddress> addresses;
    private final int maxBatchSize;
    private final int parallelism;
    private ExecutorService channelFactoryExecutor;
    private NioClientSocketChannelFactory channelFactory;
    private ExecutorService receiverExecutor;
    private LinkedBlockingQueue<FlumeConnection> org$apache$spark$streaming$flume$FlumePollingReceiver$$connections;
    private transient Logger org$apache$spark$Logging$$log_;
    private volatile byte bitmap$0;

    private ExecutorService channelFactoryExecutor$lzycompute() {
        FlumePollingReceiver flumePollingReceiver = this;
        synchronized (flumePollingReceiver) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.channelFactoryExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Channel Thread - %d").build());
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
            return this.channelFactoryExecutor;
        }
    }

    private NioClientSocketChannelFactory channelFactory$lzycompute() {
        FlumePollingReceiver flumePollingReceiver = this;
        synchronized (flumePollingReceiver) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.channelFactory = new NioClientSocketChannelFactory((Executor)this.channelFactoryExecutor(), (Executor)this.channelFactoryExecutor());
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
            return this.channelFactory;
        }
    }

    private ExecutorService receiverExecutor$lzycompute() {
        FlumePollingReceiver flumePollingReceiver = this;
        synchronized (flumePollingReceiver) {
            if ((byte)(this.bitmap$0 & 4) == 0) {
                this.receiverExecutor = Executors.newFixedThreadPool(this.parallelism, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build());
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
            return this.receiverExecutor;
        }
    }

    private LinkedBlockingQueue org$apache$spark$streaming$flume$FlumePollingReceiver$$connections$lzycompute() {
        FlumePollingReceiver flumePollingReceiver = this;
        synchronized (flumePollingReceiver) {
            if ((byte)(this.bitmap$0 & 8) == 0) {
                this.org$apache$spark$streaming$flume$FlumePollingReceiver$$connections = new LinkedBlockingQueue();
                this.bitmap$0 = (byte)(this.bitmap$0 | 8);
            }
            return this.org$apache$spark$streaming$flume$FlumePollingReceiver$$connections;
        }
    }

    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    public void org$apache$spark$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$Logging$$log_ = x$1;
    }

    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    public ExecutorService channelFactoryExecutor() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.channelFactoryExecutor$lzycompute() : this.channelFactoryExecutor;
    }

    public NioClientSocketChannelFactory channelFactory() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.channelFactory$lzycompute() : this.channelFactory;
    }

    public ExecutorService receiverExecutor() {
        return (byte)(this.bitmap$0 & 4) == 0 ? this.receiverExecutor$lzycompute() : this.receiverExecutor;
    }

    public LinkedBlockingQueue<FlumeConnection> org$apache$spark$streaming$flume$FlumePollingReceiver$$connections() {
        return (byte)(this.bitmap$0 & 8) == 0 ? this.org$apache$spark$streaming$flume$FlumePollingReceiver$$connections$lzycompute() : this.org$apache$spark$streaming$flume$FlumePollingReceiver$$connections;
    }

    public void onStart() {
        this.addresses.foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ FlumePollingReceiver $outer;

            public final boolean apply(InetSocketAddress host) {
                NettyTransceiver transceiver = new NettyTransceiver(host, (ChannelFactory)this.$outer.channelFactory());
                SparkFlumeProtocol.Callback client = (SparkFlumeProtocol.Callback)SpecificRequestor.getClient(SparkFlumeProtocol.Callback.class, (Transceiver)transceiver);
                return this.$outer.org$apache$spark$streaming$flume$FlumePollingReceiver$$connections().add(new FlumeConnection(transceiver, client));
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.parallelism).foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ FlumePollingReceiver $outer;

            public final Future<?> apply(int i) {
                this.$outer.logInfo((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Starting Flume Polling Receiver worker threads..";
                    }
                });
                return this.$outer.receiverExecutor().submit(new FlumeBatchFetcher(this.$outer));
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }

    public void onStop() {
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Shutting down Flume Polling Receiver";
            }
        });
        this.receiverExecutor().shutdown();
        Object object = this.receiverExecutor().awaitTermination(60L, TimeUnit.SECONDS) ? BoxedUnit.UNIT : this.receiverExecutor().shutdownNow();
        ((IterableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(this.org$apache$spark$streaming$flume$FlumePollingReceiver$$connections()).asScala()).foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(FlumeConnection x$1) {
                x$1.transceiver().close();
            }
        });
        this.channelFactory().releaseExternalResources();
    }

    public LinkedBlockingQueue<FlumeConnection> getConnections() {
        return this.org$apache$spark$streaming$flume$FlumePollingReceiver$$connections();
    }

    public int getMaxBatchSize() {
        return this.maxBatchSize;
    }

    public FlumePollingReceiver(Seq<InetSocketAddress> addresses, int maxBatchSize, int parallelism, StorageLevel storageLevel) {
        this.addresses = addresses;
        this.maxBatchSize = maxBatchSize;
        this.parallelism = parallelism;
        super(storageLevel);
        Logging.class.$init$((Logging)this);
    }
}

