/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.cluster.access.client;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import org.opendaylight.controller.cluster.access.client.AveragingProgressTracker;
import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.client.TransmittedConnectionEntry;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.yangtools.concepts.Identifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Uses 'sealed' constructs - enablewith --sealed true
 */
abstract class TransmitQueue {
    private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
    private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<TransmittedConnectionEntry>();
    private final Deque<ConnectionEntry> pending = new ArrayDeque<ConnectionEntry>();
    private final AveragingProgressTracker tracker;
    private ReconnectForwarder successor;

    TransmitQueue(int targetDepth) {
        this.tracker = new AveragingProgressTracker(targetDepth);
    }

    TransmitQueue(TransmitQueue oldQueue, int targetDepth, long now) {
        this.tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
    }

    TransmitQueue(TransmitQueue oldQueue, long now) {
        this.tracker = new AveragingProgressTracker(oldQueue.tracker, now);
    }

    void cancelDebt(long now) {
        this.tracker.cancelDebt(now);
    }

    final Collection<ConnectionEntry> drain() {
        ArrayDeque<ConnectionEntry> ret = new ArrayDeque<ConnectionEntry>(this.inflight.size() + this.pending.size());
        ret.addAll(this.inflight);
        ret.addAll(this.pending);
        this.inflight.clear();
        this.pending.clear();
        return ret;
    }

    final long ticksStalling(long now) {
        return this.tracker.ticksStalling(now);
    }

    final boolean hasSuccessor() {
        return this.successor != null;
    }

    final Optional<TransmittedConnectionEntry> complete(ResponseEnvelope<?> envelope, long now) {
        this.preComplete(envelope);
        Optional<TransmittedConnectionEntry> maybeEntry = TransmitQueue.findMatchingEntry(this.inflight, envelope);
        if (maybeEntry == null) {
            LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
            maybeEntry = TransmitQueue.findMatchingEntry(this.pending, envelope);
        }
        if (maybeEntry == null || !maybeEntry.isPresent()) {
            LOG.warn("No request matching {} found, ignoring response", envelope);
            return Optional.empty();
        }
        TransmittedConnectionEntry entry = maybeEntry.orElseThrow();
        this.tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
        this.tryTransmit(now);
        return Optional.of(entry);
    }

    final void tryTransmit(long now) {
        int toSend = this.canTransmitCount(this.inflight.size());
        if (toSend > 0 && !this.pending.isEmpty()) {
            this.transmitEntries(toSend, now);
        }
    }

    private void transmitEntries(int maxTransmit, long now) {
        for (int i = 0; i < maxTransmit; ++i) {
            ConnectionEntry e = this.pending.poll();
            if (e != null && this.transmitEntry(e, now)) continue;
            LOG.debug("Queue {} transmitted {} requests", (Object)this, (Object)i);
            return;
        }
        LOG.debug("Queue {} transmitted {} requests", (Object)this, (Object)maxTransmit);
    }

    private boolean transmitEntry(ConnectionEntry entry, long now) {
        LOG.debug("Queue {} transmitting entry {}", (Object)this, (Object)entry);
        Optional<TransmittedConnectionEntry> maybeTransmitted = this.transmit(entry, now);
        if (!maybeTransmitted.isPresent()) {
            return false;
        }
        this.inflight.addLast(maybeTransmitted.orElseThrow());
        return true;
    }

    final long enqueueOrForward(ConnectionEntry entry, long now) {
        if (this.successor != null) {
            this.successor.forwardEntry(entry, now);
            return 0L;
        }
        return this.enqueue(entry, now);
    }

    final void enqueueOrReplay(ConnectionEntry entry, long now) {
        if (this.successor != null) {
            this.successor.replayEntry(entry, now);
        } else {
            this.enqueue(entry, now);
        }
    }

    private long enqueue(ConnectionEntry entry, long now) {
        long delay = this.tracker.openTask(now);
        int toSend = this.canTransmitCount(this.inflight.size());
        if (toSend <= 0) {
            LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
            this.pending.addLast(entry);
            return delay;
        }
        if (this.pending.isEmpty()) {
            if (!this.transmitEntry(entry, now)) {
                LOG.debug("Queue {} cannot transmit request {} - delaying it", (Object)this, entry.getRequest());
                this.pending.addLast(entry);
            }
            return delay;
        }
        this.pending.addLast(entry);
        this.transmitEntries(toSend, now);
        return delay;
    }

    abstract int canTransmitCount(int var1);

    abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry var1, long var2);

    abstract void preComplete(ResponseEnvelope<?> var1);

    final boolean isEmpty() {
        return this.inflight.isEmpty() && this.pending.isEmpty();
    }

    final ConnectionEntry peek() {
        ConnectionEntry ret = this.inflight.peek();
        if (ret != null) {
            return ret;
        }
        return this.pending.peek();
    }

    final List<ConnectionEntry> poison() {
        ArrayList<ConnectionEntry> entries = new ArrayList<ConnectionEntry>(this.inflight.size() + this.pending.size());
        entries.addAll(this.inflight);
        this.inflight.clear();
        entries.addAll(this.pending);
        this.pending.clear();
        return entries;
    }

    final void setForwarder(ReconnectForwarder forwarder, long now) {
        Verify.verify((this.successor == null ? 1 : 0) != 0, (String)"Successor %s already set on connection %s", (Object)this.successor, (Object)this);
        this.successor = Objects.requireNonNull(forwarder);
        LOG.debug("Connection {} superseded by {}, splicing queue", (Object)this, (Object)this.successor);
        int count = 0;
        ConnectionEntry entry = this.inflight.poll();
        while (entry != null) {
            this.successor.replayEntry(entry, now);
            entry = this.inflight.poll();
            ++count;
        }
        entry = this.pending.poll();
        while (entry != null) {
            this.successor.replayEntry(entry, now);
            entry = this.pending.poll();
            ++count;
        }
        LOG.debug("Connection {} queue spliced {} messages", (Object)this, (Object)count);
    }

    final void remove(long now) {
        TransmittedConnectionEntry txe = this.inflight.poll();
        if (txe == null) {
            ConnectionEntry entry = this.pending.pop();
            this.tracker.closeTask(now, entry.getEnqueuedTicks(), 0L, 0L);
        } else {
            this.tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0L);
        }
    }

    @VisibleForTesting
    Deque<TransmittedConnectionEntry> getInflight() {
        return this.inflight;
    }

    @VisibleForTesting
    Deque<ConnectionEntry> getPending() {
        return this.pending;
    }

    @SuppressFBWarnings(value={"NP_OPTIONAL_RETURN_NULL"}, justification="Returning null Optional is documented in the API contract.")
    private static Optional<TransmittedConnectionEntry> findMatchingEntry(Queue<? extends ConnectionEntry> queue, ResponseEnvelope<?> envelope) {
        Iterator it = queue.iterator();
        while (it.hasNext()) {
            ConnectionEntry e = (ConnectionEntry)it.next();
            Request<?, ?> request = e.getRequest();
            Response response = (Response)envelope.getMessage();
            if (!request.getTarget().equals(response.getTarget())) continue;
            if (request.getSequence() != response.getSequence()) {
                LOG.debug("Expecting sequence {}, ignoring response {}", (Object)request.getSequence(), envelope);
                return Optional.empty();
            }
            if (!(e instanceof TransmittedConnectionEntry)) {
                return Optional.empty();
            }
            TransmittedConnectionEntry te = (TransmittedConnectionEntry)e;
            if (envelope.getSessionId() != te.getSessionId()) {
                LOG.debug("Expecting session {}, ignoring response {}", (Object)te.getSessionId(), envelope);
                return Optional.empty();
            }
            if (envelope.getTxSequence() != te.getTxSequence()) {
                LOG.warn("Expecting txSequence {}, ignoring response {}", (Object)te.getTxSequence(), envelope);
                return Optional.empty();
            }
            LOG.debug("Completing request {} with {}", request, envelope);
            it.remove();
            return Optional.of(te);
        }
        return null;
    }

    static final class Transmitting
    extends TransmitQueue {
        private static final long NOT_SLICING = -1L;
        private final BackendInfo backend;
        private final MessageSlicer messageSlicer;
        private long nextTxSequence;
        private long currentSlicedEnvSequenceId = -1L;

        Transmitting(TransmitQueue oldQueue, int targetDepth, BackendInfo backend, long now, MessageSlicer messageSlicer) {
            super(oldQueue, targetDepth, now);
            this.backend = Objects.requireNonNull(backend);
            this.messageSlicer = Objects.requireNonNull(messageSlicer);
        }

        @Override
        int canTransmitCount(int inflightSize) {
            return this.backend.getMaxMessages() - inflightSize;
        }

        @Override
        Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now) {
            if (this.currentSlicedEnvSequenceId >= 0L) {
                return Optional.empty();
            }
            Request<?, ?> request = entry.getRequest();
            RequestEnvelope env = new RequestEnvelope((Request)request.toVersion(this.backend.getVersion()), this.backend.getSessionId(), this.nextTxSequence++);
            if (request instanceof SliceableMessage) {
                if (this.messageSlicer.slice(SliceOptions.builder().identifier((Identifier)request.getTarget()).message((Serializable)env).replyTo(request.getReplyTo()).sendTo(this.backend.getActor()).onFailureCallback(t -> env.sendFailure((RequestException)new RuntimeRequestException("Failed to slice request " + request, t), 0L)).build())) {
                    this.currentSlicedEnvSequenceId = env.getTxSequence();
                }
            } else {
                this.backend.getActor().tell((Object)env, ActorRef.noSender());
            }
            return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(), env.getTxSequence(), now));
        }

        @Override
        void preComplete(ResponseEnvelope<?> envelope) {
            if (envelope.getTxSequence() == this.currentSlicedEnvSequenceId) {
                this.currentSlicedEnvSequenceId = -1L;
            }
        }
    }

    static final class Halted
    extends TransmitQueue {
        Halted(int targetDepth) {
            super(targetDepth);
        }

        Halted(TransmitQueue oldQueue, long now) {
            super(oldQueue, now);
        }

        @Override
        int canTransmitCount(int inflightSize) {
            return 0;
        }

        @Override
        Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now) {
            throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
        }

        @Override
        void preComplete(ResponseEnvelope<?> envelope) {
        }
    }
}

