/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.jdbc.channel;

import java.lang.invoke.LambdaMetafactory;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.postgresql.PGNotification;
import org.postgresql.jdbc.PgConnection;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.integration.jdbc.channel.PgConnectionSupplier;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;

public final class PostgresChannelMessageTableSubscriber
implements SmartLifecycle {
    private static final LogAccessor LOGGER = new LogAccessor(PostgresChannelMessageTableSubscriber.class);
    private final Map<String, Set<Subscription>> subscriptionsMap = new ConcurrentHashMap<String, Set<Subscription>>();
    private final PgConnectionSupplier connectionSupplier;
    private final String tablePrefix;
    @Nullable
    private ExecutorService executor;
    private CountDownLatch latch = new CountDownLatch(0);
    private Future<?> future = CompletableFuture.completedFuture(null);
    @Nullable
    private volatile PgConnection connection;

    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier) {
        this(connectionSupplier, "INT_");
    }

    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix) {
        Assert.notNull((Object)connectionSupplier, (String)"A connectionSupplier must be provided.");
        Assert.notNull((Object)tablePrefix, (String)"A table prefix must be set.");
        this.connectionSupplier = connectionSupplier;
        this.tablePrefix = tablePrefix;
    }

    public synchronized void setExecutor(@Nullable ExecutorService executor) {
        this.executor = executor;
    }

    public boolean subscribe(Subscription subscription) {
        String subscriptionKey = subscription.getRegion() + " " + PostgresChannelMessageTableSubscriber.getKey(subscription.getGroupId());
        Set subscriptions = this.subscriptionsMap.computeIfAbsent(subscriptionKey, __ -> ConcurrentHashMap.newKeySet());
        return subscriptions.add(subscription);
    }

    public boolean unsubscribe(Subscription subscription) {
        String subscriptionKey = subscription.getRegion() + " " + PostgresChannelMessageTableSubscriber.getKey(subscription.getGroupId());
        Set<Subscription> subscriptions = this.subscriptionsMap.get(subscriptionKey);
        return subscriptions != null && subscriptions.remove(subscription);
    }

    public synchronized void start() {
        if (this.latch.getCount() > 0L) {
            return;
        }
        ExecutorService executorToUse = this.executor;
        if (executorToUse == null) {
            CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("postgres-channel-message-table-subscriber-");
            threadFactory.setDaemon(true);
            this.executor = executorToUse = Executors.newSingleThreadExecutor((ThreadFactory)threadFactory);
        }
        this.latch = new CountDownLatch(1);
        this.future = executorToUse.submit(this::lambda$start$2);
    }

    private boolean isActive() {
        if (Thread.interrupted()) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public synchronized void stop() {
        if (this.future.isDone()) {
            return;
        }
        this.future.cancel(true);
        PgConnection conn = this.connection;
        if (conn != null) {
            try {
                conn.close();
            }
            catch (SQLException sQLException) {
                // empty catch block
            }
        }
        try {
            if (!this.latch.await(5L, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Failed to stop " + PostgresChannelMessageTableSubscriber.class.getName());
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public boolean isRunning() {
        return this.latch.getCount() > 0L;
    }

    private static String getKey(Object input) {
        return input == null ? null : UUIDConverter.getUUID((Object)input).toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private /* synthetic */ void lambda$start$2() {
        try {
            while (this.isActive() != false) {
                block27: {
                    try {
                        block26: {
                            conn = this.connectionSupplier.get();
                            try {
                                stmt = conn.createStatement();
                                try {
                                    stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
                                }
                                finally {
                                    if (stmt != null) {
                                        stmt.close();
                                    }
                                }
                            }
                            catch (Exception ex) {
                                try {
                                    conn.close();
                                    throw ex;
                                }
                                catch (Exception suppressed) {
                                    ex.addSuppressed(suppressed);
                                }
                                throw ex;
                            }
                            this.subscriptionsMap.values().forEach((Consumer<Set>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$start$1(java.util.Set ), (Ljava/util/Set;)V)());
                            try {
                                this.connection = conn;
lbl26:
                                // 3 sources

                                while (this.isActive()) {
                                    notifications = conn.getNotifications(0);
                                    if (!this.isActive()) {
                                        break block26;
                                    }
                                    ** GOTO lbl-1000
                                }
                                break block27;
                            }
                            catch (Throwable var11_16) {
                                conn.close();
                                throw var11_16;
                            }
                        }
                        conn.close();
                        return;
                    }
                    catch (Exception e) {
                        if (!this.isActive()) continue;
                        PostgresChannelMessageTableSubscriber.LOGGER.error((Throwable)e, (CharSequence)"Failed to poll notifications from Postgres database");
                        continue;
                    }
lbl-1000:
                    // 1 sources

                    {
                        if (notifications == null) ** GOTO lbl26
                        for (PGNotification notification : notifications) {
                            parameter = notification.getParameter();
                            subscriptions = this.subscriptionsMap.get(parameter);
                            if (subscriptions == null) continue;
                            for (Subscription subscription : subscriptions) {
                                subscription.notifyUpdate();
                            }
                        }
                        ** GOTO lbl26
                    }
                }
                conn.close();
            }
            return;
        }
        finally {
            this.latch.countDown();
        }
    }

    private static /* synthetic */ void lambda$start$1(Set subscriptions) {
        subscriptions.forEach(Subscription::notifyUpdate);
    }

    public static interface Subscription {
        public void notifyUpdate();

        public String getRegion();

        public Object getGroupId();
    }
}

