/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TableViewConfigurationData;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableViewImpl<T>
implements TableView<T> {
    private static final Logger log = LoggerFactory.getLogger(TableViewImpl.class);
    private final TableViewConfigurationData conf;
    private final ConcurrentMap<String, T> data;
    private final Map<String, T> immutableData;
    private final CompletableFuture<Reader<T>> reader;
    private final List<BiConsumer<String, T>> listeners;
    private final ReentrantLock listenersMutex;
    private final boolean isPersistentTopic;
    private TopicCompactionStrategy<T> compactionStrategy;
    private final ConcurrentHashMap<CompletableFuture<Void>, Map<String, TopicMessageId>> pendingRefreshRequests;
    private final ConcurrentHashMap<String, MessageId> lastReadPositions;

    /**
     * Initializes the in-memory state of the table view and kicks off the asynchronous creation
     * of the underlying reader. No message is read until {@link #start()} is called.
     *
     * @param client the client used to build the reader
     * @param schema the schema used to decode message values
     * @param conf   the table view configuration (topic, subscription name, crypto settings, ...)
     */
    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
        this.conf = conf;
        // NOTE(review): this is a plain prefix match, so a topic name given without a domain
        // prefix is treated as non-persistent here - confirm names are fully qualified upstream.
        this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString());
        this.data = new ConcurrentHashMap<>();
        // Read-only view handed out by entrySet()/keySet()/values().
        this.immutableData = Collections.unmodifiableMap(this.data);
        this.listeners = new ArrayList<>();
        this.listenersMutex = new ReentrantLock();
        this.compactionStrategy =
                TopicCompactionStrategy.load("table-view", conf.getTopicCompactionStrategyClassName());
        this.pendingRefreshRequests = new ConcurrentHashMap<>();
        this.lastReadPositions = new ConcurrentHashMap<>();

        ReaderBuilder<T> readerBuilder = client.newReader(schema)
                .topic(conf.getTopicName())
                .startMessageId(MessageId.earliest)
                .autoUpdatePartitions(true)
                .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
                .poolMessages(true)
                .subscriptionName(conf.getSubscriptionName());
        if (this.isPersistentTopic) {
            // Compacted reads are only requested for persistent topics.
            readerBuilder.readCompacted(true);
        }

        CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader();
        if (cryptoKeyReader != null) {
            readerBuilder.cryptoKeyReader(cryptoKeyReader);
        }
        readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction());

        this.reader = readerBuilder.createAsync();
    }

    /**
     * Starts populating the view once the reader has been created.
     *
     * <p>For a persistent topic the existing messages are replayed first and tail reading begins
     * when that replay finishes. Otherwise tail reading begins immediately.
     *
     * @return a future that yields this table view when the start-up step has completed
     */
    CompletableFuture<TableView<T>> start() {
        return reader.thenCompose(createdReader -> {
            if (isPersistentTopic) {
                return readAllExistingMessages(createdReader)
                        .thenRun(() -> readTailMessages(createdReader));
            }
            readTailMessages(createdReader);
            return CompletableFuture.<Void>completedFuture(null);
        }).thenApply(ignored -> this);
    }

    /**
     * @return the number of entries currently held by the backing map
     */
    public int size() {
        return data.size();
    }

    /**
     * @return {@code true} when the backing map currently holds no entries
     */
    public boolean isEmpty() {
        return data.isEmpty();
    }

    /**
     * @param key the key to look up
     * @return {@code true} when the backing map currently contains {@code key}
     */
    public boolean containsKey(String key) {
        return data.containsKey(key);
    }

    /**
     * @param key the key to look up
     * @return the value currently mapped to {@code key}, or {@code null} when there is none
     */
    public T get(String key) {
        return data.get(key);
    }

    /**
     * @return the entry set of the unmodifiable view over the backing map
     */
    public Set<Map.Entry<String, T>> entrySet() {
        return immutableData.entrySet();
    }

    /**
     * @return the key set of the unmodifiable view over the backing map
     */
    public Set<String> keySet() {
        return immutableData.keySet();
    }

    /**
     * @return the values collection of the unmodifiable view over the backing map
     */
    public Collection<T> values() {
        return immutableData.values();
    }

    /**
     * Invokes {@code action} for every key/value pair currently in the backing map.
     *
     * @param action the callback to run for each entry
     */
    public void forEach(BiConsumer<String, T> action) {
        data.forEach(action);
    }

    /**
     * Registers a listener that is invoked for every update subsequently applied to the view.
     *
     * @param action the callback receiving the key and the new value of each applied update
     */
    public void listen(BiConsumer<String, T> action) {
        // Acquire before entering try: if lock() itself failed, the finally block must not
        // attempt to unlock a mutex this thread never held.
        listenersMutex.lock();
        try {
            listeners.add(action);
        } finally {
            listenersMutex.unlock();
        }
    }

    /**
     * Runs {@code action} over every entry currently in the view and then registers it as a
     * listener, all while holding the listeners mutex.
     *
     * @param action the callback to run for existing entries and for later updates
     */
    public void forEachAndListen(BiConsumer<String, T> action) {
        // Acquire before entering try: if lock() itself failed, the finally block must not
        // attempt to unlock a mutex this thread never held.
        listenersMutex.lock();
        try {
            forEach(action);
            listeners.add(action);
        } finally {
            listenersMutex.unlock();
        }
    }

    /**
     * Closes the underlying reader once its creation has completed.
     *
     * @return the future returned by the reader's asynchronous close
     */
    public CompletableFuture<Void> closeAsync() {
        return reader.thenCompose(createdReader -> createdReader.closeAsync());
    }

    /**
     * Blocking variant of {@link #closeAsync()}.
     *
     * @throws PulsarClientException the failure of the asynchronous close, converted with
     *                               {@code PulsarClientException.unwrap}
     */
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (Exception closeFailure) {
            throw PulsarClientException.unwrap(closeFailure);
        }
    }

    /**
     * Applies a single message to the view and releases it afterwards.
     *
     * <p>The message position is recorded as the last read position of its topic. A message
     * without a key does not touch the map. For a keyed message, an empty payload is treated as
     * a {@code null} value and removes the key; otherwise the value is stored. When a compaction
     * strategy is configured it can veto the update. Pending refresh requests are checked
     * against the message position in every case.
     *
     * @param msg the message to apply; it is always released before this method returns
     */
    private void handleMessage(Message<T> msg) {
        lastReadPositions.put(msg.getTopicName(), msg.getMessageId());
        try {
            if (msg.hasKey()) {
                String key = msg.getKey();
                // An empty payload means "no value": the key is removed below.
                T cur = msg.size() > 0 ? msg.getValue() : null;
                if (log.isDebugEnabled()) {
                    log.debug("Applying message from topic {}. key={} value={}",
                            conf.getTopicName(), key, cur);
                }

                boolean update = true;
                if (compactionStrategy != null) {
                    T prev = data.get(key);
                    update = !compactionStrategy.shouldKeepLeft(prev, cur);
                    if (!update) {
                        log.info("Skipped the message from topic {}. key={} value={} prev={}",
                                conf.getTopicName(), key, cur, prev);
                        compactionStrategy.handleSkippedMessage(key, cur);
                    }
                }

                if (update) {
                    // Map mutation and listener notification happen under the same mutex that
                    // listen()/forEachAndListen() take when registering listeners.
                    listenersMutex.lock();
                    try {
                        if (cur == null) {
                            data.remove(key);
                        } else {
                            data.put(key, cur);
                        }
                        for (BiConsumer<String, T> listener : listeners) {
                            try {
                                listener.accept(key, cur);
                            } catch (Throwable t) {
                                // A failing listener must not prevent the others from running.
                                log.error("Table view listener raised an exception", t);
                            }
                        }
                    } finally {
                        listenersMutex.unlock();
                    }
                }
            }
            checkAllFreshTask(msg);
        } finally {
            msg.release();
        }
    }

    /**
     * Requests a refresh of the view up to the last message ids reported by the reader at the
     * time of the call.
     *
     * <p>The returned future completes immediately when no topic reports messages, or when the
     * recorded read positions already cover every reported last message id. Otherwise the
     * request stays registered as pending until the message handling path completes it.
     *
     * @return a future completed when the refresh target is reached, or completed exceptionally
     *         when looking up the last message ids fails
     */
    public CompletableFuture<Void> refreshAsync() {
        CompletableFuture<Void> refreshDone = new CompletableFuture<>();
        reader.thenCompose(createdReader ->
                getLastMessageIdOfNonEmptyTopics(createdReader).thenAccept(targets -> {
                    if (!targets.isEmpty()) {
                        // Register before filtering so a message handled in between can still
                        // find this request among the pending ones.
                        pendingRefreshRequests.put(refreshDone, targets);
                        filterReceivedMessages(targets);
                        if (targets.isEmpty()) {
                            pendingRefreshRequests.remove(refreshDone);
                            refreshDone.complete(null);
                        }
                    } else {
                        refreshDone.complete(null);
                    }
                })
        ).exceptionally(error -> {
            refreshDone.completeExceptionally(error);
            pendingRefreshRequests.remove(refreshDone);
            return null;
        });
        return refreshDone;
    }

    /**
     * Blocking variant of {@link #refreshAsync()}.
     *
     * @throws PulsarClientException the failure of the asynchronous refresh, converted with
     *                               {@code PulsarClientException.unwrap}
     */
    public void refresh() throws PulsarClientException {
        try {
            refreshAsync().get();
        } catch (Exception refreshFailure) {
            throw PulsarClientException.unwrap(refreshFailure);
        }
    }

    /**
     * Replays the messages that already exist on the topic(s) of {@code reader}.
     *
     * <p>The last message ids of the non-empty topics are looked up first. When there are none
     * the returned future completes right away; otherwise the read loop is started with those
     * ids as its targets.
     *
     * @param reader the reader to consume from
     * @return a future completed when the replay finishes, or completed exceptionally when the
     *         last-message-id lookup or the replay fails
     */
    private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader) {
        final long replayStartNanos = System.nanoTime();
        final AtomicLong replayedCount = new AtomicLong();
        final CompletableFuture<Void> replayDone = new CompletableFuture<>();

        getLastMessageIdOfNonEmptyTopics(reader)
                .thenAccept(targets -> {
                    if (targets.isEmpty()) {
                        replayDone.complete(null);
                    } else {
                        readAllExistingMessages(reader, replayDone, replayStartNanos, replayedCount, targets);
                    }
                })
                .exceptionally(error -> {
                    replayDone.completeExceptionally(error);
                    return null;
                });

        return replayDone;
    }

    private CompletableFuture<Map<String, TopicMessageId>> getLastMessageIdOfNonEmptyTopics(Reader<T> reader) {
        return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> {
            ConcurrentHashMap lastMessageIdMap = new ConcurrentHashMap();
            lastMessageIds.forEach((? super T topicMessageId) -> {
                if (((MessageIdAdv)topicMessageId).getEntryId() >= 0L) {
                    lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId);
                }
            });
            return lastMessageIdMap;
        });
    }

    private void filterReceivedMessages(Map<String, TopicMessageId> lastMessageIds) {
        lastMessageIds.forEach((? super K partition, ? super V lastMessageId) -> {
            MessageId messageId = this.lastReadPositions.get(partition);
            if (messageId != null && lastMessageId.compareTo((Object)messageId) <= 0) {
                lastMessageIds.remove(partition);
            }
        });
    }

    /**
     * Updates the outstanding targets of one read/refresh task with a newly read position and
     * completes the task when no target is left.
     *
     * @param maxMessageIds mutable map from topic to the message id still to be reached
     * @param future        the future to complete once {@code maxMessageIds} is empty
     * @param messageId     the position of the message that was just read
     * @param topicName     the topic the message was read from
     * @return {@code true} when {@code maxMessageIds} is empty after the update (the future has
     *         been completed), {@code false} otherwise
     */
    private boolean checkFreshTask(Map<String, TopicMessageId> maxMessageIds, CompletableFuture<Void> future,
                                   MessageId messageId, String topicName) {
        TopicMessageId maxMessageId = maxMessageIds.get(topicName);
        // The topic's target is reached once the read position is at or past it.
        if (maxMessageId != null && messageId.compareTo(maxMessageId) >= 0) {
            maxMessageIds.remove(topicName);
        }
        if (maxMessageIds.isEmpty()) {
            future.complete(null);
            return true;
        }
        return false;
    }

    private void checkAllFreshTask(Message<T> msg) {
        this.pendingRefreshRequests.forEach((? super K future, ? super V maxMessageIds) -> {
            String topicName = msg.getTopicName();
            MessageId messageId = msg.getMessageId();
            if (this.checkFreshTask((Map<String, TopicMessageId>)maxMessageIds, (CompletableFuture<Void>)future, messageId, topicName)) {
                this.pendingRefreshRequests.remove(future);
            }
        });
    }

    /**
     * One step of the asynchronous replay loop: reads the next available message, applies it
     * and schedules the following step until every target in {@code maxMessageIds} has been
     * reached or the reader reports that no message is available.
     *
     * @param reader        the reader to consume from
     * @param future        completed when the replay ends, exceptionally on any failure
     * @param startTime     {@link System#nanoTime()} value taken when the replay started
     * @param messagesRead  running count of replayed messages
     * @param maxMessageIds mutable map from topic to the message id still to be reached
     */
    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Void> future, long startTime,
                                         AtomicLong messagesRead, Map<String, TopicMessageId> maxMessageIds) {
        reader.hasMessageAvailableAsync().thenAccept(hasMessage -> {
            if (hasMessage) {
                reader.readNextAsync().thenAccept(msg -> {
                    messagesRead.incrementAndGet();
                    // Capture these before handleMessage, which releases the message.
                    String topicName = msg.getTopicName();
                    MessageId messageId = msg.getMessageId();
                    handleMessage(msg);
                    if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) {
                        readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
                    }
                }).exceptionally(ex -> {
                    if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                        log.info("Reader {} was closed while reading existing messages.", reader.getTopic());
                    } else {
                        log.warn("Reader {} was interrupted while reading existing messages. ",
                                reader.getTopic(), ex);
                    }
                    future.completeExceptionally(ex);
                    return null;
                });
            } else {
                // Nothing more to read: the replay is over.
                long endTime = System.nanoTime();
                long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
                log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
                        reader.getTopic(), messagesRead, (double) durationMillis / 1000.0);
                future.complete(null);
            }
        }).exceptionally(ex -> {
            // Without this handler a failed availability check would leave the future pending
            // forever. If the future has already been completed this call is a no-op.
            log.warn("Reader {} failed to check for available messages while reading existing messages.",
                    reader.getTopic(), ex);
            future.completeExceptionally(ex);
            return null;
        });
    }

    private void readTailMessages(Reader<T> reader) {
        ((CompletableFuture)reader.readNextAsync().thenAccept(msg -> {
            this.handleMessage((Message<T>)msg);
            this.readTailMessages(reader);
        })).exceptionally(ex -> {
            if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                log.info("Reader {} was closed while reading tail messages.", (Object)reader.getTopic());
                ((ConcurrentHashMap.KeySetView)this.pendingRefreshRequests.keySet()).forEach((? super K future) -> {
                    this.pendingRefreshRequests.remove(future);
                    future.completeExceptionally((Throwable)ex);
                });
            } else {
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                log.warn("Reader {} was interrupted while reading tail messages. Retrying..", (Object)reader.getTopic(), ex);
                this.readTailMessages(reader);
            }
            return null;
        });
    }
}

