/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.replication.Position;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.concurrent.Semaphore;

public class InflightMessageList {
    private static final int DEFAULT_CAPACITY = 10000;
    private final long producerAckTimeoutMS;
    private final LinkedHashMap<Position, InflightMessage> linkedMap;
    private final MaxwellContext context;
    private final Semaphore semaphore;
    private long messageCount = 0L;

    public InflightMessageList(MaxwellContext context) {
        this(context, 10000);
    }

    public InflightMessageList(MaxwellContext context, int capacity) {
        this.context = context;
        this.producerAckTimeoutMS = context.getConfig().producerAckTimeout;
        this.linkedMap = new LinkedHashMap();
        this.semaphore = new Semaphore(capacity);
    }

    public long waitForSlot() throws InterruptedException {
        this.semaphore.acquire();
        return ++this.messageCount;
    }

    private synchronized InflightMessage head() {
        Iterator<InflightMessage> it = this.iterator();
        if (it.hasNext()) {
            return it.next();
        }
        return null;
    }

    private void checkStuckHead(long messageID) {
        if (this.producerAckTimeoutMS == 0L) {
            return;
        }
        InflightMessage message = this.head();
        if (message == null || message.messageID == messageID) {
            return;
        }
        if (message.timeAsBlockedHead() > this.producerAckTimeoutMS) {
            IllegalStateException e = new IllegalStateException("Did not receive acknowledgement for the head of the inflight message list for " + this.producerAckTimeoutMS + " ms");
            this.context.terminate(e);
        } else {
            message.markBlockedHead();
        }
    }

    public void freeSlot(long messageID) {
        this.semaphore.release();
        this.checkStuckHead(messageID);
    }

    public synchronized void addMessage(Position p, long eventTimestampMillis, long messageID) throws InterruptedException {
        InflightMessage m = new InflightMessage(p, eventTimestampMillis, messageID);
        this.linkedMap.put(p, m);
    }

    public synchronized InflightMessage completeMessage(Position p) {
        InflightMessage m = this.linkedMap.get(p);
        assert (m != null);
        m.isComplete = true;
        InflightMessage completeUntil = null;
        Iterator<InflightMessage> iterator = this.iterator();
        while (iterator.hasNext()) {
            InflightMessage msg = iterator.next();
            if (!msg.isComplete) break;
            completeUntil = msg;
            iterator.remove();
        }
        return completeUntil;
    }

    public int size() {
        return this.linkedMap.size();
    }

    private Iterator<InflightMessage> iterator() {
        return this.linkedMap.values().iterator();
    }

    class InflightMessage {
        public final Position position;
        public boolean isComplete;
        public final long messageID;
        public final long sendTimeMS;
        public final long eventTimeMS;
        private Long blockedHeadTimeMS;

        InflightMessage(Position p, long eventTimeMS, long messageID) {
            this.position = p;
            this.isComplete = false;
            this.sendTimeMS = System.currentTimeMillis();
            this.eventTimeMS = eventTimeMS;
            this.messageID = messageID;
        }

        long timeSinceSendMS() {
            return System.currentTimeMillis() - this.sendTimeMS;
        }

        private void markBlockedHead() {
            if (this.blockedHeadTimeMS == null) {
                this.blockedHeadTimeMS = System.currentTimeMillis();
            }
        }

        private long timeAsBlockedHead() {
            if (this.blockedHeadTimeMS == null) {
                return 0L;
            }
            return System.currentTimeMillis() - this.blockedHeadTimeMS;
        }
    }
}

