/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.session.subscription;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.session.subscription.PullConsumerAutoCommitWorker;
import org.apache.iotdb.session.subscription.SubscriptionConsumer;
import org.apache.iotdb.session.subscription.SubscriptionMessage;
import org.apache.iotdb.session.subscription.SubscriptionProvider;
import org.apache.iotdb.session.subscription.SubscriptionPushConsumer;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionPullConsumer
extends SubscriptionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPullConsumer.class);
    private final boolean autoCommit;
    private final long autoCommitIntervalMs;
    private ScheduledExecutorService autoCommitWorkerExecutor;
    private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;
    private final AtomicBoolean isClosed = new AtomicBoolean(true);

    public SubscriptionPullConsumer(Builder builder) {
        super(builder);
        this.autoCommit = builder.autoCommit;
        this.autoCommitIntervalMs = builder.autoCommitIntervalMs;
    }

    public SubscriptionPullConsumer(Properties properties) {
        this(properties, (Boolean)properties.getOrDefault((Object)"auto-commit", (Object)true), (Long)properties.getOrDefault((Object)"auto-commit-interval-ms", (Object)5000L));
    }

    private SubscriptionPullConsumer(Properties properties, boolean autoCommit, long autoCommitIntervalMs) {
        super(new Builder().autoCommit(autoCommit).autoCommitIntervalMs(autoCommitIntervalMs), properties);
        this.autoCommit = autoCommit;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    @Override
    public synchronized void open() throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
        if (!this.isClosed.get()) {
            return;
        }
        super.open();
        if (this.autoCommit) {
            this.launchAutoCommitWorker();
        }
        this.isClosed.set(false);
    }

    @Override
    public synchronized void close() throws IoTDBConnectionException {
        if (this.isClosed.get()) {
            return;
        }
        try {
            if (this.autoCommit) {
                this.shutdownAutoCommitWorker();
                this.commitAllUncommittedMessages();
            }
            super.close();
        }
        finally {
            this.isClosed.set(true);
        }
    }

    public List<SubscriptionMessage> poll(Duration timeoutMs) throws TException, IOException, StatementExecutionException {
        return this.poll(Collections.emptySet(), timeoutMs.toMillis());
    }

    public List<SubscriptionMessage> poll(long timeoutMs) throws TException, IOException, StatementExecutionException {
        return this.poll(Collections.emptySet(), timeoutMs);
    }

    public List<SubscriptionMessage> poll(Set<String> topicNames, Duration timeoutMs) throws TException, IOException, StatementExecutionException {
        return this.poll(topicNames, timeoutMs.toMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs) throws TException, IOException, StatementExecutionException {
        ArrayList<EnrichedTablets> enrichedTabletsList = new ArrayList<EnrichedTablets>();
        this.acquireReadLock();
        try {
            for (SubscriptionProvider provider : this.getAllAvailableProviders()) {
                enrichedTabletsList.addAll(provider.getSessionConnection().poll(topicNames, timeoutMs));
            }
        }
        finally {
            this.releaseReadLock();
        }
        List<SubscriptionMessage> messages = enrichedTabletsList.stream().map(SubscriptionMessage::new).collect(Collectors.toList());
        if (this.autoCommit) {
            long currentTimestamp = System.currentTimeMillis();
            long index = currentTimestamp / this.autoCommitIntervalMs;
            if (currentTimestamp % this.autoCommitIntervalMs == 0L) {
                --index;
            }
            this.uncommittedMessages.computeIfAbsent(index, o -> new ConcurrentSkipListSet()).addAll(messages);
        }
        return messages;
    }

    public void commitSync(SubscriptionMessage message) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.commitSync(Collections.singletonList(message));
    }

    public void commitSync(Iterable<SubscriptionMessage> messages) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        HashMap<Integer, Map> dataNodeIdToTopicNameToSubscriptionCommitIds = new HashMap<Integer, Map>();
        for (SubscriptionMessage subscriptionMessage : messages) {
            dataNodeIdToTopicNameToSubscriptionCommitIds.computeIfAbsent(subscriptionMessage.parseDataNodeIdFromSubscriptionCommitId(), id -> new HashMap()).computeIfAbsent(subscriptionMessage.getTopicName(), topicName -> new ArrayList()).add(subscriptionMessage.getSubscriptionCommitId());
        }
        for (Map.Entry entry : dataNodeIdToTopicNameToSubscriptionCommitIds.entrySet()) {
            this.commitSyncInternal((Integer)entry.getKey(), (Map)entry.getValue());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void commitSyncInternal(int dataNodeId, Map<String, List<String>> topicNameToSubscriptionCommitIds) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.acquireReadLock();
        try {
            SubscriptionProvider provider = this.getProvider(dataNodeId);
            if (Objects.isNull(provider) || !provider.isAvailable()) {
                throw new IoTDBConnectionException(String.format("something unexpected happened when commit messages to subscription provider with data node id %s, the subscription provider may be unavailable or not existed", dataNodeId));
            }
            provider.getSessionConnection().commitSync(topicNameToSubscriptionCommitIds);
        }
        finally {
            this.releaseReadLock();
        }
    }

    private void launchAutoCommitWorker() {
        this.uncommittedMessages = new ConcurrentSkipListMap<Long, Set<SubscriptionMessage>>();
        this.autoCommitWorkerExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "PullConsumerAutoCommitWorker", 0L);
            if (!t.isDaemon()) {
                t.setDaemon(true);
            }
            if (t.getPriority() != 5) {
                t.setPriority(5);
            }
            return t;
        });
        this.autoCommitWorkerExecutor.scheduleAtFixedRate(new PullConsumerAutoCommitWorker(this), 0L, this.autoCommitIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void shutdownAutoCommitWorker() {
        this.autoCommitWorkerExecutor.shutdown();
        this.autoCommitWorkerExecutor = null;
    }

    private void commitAllUncommittedMessages() {
        for (Map.Entry<Long, Set<SubscriptionMessage>> entry : this.uncommittedMessages.entrySet()) {
            try {
                this.commitSync((Iterable<SubscriptionMessage>)entry.getValue());
                this.uncommittedMessages.remove(entry.getKey());
            }
            catch (Exception e) {
                LOGGER.warn("something unexpected happened when commit messages during close", (Throwable)e);
            }
        }
    }

    @Override
    boolean isClosed() {
        return this.isClosed.get();
    }

    long getAutoCommitIntervalMs() {
        return this.autoCommitIntervalMs;
    }

    SortedMap<Long, Set<SubscriptionMessage>> getUncommittedMessages() {
        return this.uncommittedMessages;
    }

    public static class Builder
    extends SubscriptionConsumer.Builder {
        private boolean autoCommit = true;
        private long autoCommitIntervalMs = 5000L;

        @Override
        public Builder host(String host) {
            super.host(host);
            return this;
        }

        @Override
        public Builder port(int port) {
            super.port(port);
            return this;
        }

        @Override
        public Builder nodeUrls(List<String> nodeUrls) {
            super.nodeUrls(nodeUrls);
            return this;
        }

        @Override
        public Builder username(String username) {
            super.username(username);
            return this;
        }

        @Override
        public Builder password(String password) {
            super.password(password);
            return this;
        }

        @Override
        public Builder consumerId(String consumerId) {
            super.consumerId(consumerId);
            return this;
        }

        @Override
        public Builder consumerGroupId(String consumerGroupId) {
            super.consumerGroupId(consumerGroupId);
            return this;
        }

        @Override
        public Builder heartbeatIntervalMs(long heartbeatIntervalMs) {
            super.heartbeatIntervalMs(heartbeatIntervalMs);
            return this;
        }

        @Override
        public Builder endpointsSyncIntervalMs(long endpointsSyncIntervalMs) {
            super.endpointsSyncIntervalMs(endpointsSyncIntervalMs);
            return this;
        }

        public Builder autoCommit(boolean autoCommit) {
            this.autoCommit = autoCommit;
            return this;
        }

        public Builder autoCommitIntervalMs(long autoCommitIntervalMs) {
            this.autoCommitIntervalMs = Math.max(autoCommitIntervalMs, 500L);
            return this;
        }

        @Override
        public SubscriptionPullConsumer buildPullConsumer() {
            return new SubscriptionPullConsumer(this);
        }

        @Override
        public SubscriptionPushConsumer buildPushConsumer() {
            throw new SubscriptionException("SubscriptionPullConsumer.Builder do not support build push consumer.");
        }
    }
}

