/*
 * Decompiled with CFR 0.152.
 */
package shaded.org.apache.iotdb.session.subscription.consumer.base;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import shaded.org.apache.iotdb.session.subscription.consumer.AckStrategy;
import shaded.org.apache.iotdb.session.subscription.consumer.ConsumeListener;
import shaded.org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import shaded.org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer;
import shaded.org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionPushConsumerBuilder;
import shaded.org.apache.iotdb.session.subscription.consumer.base.SubscriptionExecutorServiceManager;
import shaded.org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
import shaded.org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import shaded.org.apache.iotdb.session.subscription.util.CollectionUtils;

public abstract class AbstractSubscriptionPushConsumer
extends AbstractSubscriptionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionTreePushConsumer.class);
    private final AckStrategy ackStrategy;
    private final ConsumeListener consumeListener;
    private final long autoPollIntervalMs;
    private final long autoPollTimeoutMs;
    private final AtomicBoolean isClosed = new AtomicBoolean(true);

    protected AbstractSubscriptionPushConsumer(AbstractSubscriptionPushConsumerBuilder builder) {
        super(builder);
        this.ackStrategy = builder.ackStrategy;
        this.consumeListener = builder.consumeListener;
        this.autoPollIntervalMs = builder.autoPollIntervalMs;
        this.autoPollTimeoutMs = builder.autoPollTimeoutMs;
    }

    public AbstractSubscriptionPushConsumer(Properties config) {
        this(config, (AckStrategy)((Object)config.getOrDefault((Object)"ack-strategy", (Object)AckStrategy.defaultValue())), (ConsumeListener)config.getOrDefault((Object)"consume-listener", message -> ConsumeResult.SUCCESS), (Long)config.getOrDefault((Object)"auto-poll-interval-ms", (Object)100L), (Long)config.getOrDefault((Object)"auto-poll-timeout-ms", (Object)10000L));
    }

    protected AbstractSubscriptionPushConsumer(Properties config, AckStrategy ackStrategy, ConsumeListener consumeListener, long autoPollIntervalMs, long autoPollTimeoutMs) {
        super(new AbstractSubscriptionPushConsumerBuilder().ackStrategy(ackStrategy).consumeListener(consumeListener).autoPollIntervalMs(autoPollIntervalMs).autoPollTimeoutMs(autoPollTimeoutMs), config);
        this.ackStrategy = ackStrategy;
        this.consumeListener = consumeListener;
        this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1L);
        this.autoPollTimeoutMs = Math.max(autoPollTimeoutMs, 1000L);
    }

    @Override
    protected synchronized void open() throws SubscriptionException {
        if (!this.isClosed.get()) {
            return;
        }
        super.open();
        this.isClosed.set(false);
        this.submitAutoPollWorker();
    }

    @Override
    public synchronized void close() {
        if (this.isClosed.get()) {
            return;
        }
        super.close();
        this.isClosed.set(true);
    }

    @Override
    boolean isClosed() {
        return this.isClosed.get();
    }

    private void submitAutoPollWorker() {
        ScheduledFuture[] future;
        future = new ScheduledFuture[]{SubscriptionExecutorServiceManager.submitAutoPollWorker(() -> {
            if (this.isClosed()) {
                if (Objects.nonNull(future[0])) {
                    future[0].cancel(false);
                    LOGGER.info("SubscriptionPushConsumer {} cancel auto poll worker", (Object)this);
                }
                return;
            }
            new AutoPollWorker().run();
        }, this.autoPollIntervalMs)};
        LOGGER.info("SubscriptionPushConsumer {} submit auto poll worker", (Object)this);
    }

    public String toString() {
        return "SubscriptionPushConsumer" + this.coreReportMessage();
    }

    @Override
    protected Map<String, String> coreReportMessage() {
        Map<String, String> coreReportMessage = super.coreReportMessage();
        coreReportMessage.put("ackStrategy", this.ackStrategy.toString());
        return coreReportMessage;
    }

    @Override
    protected Map<String, String> allReportMessage() {
        Map<String, String> allReportMessage = super.allReportMessage();
        allReportMessage.put("ackStrategy", this.ackStrategy.toString());
        allReportMessage.put("autoPollIntervalMs", String.valueOf(this.autoPollIntervalMs));
        allReportMessage.put("autoPollTimeoutMs", String.valueOf(this.autoPollTimeoutMs));
        return allReportMessage;
    }

    class AutoPollWorker
    implements Runnable {
        AutoPollWorker() {
        }

        @Override
        public void run() {
            if (AbstractSubscriptionPushConsumer.this.isClosed()) {
                return;
            }
            if (AbstractSubscriptionPushConsumer.this.subscribedTopics.isEmpty()) {
                return;
            }
            try {
                List<SubscriptionMessage> messages = AbstractSubscriptionPushConsumer.this.multiplePoll(AbstractSubscriptionPushConsumer.this.subscribedTopics.keySet(), AbstractSubscriptionPushConsumer.this.autoPollTimeoutMs);
                if (messages.isEmpty()) {
                    LOGGER.info("SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)", this, CollectionUtils.getLimitedString(AbstractSubscriptionPushConsumer.this.subscribedTopics.keySet(), 32), AbstractSubscriptionPushConsumer.this.autoPollTimeoutMs);
                    return;
                }
                if (AbstractSubscriptionPushConsumer.this.ackStrategy.equals((Object)AckStrategy.BEFORE_CONSUME)) {
                    AbstractSubscriptionPushConsumer.this.ack(messages);
                }
                ArrayList<SubscriptionMessage> messagesToAck = new ArrayList<SubscriptionMessage>();
                ArrayList<SubscriptionMessage> messagesToNack = new ArrayList<SubscriptionMessage>();
                for (SubscriptionMessage message : messages) {
                    try {
                        ConsumeResult consumeResult = AbstractSubscriptionPushConsumer.this.consumeListener.onReceive(message);
                        if (Objects.equals((Object)ConsumeResult.SUCCESS, (Object)consumeResult)) {
                            messagesToAck.add(message);
                            continue;
                        }
                        LOGGER.warn("Consumer listener result failure when consuming message: {}", (Object)message);
                        messagesToNack.add(message);
                    }
                    catch (Exception e) {
                        LOGGER.warn("Consumer listener raised an exception while consuming message: {}", (Object)message, (Object)e);
                        messagesToNack.add(message);
                    }
                }
                if (AbstractSubscriptionPushConsumer.this.ackStrategy.equals((Object)AckStrategy.AFTER_CONSUME)) {
                    AbstractSubscriptionPushConsumer.this.ack(messagesToAck);
                    AbstractSubscriptionPushConsumer.this.nack(messagesToNack);
                }
            }
            catch (Exception e) {
                LOGGER.warn("something unexpected happened when auto poll messages...", e);
            }
        }
    }
}

