/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rabbitmq.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQMessage;
import io.vertx.rabbitmq.impl.QueueConsumerHandler;
import java.io.IOException;

/**
 * {@link RabbitMQConsumer} implementation that places received messages into an
 * {@link InboundBuffer} and hands them to the user supplied message handler.
 *
 * <p>The buffer is created in the paused state; flow control calls
 * ({@link #pause()}, {@link #resume()}, {@link #fetch(long)}) are forwarded to it.
 * When the buffer already holds {@code maxQueueSize} messages, an incoming message
 * either replaces a buffered one or is dropped, depending on the
 * {@code keepMostRecent} option.
 */
public class RabbitMQConsumerImpl implements RabbitMQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumerImpl.class);

    /** Receives exceptions thrown by the message handler; may be {@code null}. */
    private Handler<Throwable> exceptionHandler;
    /** Notified from {@link #handleEnd()}; may be {@code null}. */
    private Handler<Void> endHandler;
    /** Source of the channel and consumer tag used by this consumer. */
    private final QueueConsumerHandler consumerHandler;
    /** Overflow policy: {@code true} evicts a buffered message, {@code false} drops the new one. */
    private final boolean keepMostRecent;
    /** Buffer sitting between message arrival and the user handler. */
    private final InboundBuffer<RabbitMQMessage> pending;
    /** Number of buffered messages at which the overflow policy kicks in. */
    private final int maxQueueSize;

    /**
     * Creates a consumer whose internal buffer starts paused.
     *
     * @param context         context passed to the internal {@link InboundBuffer}
     * @param consumerHandler provides the channel and consumer tag
     * @param options         supplies the overflow policy and the internal queue size
     */
    RabbitMQConsumerImpl(Context context, QueueConsumerHandler consumerHandler, QueueOptions options) {
        this.consumerHandler = consumerHandler;
        this.keepMostRecent = options.isKeepMostRecent();
        this.maxQueueSize = options.maxInternalQueueSize();
        // Explicit type argument: a diamond would be inferred as InboundBuffer<Object>
        // because the constructor call is the receiver of pause().
        this.pending = new InboundBuffer<RabbitMQMessage>(context, this.maxQueueSize).pause();
    }

    /**
     * Sets the handler notified when the message handler throws.
     *
     * @param exceptionHandler the handler, or {@code null} to ignore such exceptions
     * @return this consumer
     */
    @Override
    public RabbitMQConsumer exceptionHandler(Handler<Throwable> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    /**
     * Sets the message handler. Any {@link Exception} it throws is caught and
     * forwarded to the exception handler instead of propagating into the buffer.
     *
     * @param handler the message handler, or {@code null} to unset it
     * @return this consumer
     */
    @Override
    public RabbitMQConsumer handler(Handler<RabbitMQMessage> handler) {
        if (handler != null) {
            this.pending.handler(msg -> {
                try {
                    handler.handle(msg);
                } catch (Exception e) {
                    this.handleException(e);
                }
            });
        } else {
            this.pending.handler(null);
        }
        return this;
    }

    /**
     * Pauses the internal buffer.
     *
     * @return this consumer
     */
    @Override
    public RabbitMQConsumer pause() {
        this.pending.pause();
        return this;
    }

    /**
     * Resumes the internal buffer.
     *
     * @return this consumer
     */
    @Override
    public RabbitMQConsumer resume() {
        this.pending.resume();
        return this;
    }

    /**
     * Forwards a fetch request for {@code amount} messages to the internal buffer.
     *
     * @param amount the number of messages requested
     * @return this consumer
     */
    public ReadStream<RabbitMQMessage> fetch(long amount) {
        this.pending.fetch(amount);
        return this;
    }

    /**
     * Sets the handler invoked by {@link #handleEnd()}.
     *
     * @param endHandler the handler, or {@code null} to unset it
     * @return this consumer
     */
    @Override
    public RabbitMQConsumer endHandler(Handler<Void> endHandler) {
        this.endHandler = endHandler;
        return this;
    }

    /**
     * @return the consumer tag reported by the underlying consumer handler
     */
    @Override
    public String consumerTag() {
        return this.consumerHandler.getConsumerTag();
    }

    /**
     * Cancels the subscription without reporting the outcome to a callback.
     */
    @Override
    public void cancel() {
        this.cancel(null);
    }

    /**
     * Cancels the subscription on the channel and then signals end of stream.
     *
     * <p>An {@link IOException} raised by the channel is turned into a failed result
     * rather than thrown. The end handler is invoked whether or not the cancel
     * succeeded.
     *
     * @param cancelResult receives the outcome; may be {@code null}
     */
    @Override
    public void cancel(Handler<AsyncResult<Void>> cancelResult) {
        Future<Void> operationResult;
        try {
            this.consumerHandler.getChannel().basicCancel(this.consumerTag());
            operationResult = Future.succeededFuture();
        } catch (IOException e) {
            operationResult = Future.failedFuture(e);
        }
        if (cancelResult != null) {
            cancelResult.handle(operationResult);
        }
        this.handleEnd();
    }

    /**
     * Reports the pause state of the internal buffer, so the value follows
     * {@link #pause()}, {@link #resume()} and {@link #fetch(long)} instead of
     * being a constant.
     *
     * @return {@code true} if the internal buffer reports itself as paused
     */
    @Override
    public boolean isPaused() {
        return this.pending.isPaused();
    }

    /**
     * Offers a received message to the internal buffer, applying the overflow
     * policy when the buffer already holds {@code maxQueueSize} messages.
     *
     * @param message the received message
     */
    void handleMessage(RabbitMQMessage message) {
        if (this.pending.size() >= this.maxQueueSize) {
            if (this.keepMostRecent) {
                // Remove one buffered message to make room for the new one; the
                // removed message is intentionally discarded.
                this.pending.read();
            } else {
                log.debug("Discard a received message since stream is paused and buffer flag is false");
                return;
            }
        }
        this.pending.write(message);
    }

    /**
     * Passes {@code exception} to the exception handler, if one is set.
     *
     * @param exception the failure to report
     */
    private void handleException(Throwable exception) {
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(exception);
        }
    }

    /**
     * Invokes the end handler, if one is set.
     */
    void handleEnd() {
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
    }
}

