/*
 * Decompiled with CFR 0.152.
 */
package shaded.org.apache.iotdb.session.subscription.consumer.base;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.org.apache.iotdb.common.rpc.thrift.TEndPoint;
import shaded.org.apache.iotdb.rpc.IoTDBConnectionException;
import shaded.org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import shaded.org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import shaded.org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
import shaded.org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer;
import shaded.org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionProvider;

final class AbstractSubscriptionProviders {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSubscriptionProviders.class);
    private final SortedMap<Integer, AbstractSubscriptionProvider> subscriptionProviders = new ConcurrentSkipListMap<Integer, AbstractSubscriptionProvider>();
    private int nextDataNodeId = -1;
    private final ReentrantReadWriteLock subscriptionProvidersLock = new ReentrantReadWriteLock(true);
    private final Set<TEndPoint> initialEndpoints;

    AbstractSubscriptionProviders(Set<TEndPoint> initialEndpoints) {
        this.initialEndpoints = initialEndpoints;
    }

    void acquireReadLock() {
        this.subscriptionProvidersLock.readLock().lock();
    }

    void releaseReadLock() {
        this.subscriptionProvidersLock.readLock().unlock();
    }

    void acquireWriteLock() {
        this.subscriptionProvidersLock.writeLock().lock();
    }

    void releaseWriteLock() {
        this.subscriptionProvidersLock.writeLock().unlock();
    }

    void openProviders(AbstractSubscriptionConsumer consumer) throws SubscriptionException {
        this.closeProviders();
        for (TEndPoint endPoint : this.initialEndpoints) {
            Map<Integer, TEndPoint> allEndPoints;
            AbstractSubscriptionProvider defaultProvider;
            try {
                defaultProvider = consumer.constructProviderAndHandshake(endPoint);
            }
            catch (Exception e) {
                LOGGER.warn("{} failed to create connection with {} because of {}", consumer, endPoint, e, e);
                continue;
            }
            int defaultDataNodeId = defaultProvider.getDataNodeId();
            this.addProvider(defaultDataNodeId, defaultProvider);
            try {
                allEndPoints = defaultProvider.heartbeat().getEndPoints();
            }
            catch (Exception e) {
                LOGGER.warn("{} failed to fetch all endpoints from {} because of {}", consumer, endPoint, e, e);
                break;
            }
            for (Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
                AbstractSubscriptionProvider provider;
                if (defaultDataNodeId == entry.getKey()) continue;
                try {
                    provider = consumer.constructProviderAndHandshake(entry.getValue());
                }
                catch (Exception e) {
                    LOGGER.warn("{} failed to create connection with {} because of {}", consumer, entry.getValue(), e, e);
                    continue;
                }
                this.addProvider(entry.getKey(), provider);
            }
        }
        if (this.hasNoAvailableProviders()) {
            throw new SubscriptionConnectionException(String.format("Cluster has no available subscription providers to connect with initial endpoints %s", this.initialEndpoints));
        }
        this.nextDataNodeId = this.subscriptionProviders.firstKey();
    }

    void closeProviders() {
        for (AbstractSubscriptionProvider provider : this.getAllProviders()) {
            try {
                provider.close();
            }
            catch (Exception e) {
                LOGGER.warn("Failed to close subscription provider {} because of {}", provider, e, e);
            }
        }
        this.subscriptionProviders.clear();
    }

    void addProvider(int dataNodeId, AbstractSubscriptionProvider provider) {
        LOGGER.info("add new subscription provider {}", (Object)provider);
        this.subscriptionProviders.put(dataNodeId, provider);
    }

    void closeAndRemoveProvider(int dataNodeId) throws SubscriptionException, IoTDBConnectionException {
        if (!this.containsProvider(dataNodeId)) {
            return;
        }
        AbstractSubscriptionProvider provider = (AbstractSubscriptionProvider)this.subscriptionProviders.get(dataNodeId);
        try {
            provider.close();
        }
        finally {
            LOGGER.info("close and remove stale subscription provider {}", (Object)provider);
            this.subscriptionProviders.remove(dataNodeId);
        }
    }

    boolean hasNoProviders() {
        return this.subscriptionProviders.isEmpty();
    }

    List<AbstractSubscriptionProvider> getAllProviders() {
        return new ArrayList<AbstractSubscriptionProvider>(this.subscriptionProviders.values());
    }

    AbstractSubscriptionProvider getProvider(int dataNodeId) {
        return this.containsProvider(dataNodeId) ? (AbstractSubscriptionProvider)this.subscriptionProviders.get(dataNodeId) : null;
    }

    boolean hasNoAvailableProviders() {
        return this.subscriptionProviders.values().stream().noneMatch(AbstractSubscriptionProvider::isAvailable);
    }

    boolean containsProvider(int dataNodeId) {
        return this.subscriptionProviders.containsKey(dataNodeId);
    }

    List<AbstractSubscriptionProvider> getAllAvailableProviders() {
        return this.subscriptionProviders.values().stream().filter(AbstractSubscriptionProvider::isAvailable).collect(Collectors.toList());
    }

    void updateNextDataNodeId() {
        SortedMap<Integer, AbstractSubscriptionProvider> subProviders = this.subscriptionProviders.tailMap(this.nextDataNodeId + 1);
        this.nextDataNodeId = subProviders.isEmpty() ? this.subscriptionProviders.firstKey() : subProviders.firstKey();
    }

    AbstractSubscriptionProvider getNextAvailableProvider() {
        if (this.hasNoAvailableProviders()) {
            return null;
        }
        AbstractSubscriptionProvider provider = this.getProvider(this.nextDataNodeId);
        while (Objects.isNull(provider) || !provider.isAvailable()) {
            this.updateNextDataNodeId();
            provider = this.getProvider(this.nextDataNodeId);
        }
        this.updateNextDataNodeId();
        return provider;
    }

    void heartbeat(AbstractSubscriptionConsumer consumer) {
        if (consumer.isClosed()) {
            return;
        }
        this.acquireWriteLock();
        try {
            this.heartbeatInternal(consumer);
        }
        finally {
            this.releaseWriteLock();
        }
    }

    private void heartbeatInternal(AbstractSubscriptionConsumer consumer) {
        for (AbstractSubscriptionProvider provider : this.getAllProviders()) {
            try {
                PipeSubscribeHeartbeatResp resp = provider.heartbeat();
                consumer.subscribedTopics = resp.getTopics();
                for (String topicName : resp.getTopicNamesToUnsubscribe()) {
                    LOGGER.info("Termination occurred when SubscriptionConsumer {} polling topics, unsubscribe topic {} automatically", (Object)consumer.coreReportMessage(), (Object)topicName);
                    consumer.unsubscribe(topicName);
                }
                provider.setAvailable();
            }
            catch (Exception e) {
                LOGGER.warn("{} failed to sending heartbeat to subscription provider {} because of {}, set subscription provider unavailable", consumer, provider, e, e);
                provider.setUnavailable();
            }
        }
    }

    void sync(AbstractSubscriptionConsumer consumer) {
        if (consumer.isClosed()) {
            return;
        }
        this.acquireWriteLock();
        try {
            this.syncInternal(consumer);
        }
        finally {
            this.releaseWriteLock();
        }
    }

    private void syncInternal(AbstractSubscriptionConsumer consumer) {
        Map<Integer, TEndPoint> allEndPoints;
        if (this.hasNoAvailableProviders()) {
            try {
                this.openProviders(consumer);
            }
            catch (Exception e) {
                LOGGER.warn("Failed to open providers for consumer {} because of {}", consumer, e, e);
                return;
            }
        }
        try {
            allEndPoints = consumer.fetchAllEndPointsWithRedirection();
        }
        catch (Exception e) {
            LOGGER.warn("Failed to fetch all endpoints for consumer {} because of {}", consumer, e, e);
            return;
        }
        for (Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
            AbstractSubscriptionProvider provider = this.getProvider(entry.getKey());
            if (Objects.isNull(provider)) {
                AbstractSubscriptionProvider newProvider;
                TEndPoint endPoint = entry.getValue();
                try {
                    newProvider = consumer.constructProviderAndHandshake(endPoint);
                }
                catch (Exception e) {
                    LOGGER.warn("{} failed to create connection with {} because of {}", consumer, endPoint, e, e);
                    continue;
                }
                this.addProvider(entry.getKey(), newProvider);
                continue;
            }
            try {
                consumer.subscribedTopics = provider.heartbeat().getTopics();
                provider.setAvailable();
            }
            catch (Exception e) {
                LOGGER.warn("{} failed to sending heartbeat to subscription provider {} because of {}, set subscription provider unavailable", consumer, provider, e, e);
                provider.setUnavailable();
            }
            if (provider.isAvailable()) continue;
            try {
                this.closeAndRemoveProvider(entry.getKey());
            }
            catch (Exception e) {
                LOGGER.warn("Exception occurred when {} closing and removing subscription provider {} because of {}", consumer, provider, e, e);
            }
        }
        for (AbstractSubscriptionProvider provider : this.getAllProviders()) {
            int dataNodeId = provider.getDataNodeId();
            if (allEndPoints.containsKey(dataNodeId)) continue;
            try {
                this.closeAndRemoveProvider(dataNodeId);
            }
            catch (Exception e) {
                LOGGER.warn("Exception occurred when {} closing and removing subscription provider {} because of {}", consumer, provider, e, e);
            }
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("SubscriptionProviders{");
        for (Map.Entry<Integer, AbstractSubscriptionProvider> entry : this.subscriptionProviders.entrySet()) {
            sb.append(entry.getValue().toString()).append(", ");
        }
        if (!this.subscriptionProviders.isEmpty()) {
            sb.delete(sb.length() - 2, sb.length());
        }
        sb.append("}");
        return sb.toString();
    }
}

