/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.jdbc.channel;

import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.postgresql.PGNotification;
import org.postgresql.jdbc.PgConnection;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.jdbc.channel.PgConnectionSupplier;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

public final class PostgresChannelMessageTableSubscriber
implements SmartLifecycle {
    private static final LogAccessor LOGGER = new LogAccessor(PostgresChannelMessageTableSubscriber.class);
    private final Lock lock = new ReentrantLock();
    private final Map<String, Set<Subscription>> subscriptionsMap = new ConcurrentHashMap<String, Set<Subscription>>();
    private final PgConnectionSupplier connectionSupplier;
    private final String tablePrefix;
    private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("postgres-channel-message-table-subscriber-");
    private CountDownLatch latch = new CountDownLatch(0);
    private Future<?> future = CompletableFuture.completedFuture(null);
    @Nullable
    private volatile PgConnection connection;
    private Duration notificationTimeout = Duration.ofSeconds(60L);

    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
        this(connectionSupplier, "INT_");
    }

    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
        Assert.notNull((Object)connectionSupplier, (String)"A connectionSupplier must be provided.");
        Assert.notNull((Object)tablePrefix, (String)"A table prefix must be set.");
        this.connectionSupplier = connectionSupplier;
        this.tablePrefix = tablePrefix;
    }

    public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"A 'taskExecutor' must not be null.");
        this.taskExecutor = taskExecutor;
    }

    public void setNotificationTimeout(Duration notificationTimeout) {
        Assert.notNull((Object)notificationTimeout, (String)"'notificationTimeout' must not be null.");
        this.notificationTimeout = notificationTimeout;
    }

    public boolean subscribe(Subscription subscription) {
        String subscriptionKey = subscription.getRegion() + " " + PostgresChannelMessageTableSubscriber.getKey(subscription.getGroupId());
        Set subscriptions = this.subscriptionsMap.computeIfAbsent(subscriptionKey, __ -> ConcurrentHashMap.newKeySet());
        return subscriptions.add(subscription);
    }

    public boolean unsubscribe(Subscription subscription) {
        String subscriptionKey = subscription.getRegion() + " " + PostgresChannelMessageTableSubscriber.getKey(subscription.getGroupId());
        Set<Subscription> subscriptions = this.subscriptionsMap.get(subscriptionKey);
        return subscriptions != null && subscriptions.remove(subscription);
    }

    public void start() {
        this.lock.lock();
        try {
            if (this.latch.getCount() > 0L) {
                return;
            }
            this.latch = new CountDownLatch(1);
            CountDownLatch startingLatch = new CountDownLatch(1);
            this.future = this.taskExecutor.submit(() -> this.doStart(startingLatch));
            try {
                if (!startingLatch.await(5L, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Failed to start " + String.valueOf(this));
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Failed to start " + String.valueOf(this), ex);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void doStart(CountDownLatch startingLatch) {
        try {
            block20: while (this.isActive()) {
                try {
                    PgConnection conn = this.connectionSupplier.get();
                    try (Statement stmt = conn.createStatement();){
                        stmt.execute("LISTEN " + this.tablePrefix.toLowerCase(Locale.ROOT) + "channel_message_notify");
                    }
                    catch (Exception ex) {
                        try {
                            conn.close();
                            throw ex;
                        }
                        catch (Exception suppressed) {
                            ex.addSuppressed(suppressed);
                        }
                        throw ex;
                    }
                    this.subscriptionsMap.values().forEach(subscriptions -> subscriptions.forEach(Subscription::notifyUpdate));
                    try {
                        this.connection = conn;
                        block21: while (true) {
                            if (!this.isActive()) continue block20;
                            startingLatch.countDown();
                            PGNotification[] notifications = conn.getNotifications((int)this.notificationTimeout.toMillis());
                            if (!this.isActive()) {
                                return;
                            }
                            if ((notifications == null || notifications.length == 0) && !conn.isValid(1)) continue block20;
                            PGNotification[] pGNotificationArray = notifications;
                            int n = pGNotificationArray.length;
                            int n2 = 0;
                            while (true) {
                                if (n2 >= n) continue block21;
                                PGNotification notification = pGNotificationArray[n2];
                                String parameter = notification.getParameter();
                                Set<Subscription> subscriptions2 = this.subscriptionsMap.get(parameter);
                                if (subscriptions2 != null) {
                                    for (Subscription subscription : subscriptions2) {
                                        subscription.notifyUpdate();
                                    }
                                }
                                ++n2;
                            }
                            break;
                        }
                    }
                    finally {
                        conn.close();
                    }
                }
                catch (Exception e) {
                    if (!this.isActive()) continue;
                    LOGGER.error((Throwable)e, (CharSequence)"Failed to poll notifications from Postgres database");
                }
            }
            return;
        }
        finally {
            this.latch.countDown();
        }
    }

    private boolean isActive() {
        if (Thread.interrupted()) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public void stop() {
        this.lock.lock();
        try {
            if (this.future.isDone()) {
                return;
            }
            this.future.cancel(true);
            PgConnection conn = this.connection;
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (SQLException sQLException) {
                    // empty catch block
                }
            }
            try {
                if (!this.latch.await(5L, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Failed to stop " + String.valueOf(this));
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public boolean isRunning() {
        return this.latch.getCount() > 0L;
    }

    private static String getKey(Object input) {
        return input == null ? null : UUIDConverter.getUUID((Object)input).toString();
    }

    public static interface Subscription {
        public void notifyUpdate();

        public String getRegion();

        public Object getGroupId();
    }
}

