/*
 * Decompiled with CFR 0.152.
 */
package zio.kafka.consumer.internal;

import java.io.Serializable;
import java.time.Duration;
import org.apache.kafka.common.TopicPartition;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Product;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.runtime.function.JProcedure1;
import scala.util.control.NoStackTrace;
import zio.Chunk;
import zio.Clock$;
import zio.IsSubtypeOfOutput$;
import zio.LogAnnotation;
import zio.LogAnnotation$;
import zio.Promise;
import zio.Queue;
import zio.Ref;
import zio.ZIO;
import zio.ZIO$;
import zio.kafka.consumer.CommittableRecord;
import zio.kafka.consumer.Offset;
import zio.kafka.consumer.diagnostics.Diagnostics;
import zio.kafka.consumer.internal.PartitionStream;
import zio.kafka.consumer.internal.PartitionStreamControl$;
import zio.kafka.consumer.internal.PartitionStreamControl$QueueInfo$;
import zio.stream.Take;
import zio.stream.Take$;
import zio.stream.ZStream;

public final class PartitionStreamControl
extends PartitionStream {
    private final TopicPartition tp;
    private final ZStream stream;
    private final Queue<Take<Throwable, CommittableRecord<byte[], byte[]>>> dataQueue;
    private final Promise<Throwable, Nothing$> interruptionPromise;
    private final Promise completedPromise;
    private final Ref<QueueInfo> queueInfoRef;
    private final Duration maxStreamPullInterval;
    private final long maxStreamPullIntervalNanos;
    private final ZIO.LogAnnotate logAnnotate;
    private final Tuple2 tpStream;

    public static ZIO<Object, Nothing$, PartitionStreamControl> newPartitionStream(TopicPartition topicPartition, ZIO<Object, Nothing$, BoxedUnit> zIO, Diagnostics diagnostics, Duration duration) {
        return PartitionStreamControl$.MODULE$.newPartitionStream(topicPartition, zIO, diagnostics, duration);
    }

    public PartitionStreamControl(TopicPartition tp, ZStream<Object, Throwable, CommittableRecord<byte[], byte[]>> stream, Queue<Take<Throwable, CommittableRecord<byte[], byte[]>>> dataQueue, Promise<Throwable, Nothing$> interruptionPromise, Promise<Nothing$, Option<Offset>> completedPromise, Ref<QueueInfo> queueInfoRef, Duration maxStreamPullInterval) {
        this.tp = tp;
        this.stream = stream;
        this.dataQueue = dataQueue;
        this.interruptionPromise = interruptionPromise;
        this.completedPromise = completedPromise;
        this.queueInfoRef = queueInfoRef;
        this.maxStreamPullInterval = maxStreamPullInterval;
        this.maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos();
        this.logAnnotate = ZIO$.MODULE$.logAnnotate(() -> PartitionStreamControl.$init$$$anonfun$1(tp), (Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new LogAnnotation[]{LogAnnotation$.MODULE$.apply("partition", BoxesRunTime.boxToInteger((int)tp.partition()).toString())}));
        this.tpStream = Tuple2$.MODULE$.apply((Object)tp, stream);
    }

    @Override
    public TopicPartition tp() {
        return this.tp;
    }

    public ZStream<Object, Throwable, CommittableRecord<byte[], byte[]>> stream() {
        return this.stream;
    }

    public Promise<Nothing$, Option<Offset>> completedPromise() {
        return this.completedPromise;
    }

    public ZIO<Object, Nothing$, BoxedUnit> offerRecords(Chunk<CommittableRecord<byte[], byte[]>> data) {
        if (data.isEmpty()) {
            return this.queueInfoRef.update((Function1 & Serializable)_$1 -> _$1.withEmptyPoll(), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:56)");
        }
        return Clock$.MODULE$.nanoTime((Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:59)").map((Function1 & Serializable)now -> this.offerRecords$$anonfun$2(BoxesRunTime.unboxToLong((Object)now)), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:60)").flatMap((Function1 & Serializable)x$12 -> {
            Tuple2 tuple2 = x$12;
            if (tuple2 != null) {
                long now = tuple2._1$mcJ$sp();
                long newPullDeadline = tuple2._2$mcJ$sp();
                return this.queueInfoRef.update((Function1 & Serializable)_$2 -> _$2.withOffer(newPullDeadline, data.size()), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:61)").flatMap((Function1 & Serializable)x$1 -> {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return this.dataQueue.offer((Object)new Take(Take$.MODULE$.chunk(data)), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:62)").map((Function1 & Serializable)v1 -> {
                        bl = BoxesRunTime.unboxToBoolean((Object)v1);
                        return BoxedUnit.UNIT;
                    }, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:63)");
                }, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:63)");
            }
            throw new MatchError((Object)tuple2);
        }, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.offerRecords(PartitionStreamControl.scala:63)");
    }

    @Override
    public ZIO<Object, Nothing$, Object> queueSize() {
        return this.queueInfoRef.get((Object)"zio.kafka.consumer.internal.PartitionStreamControl.queueSize(PartitionStreamControl.scala:66)").map((Function1 & Serializable)_$3 -> _$3.size(), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.queueSize(PartitionStreamControl.scala:66)");
    }

    public ZIO<Object, Nothing$, Option<Offset>> lastPulledOffset() {
        return this.queueInfoRef.get((Object)"zio.kafka.consumer.internal.PartitionStreamControl.lastPulledOffset(PartitionStreamControl.scala:68)").map((Function1 & Serializable)_$4 -> _$4.lastPulledOffset(), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lastPulledOffset(PartitionStreamControl.scala:68)");
    }

    public ZIO<Object, Nothing$, Object> outstandingPolls() {
        return this.queueInfoRef.get((Object)"zio.kafka.consumer.internal.PartitionStreamControl.outstandingPolls(PartitionStreamControl.scala:75)").map((Function1 & Serializable)_$5 -> _$5.outstandingPolls(), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.outstandingPolls(PartitionStreamControl.scala:75)");
    }

    public ZIO<Object, Nothing$, Object> maxStreamPullIntervalExceeded(long now) {
        return this.queueInfoRef.get((Object)"zio.kafka.consumer.internal.PartitionStreamControl.maxStreamPullIntervalExceeded(PartitionStreamControl.scala:85)").map((Function1 & Serializable)_$6 -> _$6.deadlineExceeded(now), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.maxStreamPullIntervalExceeded(PartitionStreamControl.scala:85)");
    }

    public ZIO<Object, Nothing$, BoxedUnit> halt() {
        String timeOutMessage = new StringBuilder(59).append("No records were pulled for more than ").append(this.maxStreamPullInterval).append(" for topic partition ").append(this.tp()).append(".").toString();
        NoStackTrace consumeTimeout = new NoStackTrace(timeOutMessage){
            {
                NoStackTrace.$init$((NoStackTrace)this);
            }

            public Throwable scala$util$control$NoStackTrace$$super$fillInStackTrace() {
                return super.fillInStackTrace();
            }
        };
        return this.interruptionPromise.fail((Object)consumeTimeout, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.halt(PartitionStreamControl.scala:91)").unit((Object)"zio.kafka.consumer.internal.PartitionStreamControl.halt(PartitionStreamControl.scala:91)");
    }

    public ZIO<Object, Nothing$, BoxedUnit> lost() {
        return this.logAnnotate.apply(ZIO$.MODULE$.logDebug(this::lost$$anonfun$1, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:98)").flatMap((Function1 & Serializable)x$1 -> {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return this.dataQueue.takeAll((Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:99)").map((Function1 & Serializable)_$7 -> _$7.size(), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:99)").flatMap((Function1 & Serializable)taken -> this.lost$$anonfun$2$$anonfun$2(BoxesRunTime.unboxToInt((Object)taken)), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:103)");
        }, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:103)"), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:104)");
    }

    public ZIO<Object, Nothing$, BoxedUnit> end() {
        return this.logAnnotate.apply(ZIO$.MODULE$.logDebug(this::end$$anonfun$1, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.end(PartitionStreamControl.scala:109)").$times$greater(this::end$$anonfun$2, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.end(PartitionStreamControl.scala:110)"), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.end(PartitionStreamControl.scala:111)");
    }

    public ZIO<Object, Nothing$, Object> isCompleted() {
        return this.completedPromise().isDone((Object)"zio.kafka.consumer.internal.PartitionStreamControl.isCompleted(PartitionStreamControl.scala:115)");
    }

    public ZIO<Object, Nothing$, Object> isRunning() {
        return this.isCompleted().negate(IsSubtypeOfOutput$.MODULE$.impl((.less.colon.less)$less$colon$less$.MODULE$.refl()), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.isRunning(PartitionStreamControl.scala:119)");
    }

    public Tuple2<TopicPartition, ZStream<Object, Throwable, CommittableRecord<byte[], byte[]>>> tpStream() {
        return this.tpStream;
    }

    private static final LogAnnotation $init$$$anonfun$1(TopicPartition tp$1) {
        return LogAnnotation$.MODULE$.apply("topic", tp$1.topic());
    }

    private final /* synthetic */ Tuple2 offerRecords$$anonfun$2(long now) {
        long newPullDeadline = now + this.maxStreamPullIntervalNanos;
        return new Tuple2.mcJJ.sp(now, newPullDeadline);
    }

    private final String lost$$anonfun$1() {
        return new StringBuilder(15).append("Partition ").append(this.tp().toString()).append(" lost").toString();
    }

    private static final String lost$$anonfun$2$$anonfun$2$$anonfun$1$$anonfun$2$$anonfun$1(int taken$3) {
        return new StringBuilder(34).append("Ignored ").append(taken$3).append(" records on lost partition").toString();
    }

    private static final boolean lost$$anonfun$2$$anonfun$2$$anonfun$1$$anonfun$2$$anonfun$2(int taken$4) {
        return taken$4 != 0;
    }

    private final /* synthetic */ ZIO lost$$anonfun$2$$anonfun$2$$anonfun$1(int taken$1, boolean x$1) {
        boolean bl = x$1;
        return this.queueInfoRef.update((Function1 & Serializable)_$8 -> _$8.withQueueClearedOnLost(), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:101)").flatMap((Function1 & Serializable)x$12 -> {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return ZIO$.MODULE$.logDebug(() -> PartitionStreamControl.lost$$anonfun$2$$anonfun$2$$anonfun$1$$anonfun$2$$anonfun$1(taken$1), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:102)").when(() -> PartitionStreamControl.lost$$anonfun$2$$anonfun$2$$anonfun$1$$anonfun$2$$anonfun$2(taken$1), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:102)").map((Function1)(JProcedure1 & Serializable)x$1 -> {
                Option option = x$1;
            }, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:103)");
        }, (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:103)");
    }

    private final /* synthetic */ ZIO lost$$anonfun$2$$anonfun$2(int taken) {
        return this.dataQueue.offer((Object)new Take(Take$.MODULE$.end()), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:100)").flatMap((Function1 & Serializable)x$1 -> this.lost$$anonfun$2$$anonfun$2$$anonfun$1(taken, BoxesRunTime.unboxToBoolean((Object)x$1)), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.lost(PartitionStreamControl.scala:103)");
    }

    private final String end$$anonfun$1() {
        return new StringBuilder(17).append("Partition ").append(this.tp().toString()).append(" ending").toString();
    }

    private final ZIO end$$anonfun$2() {
        return this.dataQueue.offer((Object)new Take(Take$.MODULE$.end()), (Object)"zio.kafka.consumer.internal.PartitionStreamControl.end(PartitionStreamControl.scala:110)").unit((Object)"zio.kafka.consumer.internal.PartitionStreamControl.end(PartitionStreamControl.scala:110)");
    }

    public static final class QueueInfo
    implements Product,
    Serializable {
        private final long pullDeadline;
        private final int size;
        private final Option lastPulledOffset;
        private final int outstandingPolls;

        public static QueueInfo apply(long l, int n, Option<Offset> option, int n2) {
            return PartitionStreamControl$QueueInfo$.MODULE$.apply(l, n, option, n2);
        }

        public static QueueInfo fromProduct(Product product) {
            return PartitionStreamControl$QueueInfo$.MODULE$.fromProduct(product);
        }

        public static QueueInfo unapply(QueueInfo queueInfo) {
            return PartitionStreamControl$QueueInfo$.MODULE$.unapply(queueInfo);
        }

        public QueueInfo(long pullDeadline, int size, Option<Offset> lastPulledOffset, int outstandingPolls) {
            this.pullDeadline = pullDeadline;
            this.size = size;
            this.lastPulledOffset = lastPulledOffset;
            this.outstandingPolls = outstandingPolls;
        }

        public int hashCode() {
            int n = -889275714;
            n = Statics.mix((int)n, (int)this.productPrefix().hashCode());
            n = Statics.mix((int)n, (int)Statics.longHash((long)this.pullDeadline()));
            n = Statics.mix((int)n, (int)this.size());
            n = Statics.mix((int)n, (int)Statics.anyHash(this.lastPulledOffset()));
            n = Statics.mix((int)n, (int)this.outstandingPolls());
            return Statics.finalizeHash((int)n, (int)4);
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public boolean equals(Object x$0) {
            if (this == x$0) return true;
            Object object = x$0;
            if (!(object instanceof QueueInfo)) return false;
            QueueInfo queueInfo = (QueueInfo)object;
            if (this.size() != queueInfo.size()) return false;
            if (this.outstandingPolls() != queueInfo.outstandingPolls()) return false;
            if (this.pullDeadline() != queueInfo.pullDeadline()) return false;
            Option<Offset> option = this.lastPulledOffset();
            Option<Offset> option2 = queueInfo.lastPulledOffset();
            if (option != null) {
                if (!option.equals(option2)) return false;
                return true;
            }
            if (option2 == null) return true;
            return false;
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString((Product)this);
        }

        public boolean canEqual(Object that) {
            return that instanceof QueueInfo;
        }

        public int productArity() {
            return 4;
        }

        public String productPrefix() {
            return "QueueInfo";
        }

        public Object productElement(int n) {
            int n2 = n;
            switch (n2) {
                case 0: {
                    return BoxesRunTime.boxToLong((long)this._1());
                }
                case 1: {
                    return BoxesRunTime.boxToInteger((int)this._2());
                }
                case 2: {
                    return this._3();
                }
                case 3: {
                    return BoxesRunTime.boxToInteger((int)this._4());
                }
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger((int)n).toString());
        }

        public String productElementName(int n) {
            int n2 = n;
            switch (n2) {
                case 0: {
                    return "pullDeadline";
                }
                case 1: {
                    return "size";
                }
                case 2: {
                    return "lastPulledOffset";
                }
                case 3: {
                    return "outstandingPolls";
                }
            }
            throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger((int)n).toString());
        }

        public long pullDeadline() {
            return this.pullDeadline;
        }

        public int size() {
            return this.size;
        }

        public Option<Offset> lastPulledOffset() {
            return this.lastPulledOffset;
        }

        public int outstandingPolls() {
            return this.outstandingPolls;
        }

        public QueueInfo withEmptyPoll() {
            int n = this.outstandingPolls() + 1;
            long l = this.copy$default$1();
            int n2 = this.copy$default$2();
            Option<Offset> option = this.copy$default$3();
            return this.copy(l, n2, option, n);
        }

        public QueueInfo withOffer(long newPullDeadline, int recordCount) {
            return PartitionStreamControl$QueueInfo$.MODULE$.apply(this.size() <= 0 ? newPullDeadline : this.pullDeadline(), this.size() + recordCount, this.lastPulledOffset(), this.outstandingPolls() + 1);
        }

        public QueueInfo withPull(long newPullDeadline, Chunk<CommittableRecord<byte[], byte[]>> records) {
            return PartitionStreamControl$QueueInfo$.MODULE$.apply(newPullDeadline, this.size() - records.size(), (Option<Offset>)records.lastOption().map(PartitionStreamControl$::zio$kafka$consumer$internal$PartitionStreamControl$QueueInfo$$_$withPull$$anonfun$1).orElse(this::withPull$$anonfun$2), 0);
        }

        public QueueInfo withQueueClearedOnLost() {
            return this.copy(this.copy$default$1(), 0, this.copy$default$3(), this.copy$default$4());
        }

        public boolean deadlineExceeded(long now) {
            return this.size() > 0 && this.pullDeadline() <= now;
        }

        public QueueInfo copy(long pullDeadline, int size, Option<Offset> lastPulledOffset, int outstandingPolls) {
            return new QueueInfo(pullDeadline, size, lastPulledOffset, outstandingPolls);
        }

        public long copy$default$1() {
            return this.pullDeadline();
        }

        public int copy$default$2() {
            return this.size();
        }

        public Option<Offset> copy$default$3() {
            return this.lastPulledOffset();
        }

        public int copy$default$4() {
            return this.outstandingPolls();
        }

        public long _1() {
            return this.pullDeadline();
        }

        public int _2() {
            return this.size();
        }

        public Option<Offset> _3() {
            return this.lastPulledOffset();
        }

        public int _4() {
            return this.outstandingPolls();
        }

        private final Option withPull$$anonfun$2() {
            return this.lastPulledOffset();
        }
    }
}

