/*
 * Decompiled with CFR 0.152.
 */
package net.neoremind.fountain.common.mq;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import net.neoremind.fountain.changedata.BinlogTraceable;
import net.neoremind.fountain.common.mq.FountainMQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FountainMQ} whose capacity is limited by a permit budget rather than
 * by an element count.
 *
 * <p>Elements are stored in a {@link LinkedBlockingQueue} created without an explicit
 * capacity; a {@link Semaphore} initialised with {@code limit} permits guards
 * it. Each element must acquire {@link #getRequiredPermits(BinlogTraceable)}
 * permits before it is enqueued, and the same method is consulted again to
 * decide how many permits to hand back when the element is dequeued.
 *
 * <p>None of the operations propagate {@link InterruptedException}. An
 * interrupt received while waiting is remembered, the wait is resumed, and the
 * thread's interrupt status is restored before the method returns so callers
 * can still observe it.
 */
public abstract class AbstractMaxCapacityFountainMQ implements FountainMQ {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMaxCapacityFountainMQ.class);

    /** Length of one wait slice in {@link #push(BinlogTraceable)}, in milliseconds. */
    private static final long WAIT_INTERVAL_MS = 500L;

    private final BlockingQueue<BinlogTraceable> queue = new LinkedBlockingQueue<BinlogTraceable>();
    private final Semaphore guardLimit;
    protected final int limitSize;

    /**
     * Creates a queue guarded by {@code limit} permits.
     *
     * @param limit total number of permits available to queued elements
     */
    protected AbstractMaxCapacityFountainMQ(int limit) {
        this.guardLimit = new Semaphore(limit);
        this.limitSize = limit;
    }

    /**
     * Returns how many permits the given element occupies while it is queued.
     *
     * <p>The value is queried once when the element is pushed and once more
     * when it is popped; implementations should return the same number both
     * times for the same element, otherwise the permit budget drifts.
     *
     * @param var1 the element being pushed or popped
     * @return number of permits to acquire on push / release on pop
     */
    protected abstract int getRequiredPermits(BinlogTraceable var1);

    /**
     * Enqueues {@code e}, waiting as long as necessary for enough permits.
     *
     * <p>The wait is sliced into 500 ms attempts; a warning is logged on the
     * first unsuccessful attempt and again, with the number of failed
     * attempts, once the permits were finally obtained.
     *
     * @param e element to enqueue
     * @throws IllegalStateException if the backing queue rejects the element
     */
    @Override
    public void push(BinlogTraceable e) {
        final int permits = this.getRequiredPermits(e);
        boolean interrupted = false;
        try {
            int waitTimes = 0;
            boolean acquired = false;
            while (!acquired) {
                try {
                    acquired = this.guardLimit.tryAcquire(permits, WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS);
                    if (!acquired) {
                        if (waitTimes == 0) {
                            LOGGER.warn("wait 500ms for {}", (Object) permits);
                        }
                        ++waitTimes;
                    }
                } catch (InterruptedException ex) {
                    // Nothing was acquired by the interrupted attempt; remember and retry.
                    interrupted = true;
                }
            }
            if (waitTimes > 0) {
                LOGGER.warn("wait times {} for {}", (Object) waitTimes, (Object) permits);
            }
            if (!this.enqueueOrRelease(e, permits)) {
                throw new IllegalStateException("backing queue rejected element");
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Enqueues {@code e} if enough permits become available within
     * {@code timeout} milliseconds.
     *
     * <p>An interrupt does not cut the wait short: the remaining time is
     * recomputed and the wait continues until the deadline. A non-positive
     * {@code timeout} results in a single non-waiting attempt.
     *
     * @param e       element to enqueue
     * @param timeout maximum time to wait, in milliseconds
     * @return {@code true} if the element was enqueued, {@code false} if the
     *         permits could not be obtained in time
     */
    @Override
    public boolean push(BinlogTraceable e, long timeout) {
        final int permits = this.getRequiredPermits(e);
        // nanoTime is monotonic; subtraction below is overflow-safe by design.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        boolean interrupted = false;
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                try {
                    if (!this.guardLimit.tryAcquire(permits, remaining, TimeUnit.NANOSECONDS)) {
                        return false;
                    }
                    return this.enqueueOrRelease(e, permits);
                } catch (InterruptedException ex) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the head of the queue, waiting until one is
     * available, and releases the permits that element was holding.
     *
     * @return the dequeued element
     */
    @Override
    public BinlogTraceable pop() {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    // take() blocks until an element exists and never yields null.
                    BinlogTraceable head = this.queue.take();
                    this.guardLimit.release(this.getRequiredPermits(head));
                    return head;
                } catch (InterruptedException ex) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the head of the queue, waiting up to
     * {@code timeout} milliseconds for one to appear, and releases the permits
     * that element was holding.
     *
     * <p>An interrupt does not cut the wait short: the remaining time is
     * recomputed and the wait continues until the deadline.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return the dequeued element, or {@code null} if none arrived in time
     */
    @Override
    public BinlogTraceable pop(long timeout) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        boolean interrupted = false;
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                try {
                    BinlogTraceable head = this.queue.poll(remaining, TimeUnit.NANOSECONDS);
                    if (head != null) {
                        this.guardLimit.release(this.getRequiredPermits(head));
                    }
                    return head;
                } catch (InterruptedException ex) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Inserts {@code e} into the backing queue after its permits have been
     * acquired, giving the permits back if the insertion does not happen.
     *
     * <p>{@code offer} is used instead of {@code put} because it cannot throw
     * {@link InterruptedException}; an interrupt at this point would otherwise
     * strand the already-acquired permits.
     *
     * @param e       element to insert
     * @param permits permits already acquired on behalf of {@code e}
     * @return {@code true} if the element was inserted
     */
    private boolean enqueueOrRelease(BinlogTraceable e, int permits) {
        boolean queued = false;
        try {
            queued = this.queue.offer(e);
        } finally {
            if (!queued) {
                this.guardLimit.release(permits);
            }
        }
        return queued;
    }
}

