/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TableViewConfigurationData;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.shade.client.api.v2.Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TableView} implementation that keeps the latest value per message key
 * of a topic in an in-memory map, fed by a {@link Reader} positioned at
 * {@link MessageId#earliest}.
 *
 * <p>For persistent topics the reader is created with {@code readCompacted(true)}
 * and {@link #start()} completes only after every message currently available
 * has been applied. For other topics the view starts tailing immediately.
 *
 * <p>Messages without a key are ignored. A message whose value is {@code null}
 * removes its key from the view.
 *
 * @param <T> type of the values held in the view
 */
public class TableViewImpl<T> implements TableView<T> {
    private static final Logger log = LoggerFactory.getLogger(TableViewImpl.class);

    private final TableViewConfigurationData conf;
    /** Live key/value state; mutated only by {@link #handleMessage(Message)}. */
    private final ConcurrentMap<String, T> data;
    /** Read-only wrapper over {@link #data} handed out by the collection-view accessors. */
    private final Map<String, T> immutableData;
    private final CompletableFuture<Reader<T>> reader;
    /** Callbacks registered through {@link #forEachAndListen(BiConsumer)}; guarded by {@link #listenersMutex}. */
    private final List<BiConsumer<String, T>> listeners;
    private final ReentrantLock listenersMutex;
    private final boolean isPersistentTopic;

    /**
     * Creates the view and asynchronously starts creating the underlying reader.
     * No message is consumed until {@link #start()} is called.
     *
     * @param client client used to build the reader
     * @param schema schema used to decode message values
     * @param conf   table view configuration (topic name, partition auto-update interval)
     */
    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
        this.conf = conf;
        this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString());
        this.data = new ConcurrentHashMap<>();
        this.immutableData = Collections.unmodifiableMap(this.data);
        this.listeners = new ArrayList<>();
        this.listenersMutex = new ReentrantLock();

        ReaderBuilder<T> readerBuilder = client.newReader(schema)
                .topic(conf.getTopicName())
                .startMessageId(MessageId.earliest)
                .autoUpdatePartitions(true)
                .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
                .poolMessages(true);
        // Compacted reads are requested only for persistent topics.
        if (this.isPersistentTopic) {
            readerBuilder.readCompacted(true);
        }
        this.reader = readerBuilder.createAsync();
    }

    /**
     * Starts feeding the view from the reader.
     *
     * @return a future yielding this view; for persistent topics it completes after
     *         all currently available messages have been applied, otherwise as soon
     *         as the reader exists and tailing has been scheduled. It completes
     *         exceptionally if the reader cannot be created or the initial replay fails.
     */
    CompletableFuture<TableView<T>> start() {
        return this.reader.thenCompose(createdReader -> {
            if (!this.isPersistentTopic) {
                readTailMessages(createdReader);
                return CompletableFuture.completedFuture(createdReader);
            }
            return readAllExistingMessages(createdReader);
        }).thenApply(__ -> this);
    }

    @Override
    public int size() {
        return this.data.size();
    }

    @Override
    public boolean isEmpty() {
        return this.data.isEmpty();
    }

    @Override
    public boolean containsKey(String key) {
        return this.data.containsKey(key);
    }

    @Override
    public T get(String key) {
        return this.data.get(key);
    }

    @Override
    public Set<Map.Entry<String, T>> entrySet() {
        return this.immutableData.entrySet();
    }

    @Override
    public Set<String> keySet() {
        return this.immutableData.keySet();
    }

    @Override
    public Collection<T> values() {
        return this.immutableData.values();
    }

    @Override
    public void forEach(BiConsumer<String, T> action) {
        this.data.forEach(action);
    }

    /**
     * Runs {@code action} on every current entry and then registers it as a listener
     * for subsequent updates. Both steps happen under the same lock that
     * {@link #handleMessage(Message)} holds while applying an update, so no update is
     * applied between the iteration and the registration.
     */
    @Override
    public void forEachAndListen(BiConsumer<String, T> action) {
        // Acquire before the try: if lock() failed inside it, the finally block would
        // call unlock() on a lock this thread does not hold.
        this.listenersMutex.lock();
        try {
            forEach(action);
            this.listeners.add(action);
        } finally {
            this.listenersMutex.unlock();
        }
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return this.reader.thenCompose(Reader::closeAsync);
    }

    @Override
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Applies one message to the view and notifies the registered listeners.
     *
     * <p>Keyless messages are skipped. A {@code null} value removes the key, any other
     * value replaces the current one. The message is always released, including when
     * it is skipped or when processing throws.
     *
     * @param msg message read from the topic
     */
    private void handleMessage(Message<T> msg) {
        try {
            if (!msg.hasKey()) {
                return;
            }
            // Read key and value once, and only for keyed messages.
            final String key = msg.getKey();
            final T value = msg.getValue();
            if (log.isDebugEnabled()) {
                log.debug("Applying message from topic {}. key={} value={}", this.conf.getTopicName(), key, value);
            }

            this.listenersMutex.lock();
            try {
                if (value == null) {
                    this.data.remove(key);
                } else {
                    this.data.put(key, value);
                }
                // Listeners run while the lock is held; a failing listener is logged
                // and does not prevent the remaining listeners from being called.
                for (BiConsumer<String, T> listener : this.listeners) {
                    try {
                        listener.accept(key, value);
                    } catch (Throwable t) {
                        log.error("Table view listener raised an exception", t);
                    }
                }
            } finally {
                this.listenersMutex.unlock();
            }
        } finally {
            msg.release();
        }
    }

    /**
     * Replays every message currently available on the reader, then switches to tailing.
     *
     * @param reader reader to drain
     * @return a future completed with {@code reader} once the replay is done, or
     *         completed exceptionally if the replay fails
     */
    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
        long startTime = System.nanoTime();
        AtomicLong messagesRead = new AtomicLong();
        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
        readAllExistingMessages(reader, future, startTime, messagesRead);
        return future;
    }

    /**
     * One step of the replay loop: reads and applies the next message if one is
     * available and re-schedules itself, otherwise completes {@code future} and
     * starts tailing.
     *
     * @param reader       reader to drain
     * @param future       future to complete when the replay ends or fails
     * @param startTime    {@link System#nanoTime()} value captured when the replay began
     * @param messagesRead running count of replayed messages
     */
    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
                                         AtomicLong messagesRead) {
        reader.hasMessageAvailableAsync().thenAccept(hasMessage -> {
            if (hasMessage) {
                reader.readNextAsync().thenAccept(msg -> {
                    messagesRead.incrementAndGet();
                    handleMessage(msg);
                    readAllExistingMessages(reader, future, startTime, messagesRead);
                }).exceptionally(ex -> {
                    future.completeExceptionally(ex);
                    return null;
                });
            } else {
                long endTime = System.nanoTime();
                long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
                log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
                        reader.getTopic(), messagesRead, durationMillis / 1000.0);
                future.complete(reader);
                readTailMessages(reader);
            }
        }).exceptionally(ex -> {
            // Without this, a failed availability check (or an exception thrown by the
            // callback above) would leave the start future pending forever. This is a
            // no-op when the future has already been completed.
            future.completeExceptionally(ex);
            return null;
        });
    }

    /**
     * Continuously reads and applies new messages. The loop ends when a read, or the
     * handling of a message, fails; the failure is logged and not propagated.
     *
     * @param reader reader to tail
     */
    private void readTailMessages(Reader<T> reader) {
        reader.readNextAsync().thenAccept(msg -> {
            handleMessage(msg);
            readTailMessages(reader);
        }).exceptionally(ex -> {
            // Pass the cause to the logger so the reason the tail loop stopped is visible.
            log.info("Reader {} was interrupted", reader.getTopic(), ex);
            return null;
        });
    }
}

