/*
 * Decompiled with CFR 0.152.
 */
package com.tresata.spark.kafka;

import com.tresata.spark.kafka.Broker;
import com.tresata.spark.kafka.Broker$;
import com.tresata.spark.kafka.KafkaRDD;
import com.tresata.spark.kafka.SimpleConsumerConfig;
import java.net.ConnectException;
import java.util.Properties;
import kafka.api.OffsetRequest;
import kafka.api.OffsetRequest$;
import kafka.api.PartitionMetadata;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.api.PartitionOffsetsResponse;
import kafka.api.TopicMetadata;
import kafka.api.TopicMetadataRequest;
import kafka.common.BrokerNotAvailableException;
import kafka.common.ErrorMapping$;
import kafka.common.LeaderNotAvailableException;
import kafka.common.NotLeaderForPartitionException;
import kafka.common.TopicAndPartition;
import kafka.consumer.SimpleConsumer;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenSet;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.generic.CanBuildFrom;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.collection.parallel.ParIterableLike;
import scala.collection.parallel.immutable.ParMap$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;
import scala.util.Random$;

public final class KafkaRDD$
implements Serializable {
    public static final KafkaRDD$ MODULE$;
    private final Logger com$tresata$spark$kafka$KafkaRDD$$log;

    static {
        new KafkaRDD$();
    }

    public Logger com$tresata$spark$kafka$KafkaRDD$$log() {
        return this.com$tresata$spark$kafka$KafkaRDD$$log;
    }

    public SimpleConsumer com$tresata$spark$kafka$KafkaRDD$$simpleConsumer(Broker broker, SimpleConsumerConfig config) {
        return new SimpleConsumer(broker.host(), broker.port(), config.socketTimeoutMs(), config.socketReceiveBufferBytes(), config.clientId());
    }

    public Map<Object, Option<Broker>> com$tresata$spark$kafka$KafkaRDD$$partitionLeaders(String topic, SimpleConsumer consumer) {
        TopicMetadata topicMeta = (TopicMetadata)consumer.send(new TopicMetadataRequest((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})), 0)).topicsMetadata().head();
        ErrorMapping$.MODULE$.maybeThrowException(topicMeta.errorCode());
        return ((TraversableOnce)topicMeta.partitionsMetadata().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Tuple2<Object, Option<Broker>> apply(PartitionMetadata partitionMeta) {
                ErrorMapping$.MODULE$.maybeThrowException(partitionMeta.errorCode());
                return new Tuple2((Object)BoxesRunTime.boxToInteger((int)partitionMeta.partitionId()), (Object)partitionMeta.leader().map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Broker apply(kafka.cluster.Broker b) {
                        return new Broker(b.host(), b.port());
                    }
                }));
            }
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    public Map<Object, Option<Broker>> com$tresata$spark$kafka$KafkaRDD$$partitionLeaders(String topic, Iterable<Broker> brokers, SimpleConsumerConfig config) {
        Iterator it = ((IterableLike)((IterableLike)Random$.MODULE$.shuffle(brokers, Iterable$.MODULE$.canBuildFrom())).take(5)).iterator().flatMap((Function1)new Serializable(topic, config){
            public static final long serialVersionUID = 0L;
            private final String topic$1;
            private final SimpleConsumerConfig config$1;

            public final Iterable<Map<Object, Option<Broker>>> apply(Broker broker) {
                Iterable iterable;
                try (SimpleConsumer consumer = KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$simpleConsumer(broker, this.config$1);){
                    try {
                        iterable = Option$.MODULE$.option2Iterable((Option)new Some(KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionLeaders(this.topic$1, consumer)));
                    }
                    catch (ConnectException connectException) {
                        KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$log().warn("connection failed for broker {}", new Object[]{broker});
                        iterable = Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
                    }
                }
                return iterable;
            }
            {
                this.topic$1 = topic$1;
                this.config$1 = config$1;
            }
        });
        if (it.hasNext()) {
            return (Map)it.next();
        }
        throw new BrokerNotAvailableException("operation failed for all brokers");
    }

    public long com$tresata$spark$kafka$KafkaRDD$$partitionOffset(TopicAndPartition tap, long time, SimpleConsumer consumer) {
        PartitionOffsetsResponse partitionOffsetsResponse = (PartitionOffsetsResponse)consumer.getOffsetsBefore(new OffsetRequest((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tap), (Object)new PartitionOffsetRequestInfo(time, 1))})), OffsetRequest$.MODULE$.apply$default$2(), OffsetRequest$.MODULE$.apply$default$3(), OffsetRequest$.MODULE$.apply$default$4(), OffsetRequest$.MODULE$.apply$default$5())).partitionErrorAndOffsets().apply((Object)tap);
        ErrorMapping$.MODULE$.maybeThrowException(partitionOffsetsResponse.error());
        return BoxesRunTime.unboxToLong((Object)partitionOffsetsResponse.offsets().head());
    }

    public Map<Object, Object> com$tresata$spark$kafka$KafkaRDD$$partitionOffsets(String topic, long time, Map<Object, Option<Broker>> leaders, SimpleConsumerConfig config) {
        return (Map)((ParIterableLike)((ParIterableLike)leaders.par()).map((Function1)new Serializable(topic, time, config){
            public static final long serialVersionUID = 0L;
            private final String topic$2;
            private final long time$1;
            private final SimpleConsumerConfig config$2;

            /*
             * WARNING - void declaration
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            public final Tuple2<Object, Object> apply(Tuple2<Object, Option<Broker>> x0$1) {
                Tuple2.mcIJ.sp sp2;
                Tuple2<Object, Option<Broker>> tuple2 = x0$1;
                if (tuple2 != null) {
                    int partition = tuple2._1$mcI$sp();
                    Option option = (Option)tuple2._2();
                    if (None$.MODULE$.equals(option)) {
                        throw new LeaderNotAvailableException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"no leader for partition ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)partition)})));
                    }
                }
                if (tuple2 == null) throw new MatchError(tuple2);
                int partition = tuple2._1$mcI$sp();
                Option option = (Option)tuple2._2();
                if (!(option instanceof Some)) throw new MatchError(tuple2);
                Some some = (Some)option;
                Broker leader = (Broker)some.x();
                SimpleConsumer consumer = KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$simpleConsumer(leader, this.config$2);
                try {
                    sp2 = new Tuple2.mcIJ.sp(partition, KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionOffset(new TopicAndPartition(this.topic$2, partition), this.time$1, consumer));
                }
                catch (Throwable throwable) {
                    void var10_9;
                    var10_9.close();
                    throw throwable;
                }
                consumer.close();
                return sp2;
            }
            {
                this.topic$2 = topic$2;
                this.time$1 = time$1;
                this.config$2 = config$2;
            }
        }, (CanBuildFrom)ParMap$.MODULE$.canBuildFrom())).seq();
    }

    private <E> E retryIfNoLeader(Function0<E> e, SimpleConsumerConfig config) {
        return (E)this.attempt$1(e, this.attempt$default$2$1(), config);
    }

    public List<Broker> com$tresata$spark$kafka$KafkaRDD$$brokerList(SimpleConsumerConfig config) {
        return (List)Predef$.MODULE$.refArrayOps((Object[])config.metadataBrokerList().split(",")).toList().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Broker apply(String s) {
                return Broker$.MODULE$.apply(s);
            }
        }, List$.MODULE$.canBuildFrom());
    }

    public KafkaRDD apply(SparkContext sc, String topic, Map<Object, Tuple2<Object, Object>> offsets, SimpleConsumerConfig config) {
        return new KafkaRDD(sc, topic, offsets, config);
    }

    public KafkaRDD apply(SparkContext sc, String topic, Map<Object, Object> startOffsets2, long stopTime, SimpleConsumerConfig config) {
        List<Broker> brokers = this.com$tresata$spark$kafka$KafkaRDD$$brokerList(config);
        Map stopOffsets2 = (Map)this.retryIfNoLeader((Function0)new Serializable(topic, stopTime, config, brokers){
            public static final long serialVersionUID = 0L;
            private final String topic$3;
            private final long stopTime$1;
            private final SimpleConsumerConfig config$4;
            private final List brokers$1;

            public final Map<Object, Object> apply() {
                Map<Object, Option<Broker>> leaders = KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionLeaders(this.topic$3, (Iterable<Broker>)this.brokers$1, this.config$4);
                return KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionOffsets(this.topic$3, this.stopTime$1, leaders, this.config$4);
            }
            {
                this.topic$3 = topic$3;
                this.stopTime$1 = stopTime$1;
                this.config$4 = config$4;
                this.brokers$1 = brokers$1;
            }
        }, config);
        Predef$.MODULE$.require(stopOffsets2.keySet().subsetOf((GenSet)startOffsets2.keySet()), (Function0)new Serializable(startOffsets2, stopOffsets2){
            public static final long serialVersionUID = 0L;
            private final Map startOffsets$1;
            private final Map stopOffsets$1;

            public final String apply() {
                return new StringBuilder().append((Object)"missing start offset for partition(s) ").append((Object)((TraversableOnce)((TraversableOnce)this.stopOffsets$1.keySet().$minus$minus((GenTraversableOnce)this.startOffsets$1.keySet())).toList().sorted((Ordering)Ordering.Int$.MODULE$)).mkString(", ")).toString();
            }
            {
                this.startOffsets$1 = startOffsets$1;
                this.stopOffsets$1 = stopOffsets$1;
            }
        });
        Map offsets = (Map)stopOffsets2.map((Function1)new Serializable(startOffsets2){
            public static final long serialVersionUID = 0L;
            private final Map startOffsets$1;

            public final Tuple2<Object, Tuple2<Object, Object>> apply(Tuple2<Object, Object> x0$2) {
                Tuple2<Object, Object> tuple2 = x0$2;
                if (tuple2 != null) {
                    int partition = tuple2._1$mcI$sp();
                    long stopOffset = tuple2._2$mcJ$sp();
                    Tuple2 tuple22 = new Tuple2((Object)BoxesRunTime.boxToInteger((int)partition), (Object)new Tuple2.mcJJ.sp(BoxesRunTime.unboxToLong((Object)this.startOffsets$1.apply((Object)BoxesRunTime.boxToInteger((int)partition))), stopOffset));
                    return tuple22;
                }
                throw new MatchError(tuple2);
            }
            {
                this.startOffsets$1 = startOffsets$1;
            }
        }, Map$.MODULE$.canBuildFrom());
        return new KafkaRDD(sc, topic, (Map<Object, Tuple2<Object, Object>>)offsets, config);
    }

    public KafkaRDD apply(SparkContext sc, String topic, long startTime, long stopTime, SimpleConsumerConfig config) {
        List<Broker> brokers = this.com$tresata$spark$kafka$KafkaRDD$$brokerList(config);
        Tuple2 tuple2 = (Tuple2)this.retryIfNoLeader((Function0)new Serializable(topic, startTime, stopTime, config, brokers){
            public static final long serialVersionUID = 0L;
            private final String topic$4;
            private final long startTime$1;
            private final long stopTime$2;
            private final SimpleConsumerConfig config$5;
            private final List brokers$2;

            public final Tuple2<Map<Object, Object>, Map<Object, Object>> apply() {
                Map<Object, Option<Broker>> leaders = KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionLeaders(this.topic$4, (Iterable<Broker>)this.brokers$2, this.config$5);
                return new Tuple2(KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionOffsets(this.topic$4, this.startTime$1, leaders, this.config$5), KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionOffsets(this.topic$4, this.stopTime$2, leaders, this.config$5));
            }
            {
                this.topic$4 = topic$4;
                this.startTime$1 = startTime$1;
                this.stopTime$2 = stopTime$2;
                this.config$5 = config$5;
                this.brokers$2 = brokers$2;
            }
        }, config);
        if (tuple2 != null) {
            Tuple2 tuple22;
            Map startOffsets2 = (Map)tuple2._1();
            Map stopOffsets2 = (Map)tuple2._2();
            Tuple2 tuple23 = tuple22 = new Tuple2((Object)startOffsets2, (Object)stopOffsets2);
            Map startOffsets3 = (Map)tuple23._1();
            Map stopOffsets3 = (Map)tuple23._2();
            Map offsets = (Map)startOffsets3.map((Function1)new Serializable(stopOffsets3){
                public static final long serialVersionUID = 0L;
                private final Map stopOffsets$2;

                public final Tuple2<Object, Tuple2<Object, Object>> apply(Tuple2<Object, Object> x0$3) {
                    Tuple2<Object, Object> tuple2 = x0$3;
                    if (tuple2 != null) {
                        int partition = tuple2._1$mcI$sp();
                        long startOffset = tuple2._2$mcJ$sp();
                        Tuple2 tuple22 = new Tuple2((Object)BoxesRunTime.boxToInteger((int)partition), (Object)new Tuple2.mcJJ.sp(startOffset, BoxesRunTime.unboxToLong((Object)this.stopOffsets$2.apply((Object)BoxesRunTime.boxToInteger((int)partition)))));
                        return tuple22;
                    }
                    throw new MatchError(tuple2);
                }
                {
                    this.stopOffsets$2 = stopOffsets$2;
                }
            }, Map$.MODULE$.canBuildFrom());
            return new KafkaRDD(sc, topic, (Map<Object, Tuple2<Object, Object>>)offsets, config);
        }
        throw new MatchError((Object)tuple2);
    }

    public <K, V> void writeWithKeysToKafka(RDD<Tuple2<K, V>> rdd, String topic, ProducerConfig config) {
        Properties props = config.props().props();
        rdd.context().runJob(rdd, (Function2)new Serializable(topic, props){
            public static final long serialVersionUID = 0L;
            private final String topic$5;
            private final Properties props$1;

            public final void apply(TaskContext context, Iterator<Tuple2<K, V>> iter) {
                KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$write$1(context, iter, this.topic$5, this.props$1);
            }
            {
                this.topic$5 = topic$5;
                this.props$1 = props$1;
            }
        }, ClassTag$.MODULE$.Unit());
    }

    public <V> void writeToKafka(RDD<V> rdd, String topic, ProducerConfig config) {
        this.writeWithKeysToKafka(rdd.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Tuple2<V, V> apply(V x$2) {
                return new Tuple2(null, x$2);
            }
        }, ClassTag$.MODULE$.apply(Tuple2.class)), topic, config);
    }

    public Tuple2<Map<Object, Object>, Map<Object, Object>> smallestAndLargestOffsets(String topic, SimpleConsumerConfig config) {
        List<Broker> brokers = this.com$tresata$spark$kafka$KafkaRDD$$brokerList(config);
        return (Tuple2)this.retryIfNoLeader((Function0)new Serializable(topic, config, brokers){
            public static final long serialVersionUID = 0L;
            private final String topic$6;
            private final SimpleConsumerConfig config$6;
            private final List brokers$3;

            public final Tuple2<Map<Object, Object>, Map<Object, Object>> apply() {
                Map<Object, Option<Broker>> leaders = KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionLeaders(this.topic$6, (Iterable<Broker>)this.brokers$3, this.config$6);
                return new Tuple2(KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionOffsets(this.topic$6, OffsetRequest$.MODULE$.EarliestTime(), leaders, this.config$6), KafkaRDD$.MODULE$.com$tresata$spark$kafka$KafkaRDD$$partitionOffsets(this.topic$6, OffsetRequest$.MODULE$.LatestTime(), leaders, this.config$6));
            }
            {
                this.topic$6 = topic$6;
                this.config$6 = config$6;
                this.brokers$3 = brokers$3;
            }
        }, config);
    }

    private Object readResolve() {
        return MODULE$;
    }

    private final void sleep$1(SimpleConsumerConfig config$3) {
        this.com$tresata$spark$kafka$KafkaRDD$$log().warn("sleeping for {} ms", (Object)BoxesRunTime.boxToInteger((int)config$3.refreshLeaderBackoffMs()));
        Thread.sleep(config$3.refreshLeaderBackoffMs());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private final Object attempt$1(Function0 e, int nr, SimpleConsumerConfig config$3) {
        while (true) {
            Object object;
            if (nr >= config$3.refreshLeaderMaxRetries()) {
                object = e.apply();
                return object;
            }
            try {
                object = e.apply();
                return object;
            }
            catch (LeaderNotAvailableException leaderNotAvailableException) {
                this.sleep$1(config$3);
                ++nr;
                continue;
            }
            catch (ConnectException connectException) {
                this.sleep$1(config$3);
                ++nr;
                continue;
            }
            catch (NotLeaderForPartitionException notLeaderForPartitionException) {
                this.sleep$1(config$3);
                ++nr;
                continue;
            }
            break;
        }
    }

    private final int attempt$default$2$1() {
        return 1;
    }

    /*
     * WARNING - void declaration
     */
    public final void com$tresata$spark$kafka$KafkaRDD$$write$1(TaskContext context, Iterator iter, String topic$5, Properties props$1) {
        ProducerConfig config = new ProducerConfig(props$1);
        Producer producer = new Producer(config);
        try {
            iter.foreach((Function1)new Serializable(topic$5, context, producer){
                public static final long serialVersionUID = 0L;
                private final String topic$5;
                private final TaskContext context$1;
                private final Producer producer$1;

                public final void apply(Tuple2<K, V> x0$4) {
                    Tuple2<K, V> tuple2 = x0$4;
                    if (tuple2 != null) {
                        Object key = tuple2._1();
                        Object msg = tuple2._2();
                        if (this.context$1.isInterrupted()) {
                            throw package$.MODULE$.error("interrupted");
                        }
                        this.producer$1.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(this.topic$5, key, msg)}));
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        return;
                    }
                    throw new MatchError(tuple2);
                }
                {
                    this.topic$5 = topic$5;
                    this.context$1 = context$1;
                    this.producer$1 = producer$1;
                }
            });
        }
        catch (Throwable throwable) {
            void var6_6;
            var6_6.close();
            throw throwable;
        }
        producer.close();
    }

    private KafkaRDD$() {
        MODULE$ = this;
        this.com$tresata$spark$kafka$KafkaRDD$$log = LoggerFactory.getLogger(this.getClass());
    }
}

