/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.jdbc.channel;

import java.util.concurrent.Executor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;

/**
 * Subscribable channel that persists every sent message into a group of a
 * {@link JdbcChannelMessageStore} and hands messages polled from that group to its
 * subscribed handlers through a {@link UnicastingDispatcher}.
 *
 * <p>The channel registers itself as a
 * {@link PostgresChannelMessageTableSubscriber.Subscription} when its first handler is
 * subscribed and removes that registration when its last handler is unsubscribed.
 */
public class PostgresSubscribableChannel
extends AbstractSubscribableChannel
implements PostgresChannelMessageTableSubscriber.Subscription {
    /** Store that messages are written to on send and polled from on notification. */
    private final JdbcChannelMessageStore jdbcChannelMessageStore;

    /** Identifier of the message group used by this channel inside the store. */
    private final Object groupId;

    /** Subscriber this channel registers with while it has at least one handler. */
    private final PostgresChannelMessageTableSubscriber messageTableSubscriber;

    /** Dispatcher delivering polled messages; defaults to a {@link SimpleAsyncTaskExecutor}. */
    private UnicastingDispatcher dispatcher = new UnicastingDispatcher(new SimpleAsyncTaskExecutor());

    /**
     * Creates a channel bound to one message group of the given store.
     *
     * @param jdbcChannelMessageStore the store used to persist and poll messages
     * @param groupId the identifier of the message group backing this channel
     * @param messageTableSubscriber the subscriber that calls {@link #notifyUpdate()}
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageStore, Object groupId, PostgresChannelMessageTableSubscriber messageTableSubscriber) {
        Assert.notNull(jdbcChannelMessageStore, "A jdbcChannelMessageStore must be provided.");
        Assert.notNull(groupId, "A groupId must be set.");
        Assert.notNull(messageTableSubscriber, "A messageTableSubscriber must be set.");
        this.jdbcChannelMessageStore = jdbcChannelMessageStore;
        this.groupId = groupId;
        this.messageTableSubscriber = messageTableSubscriber;
    }

    /**
     * Replaces the dispatcher with a new {@link UnicastingDispatcher} that uses the given
     * executor.
     *
     * <p>NOTE(review): the previous dispatcher instance is discarded, so handlers that were
     * presumably registered on it are not carried over. Call this before any handler is
     * subscribed; confirm against callers.
     *
     * @param executor the executor used for dispatching messages
     * @throws IllegalArgumentException if {@code executor} is {@code null}
     */
    public void setDispatcherExecutor(Executor executor) {
        Assert.notNull(executor, "An executor must be provided.");
        this.dispatcher = new UnicastingDispatcher(executor);
    }

    /**
     * Subscribes the handler and, when the dispatcher then holds exactly one handler,
     * registers this channel with the table subscriber and drains the message group once.
     *
     * @param handler the handler to subscribe
     * @return the result reported by the superclass subscription
     */
    public boolean subscribe(MessageHandler handler) {
        boolean subscribed = super.subscribe(handler);
        if (this.dispatcher.getHandlerCount() == 1) {
            this.messageTableSubscriber.subscribe(this);
            // Pick up whatever is already waiting in the group at subscription time.
            this.notifyUpdate();
        }
        return subscribed;
    }

    /**
     * Unsubscribes the handler and, when the dispatcher is left without handlers, removes
     * this channel's registration from the table subscriber.
     *
     * @param handle the handler to unsubscribe
     * @return the result reported by the superclass unsubscription
     */
    public boolean unsubscribe(MessageHandler handle) {
        boolean unsubscribed = super.unsubscribe(handle);
        if (this.dispatcher.getHandlerCount() == 0) {
            this.messageTableSubscriber.unsubscribe(this);
        }
        return unsubscribed;
    }

    /**
     * Returns the dispatcher currently used by this channel.
     *
     * @return the current dispatcher
     */
    protected MessageDispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * Adds the message to this channel's group in the store.
     *
     * @param message the message to persist
     * @param timeout not used by this implementation
     * @return always {@code true}
     */
    protected boolean doSend(Message<?> message, long timeout) {
        this.jdbcChannelMessageStore.addMessageToGroup(this.groupId, message);
        return true;
    }

    /**
     * Polls messages from this channel's group and dispatches them one by one until the
     * store returns {@code null} or the dispatcher has no handlers left.
     *
     * <p>The handler count is re-checked before every poll. Per the store's documentation a
     * poll removes the message from the group, so polling without any handler would take
     * messages that nothing can receive. Leaving them in the store lets the next
     * {@link #subscribe(MessageHandler)} pick them up. This narrows, but does not close, the
     * window in which a concurrent unsubscribe races with a poll.
     */
    @Override
    public void notifyUpdate() {
        while (this.dispatcher.getHandlerCount() > 0) {
            Message<?> message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId);
            if (message == null) {
                return;
            }
            this.dispatcher.dispatch(message);
        }
    }

    /**
     * Returns the region reported by the underlying message store.
     *
     * @return the store's region
     */
    @Override
    public String getRegion() {
        return this.jdbcChannelMessageStore.getRegion();
    }

    /**
     * Returns the identifier of the message group backing this channel.
     *
     * @return the group id passed to the constructor
     */
    @Override
    public Object getGroupId() {
        return this.groupId;
    }
}

