/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.noear.folkmq.FolkMQ;
import org.noear.folkmq.client.MqAlarm;
import org.noear.folkmq.client.MqClient;
import org.noear.folkmq.client.MqClientInternal;
import org.noear.folkmq.client.MqClientListener;
import org.noear.folkmq.client.MqConsumeHandler;
import org.noear.folkmq.client.MqMessage;
import org.noear.folkmq.client.MqMessageReceivedImpl;
import org.noear.folkmq.client.MqSubscription;
import org.noear.folkmq.client.MqTransaction;
import org.noear.folkmq.client.MqTransactionCheckback;
import org.noear.folkmq.client.MqTransactionImpl;
import org.noear.folkmq.common.MqAssert;
import org.noear.folkmq.common.MqTopicHelper;
import org.noear.folkmq.common.MqUtils;
import org.noear.folkmq.exception.FolkmqException;
import org.noear.socketd.SocketD;
import org.noear.socketd.cluster.ClusterClientSession;
import org.noear.socketd.exception.SocketDConnectionException;
import org.noear.socketd.exception.SocketDException;
import org.noear.socketd.transport.client.ClientConfig;
import org.noear.socketd.transport.client.ClientConfigHandler;
import org.noear.socketd.transport.client.ClientSession;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Reply;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.EntityDefault;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.stream.RequestStream;
import org.noear.socketd.utils.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqClientDefault
implements MqClientInternal {
    private static final Logger log = LoggerFactory.getLogger(MqClientDefault.class);
    protected MqTransactionCheckback transactionCheckback;
    protected MqConsumeHandler listenHandler;
    protected ExecutorService consumeExecutor;
    private final String[] urls;
    private ClusterClientSession clientSession;
    private final MqClientListener clientListener;
    private ClientConfigHandler clientConfigHandler;
    private Map<String, MqSubscription> subscriptionMap = new HashMap<String, MqSubscription>();
    private String name;
    private String namespace;
    protected boolean autoAcknowledge = true;

    public MqClientDefault(String[] urls) {
        this(urls, null);
    }

    public MqClientDefault(String[] urls, MqClientListener clientListener) {
        this.urls = urls;
        this.clientListener = clientListener == null ? new MqClientListener() : clientListener;
        this.clientListener.init(this);
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public MqClient nameAs(String name) {
        MqAssert.requireNonNull(name, "Param 'name' can't be null");
        MqAssert.assertMeta(name, "name");
        this.name = name;
        return this;
    }

    @Override
    public String namespace() {
        return this.namespace;
    }

    @Override
    public MqClient namespaceAs(String namespace) {
        this.namespace = namespace;
        return this;
    }

    @Override
    public MqClient connect() throws IOException {
        ArrayList<String> serverUrls = new ArrayList<String>();
        for (String url : this.urls) {
            url = url.replaceAll("folkmq:ws://", "sd:ws://");
            url = url.replaceAll("folkmq:wss://", "sd:wss://");
            url = url.replaceAll("folkmq://", "sd:tcp://");
            for (String url1 : url.split(",")) {
                if (StrUtils.isNotEmpty((String)this.name)) {
                    url1 = url1.contains("?") ? url1 + "&@=" + this.name : url1 + "?@=" + this.name;
                }
                serverUrls.add(url1);
            }
        }
        this.clientSession = (ClusterClientSession)SocketD.createClusterClient(serverUrls).config(c -> {
            ((ClientConfig)((ClientConfig)((ClientConfig)c.metaPut("folkmq-version", FolkMQ.versionCodeAsString()).heartbeatInterval(6000L).nolockSend(true)).ioThreads(1)).codecThreads(1)).exchangeThreads(1);
            if (StrUtils.isNotEmpty((String)this.namespace)) {
                c.metaPut("folkmq-namespace", this.namespace);
            }
            if (this.clientConfigHandler != null) {
                this.clientConfigHandler.clientConfig(c);
            }
        }).listen((Listener)this.clientListener).open();
        return this;
    }

    @Override
    public void disconnect() throws IOException {
        this.clientSession.close();
    }

    @Override
    public MqClient config(ClientConfigHandler configHandler) {
        this.clientConfigHandler = configHandler;
        return this;
    }

    @Override
    public MqClient consumeExecutor(ExecutorService consumeExecutor) {
        this.consumeExecutor = consumeExecutor;
        return this;
    }

    @Override
    public MqClient autoAcknowledge(boolean auto) {
        this.autoAcknowledge = auto;
        return this;
    }

    @Override
    public boolean autoAcknowledge() {
        return this.autoAcknowledge;
    }

    @Override
    public CompletableFuture<String> call(String apiName, String apiToken, String topic, String consumerGroup) throws IOException {
        MqAssert.requireNonNull(apiName, "Param 'apiName' can't be null");
        MqAssert.requireNonNull(apiToken, "Param 'apiToken' can't be null");
        MqAssert.requireNonNull(topic, "Param 'topic' can't be null");
        MqAssert.requireNonNull(consumerGroup, "Param 'consumerGroup' can't be null");
        MqAssert.assertMeta(apiName, "apiName");
        MqAssert.assertMeta(apiToken, "apiToken");
        MqAssert.assertMeta(topic, "topic");
        MqAssert.assertMeta(consumerGroup, "consumerGroup");
        topic = MqTopicHelper.getFullTopic(this.namespace, topic);
        if (this.clientSession != null) {
            EntityDefault entity = new StringEntity("").metaPut("api.name", apiName).metaPut("api.token", apiToken).metaPut("mq.topic", topic).metaPut("mq.consumer", consumerGroup);
            CompletableFuture<String> completableFuture = new CompletableFuture<String>();
            this.clientSession.sendAndRequest("mq.api", (Entity)entity).thenReply(r -> completableFuture.complete(r.dataAsString())).thenError(err -> completableFuture.completeExceptionally((Throwable)err));
            return completableFuture;
        }
        throw new IOException("No sessions are available");
    }

    @Override
    public void subscribe(String topic, String consumerGroup, boolean autoAck, MqConsumeHandler consumerHandler) throws IOException {
        MqAssert.requireNonNull(topic, "Param 'topic' can't be null");
        MqAssert.requireNonNull(consumerGroup, "Param 'consumerGroup' can't be null");
        MqAssert.requireNonNull(consumerHandler, "Param 'consumerHandler' can't be null");
        MqAssert.assertMeta(topic, "topic");
        MqAssert.assertMeta(consumerGroup, "consumerGroup");
        topic = MqTopicHelper.getFullTopic(this.namespace, topic);
        MqSubscription subscription = new MqSubscription(topic, consumerGroup, autoAck, consumerHandler);
        this.subscriptionMap.put(subscription.getQueueName(), subscription);
        if (this.clientSession != null) {
            for (ClientSession session : this.clientSession.getSessionAll()) {
                EntityDefault entity = new StringEntity("").metaPut("mq.topic", subscription.getTopic()).metaPut("mq.consumer", subscription.getConsumerGroup()).at("folkmq-server*");
                session.sendAndRequest("mq.event.subscribe", (Entity)entity, 30000L).await();
                log.info("Client subscribe successfully: {}#{}, sessionId={}", new Object[]{topic, consumerGroup, session.sessionId()});
            }
        }
    }

    @Override
    public void unsubscribe(String topic, String consumerGroup) throws IOException {
        MqAssert.requireNonNull(topic, "Param 'topic' can't be null");
        MqAssert.requireNonNull(consumerGroup, "Param 'consumerGroup' can't be null");
        MqAssert.assertMeta(topic, "topic");
        MqAssert.assertMeta(consumerGroup, "consumerGroup");
        topic = MqTopicHelper.getFullTopic(this.namespace, topic);
        String queueName = topic + "#" + consumerGroup;
        this.subscriptionMap.remove(queueName);
        if (this.clientSession != null) {
            for (ClientSession session : this.clientSession.getSessionAll()) {
                EntityDefault entity = new StringEntity("").metaPut("mq.topic", topic).metaPut("mq.consumer", consumerGroup).at("folkmq-server*");
                session.sendAndRequest("mq.event.unsubscribe", (Entity)entity, 30000L).await();
                log.info("Client unsubscribe successfully: {}#{}\uff0c sessionId={}", new Object[]{topic, consumerGroup, session.sessionId()});
            }
        }
    }

    @Override
    public void publish(String topic, MqMessage message) throws IOException {
        MqAssert.requireNonNull(topic, "Param 'topic' can't be null");
        MqAssert.requireNonNull(message, "Param 'message' can't be null");
        MqAssert.assertMeta(topic, "topic");
        if (this.clientSession == null) {
            throw new SocketDConnectionException("Not connected!");
        }
        ClientSession session = this.clientSession.getSessionAny(this.diversionOrNull(topic = MqTopicHelper.getFullTopic(this.namespace, topic), message));
        if (session == null || !session.isValid()) {
            throw new SocketDException("No session is available!");
        }
        EntityDefault entity = MqUtils.getOf((Session)session).publishEntityBuild(topic, message);
        if (message.getQos() > 0) {
            Reply resp = session.sendAndRequest("mq.event.publish", (Entity)entity).await();
            int confirm = Integer.parseInt(resp.metaOrDefault("mq.confirm", "0"));
            if (confirm != 1) {
                String messsage = "Client message publish confirm failed: " + resp.dataAsString();
                throw new FolkmqException(messsage);
            }
        } else {
            session.send("mq.event.publish", (Entity)entity);
        }
    }

    @Override
    public CompletableFuture<Boolean> publishAsync(String topic, MqMessage message) throws IOException {
        MqAssert.requireNonNull(topic, "Param 'topic' can't be null");
        MqAssert.requireNonNull(message, "Param 'message' can't be null");
        MqAssert.assertMeta(topic, "topic");
        if (this.clientSession == null) {
            throw new SocketDConnectionException("Not connected!");
        }
        ClientSession session = this.clientSession.getSessionAny(this.diversionOrNull(topic = MqTopicHelper.getFullTopic(this.namespace, topic), message));
        if (session == null || !session.isValid()) {
            throw new SocketDException("No session is available!");
        }
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        EntityDefault entity = MqUtils.getOf((Session)session).publishEntityBuild(topic, message);
        if (message.getQos() > 0) {
            session.sendAndRequest("mq.event.publish", (Entity)entity, -1L).thenReply(r -> {
                int confirm = Integer.parseInt(r.metaOrDefault("mq.confirm", "0"));
                if (confirm == 1) {
                    future.complete(true);
                } else {
                    String messsage = "Client message publish confirm failed: " + r.dataAsString();
                    future.completeExceptionally(new FolkmqException(messsage));
                }
            }).thenError(err -> {
                String messsage = "Client message publish confirm failed: " + err.getMessage();
                future.completeExceptionally(new FolkmqException(messsage));
            });
        } else {
            session.send("mq.event.publish", (Entity)entity);
            future.complete(true);
        }
        return future;
    }

    @Override
    public void unpublish(String topic, String key) throws IOException {
        MqAssert.requireNonNull(topic, "Param 'topic' can't be null");
        MqAssert.requireNonNull(key, "Param 'key' can't be null");
        MqAssert.assertMeta(topic, "topic");
        MqAssert.assertMeta(key, "key");
        if (this.clientSession == null) {
            throw new SocketDConnectionException("Not connected!");
        }
        topic = MqTopicHelper.getFullTopic(this.namespace, topic);
        ClientSession session = this.clientSession.getSessionAny(null);
        if (session == null || !session.isValid()) {
            throw new SocketDException("No session is available!");
        }
        EntityDefault entity = new StringEntity("").metaPut("mq.topic", topic).metaPut("mq.tid", key).at("folkmq-server*");
        Reply resp = session.sendAndRequest("mq.event.unpublish", (Entity)entity).await();
        int confirm = Integer.parseInt(resp.metaOrDefault("mq.confirm", "0"));
        if (confirm != 1) {
            String messsage = "Client message unpublish confirm failed: " + resp.dataAsString();
            throw new FolkmqException(messsage);
        }
    }

    @Override
    public CompletableFuture<Boolean> unpublishAsync(String topic, String key) throws IOException {
        MqAssert.requireNonNull(topic, "Param 'topic' can't be null");
        MqAssert.requireNonNull(key, "Param 'key' can't be null");
        MqAssert.assertMeta(topic, "topic");
        MqAssert.assertMeta(key, "key");
        if (this.clientSession == null) {
            throw new SocketDConnectionException("Not connected!");
        }
        topic = MqTopicHelper.getFullTopic(this.namespace, topic);
        ClientSession session = this.clientSession.getSessionAny(null);
        if (session == null || !session.isValid()) {
            throw new SocketDException("No session is available!");
        }
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        EntityDefault entity = new StringEntity("").metaPut("mq.topic", topic).metaPut("mq.tid", key).at("folkmq-server*");
        session.sendAndRequest("mq.event.unpublish", (Entity)entity, -1L).thenReply(r -> {
            int confirm = Integer.parseInt(r.metaOrDefault("mq.confirm", "0"));
            if (confirm == 1) {
                future.complete(true);
            } else {
                String messsage = "Client message unpublish confirm failed: " + r.dataAsString();
                future.completeExceptionally(new FolkmqException(messsage));
            }
        }).thenError(err -> {
            String messsage = "Client message unpublish confirm failed: " + err.getMessage();
            future.completeExceptionally(new FolkmqException(messsage));
        });
        return future;
    }

    @Override
    public void listen(MqConsumeHandler listenHandler) {
        if (StrUtils.isEmpty((String)this.name)) {
            throw new IllegalArgumentException("Client 'name' can't be empty");
        }
        this.listenHandler = listenHandler;
    }

    @Override
    public RequestStream send(MqMessage message, String toName, long timeout) throws IOException {
        if (StrUtils.isEmpty((String)this.name)) {
            throw new IllegalArgumentException("Client 'name' can't be empty");
        }
        MqAssert.requireNonNull(toName, "Param 'toName' can't be null");
        MqAssert.requireNonNull(message, "Param 'message' can't be null");
        MqAssert.assertMeta(toName, "toName");
        if (this.clientSession == null) {
            throw new SocketDConnectionException("Not connected!");
        }
        ClientSession session = this.clientSession.getSessionAny(null);
        if (session == null || !session.isValid()) {
            throw new SocketDException("No session is available!");
        }
        message.internalSender(this.name());
        EntityDefault entity = MqUtils.getOf((Session)session).publishEntityBuild("", message);
        entity.putMeta("c1", toName);
        entity.at(toName);
        if (message.getQos() > 0) {
            return session.sendAndRequest("mq.event.request", (Entity)entity, timeout);
        }
        session.send("mq.event.request", (Entity)entity);
        return null;
    }

    @Override
    public MqClient transactionCheckback(MqTransactionCheckback transactionCheckback) {
        if (transactionCheckback != null) {
            this.transactionCheckback = transactionCheckback;
        }
        return this;
    }

    @Override
    public MqTransaction newTransaction() {
        if (StrUtils.isEmpty((String)this.name)) {
            throw new IllegalArgumentException("Client 'name' can't be empty");
        }
        return new MqTransactionImpl(this);
    }

    @Override
    public void publish2(String tmid, List<String> keyAry, boolean isRollback) throws IOException {
        if (keyAry == null || keyAry.size() == 0) {
            return;
        }
        if (this.clientSession == null) {
            throw new SocketDConnectionException("Not connected!");
        }
        ClientSession session = this.clientSession.getSessionAny(tmid);
        if (session == null || !session.isValid()) {
            throw new SocketDException("No session is available!");
        }
        EntityDefault entity = new StringEntity(String.join((CharSequence)",", keyAry)).metaPut("mq.rollback", isRollback ? "1" : "0").at("folkmq-server!");
        Reply resp = session.sendAndRequest("mq.event.publish2", (Entity)entity).await();
        int confirm = Integer.parseInt(resp.metaOrDefault("mq.confirm", "0"));
        if (confirm != 1) {
            String messsage = "Client message publish2 confirm failed: " + resp.dataAsString();
            throw new FolkmqException(messsage);
        }
    }

    @Override
    public void reply(Session session, Message from, MqMessageReceivedImpl message, boolean isOk, Entity entity) throws IOException {
        if (message.getQos() > 0 && session.isValid()) {
            if (entity == null) {
                entity = new EntityDefault();
            }
            if (entity instanceof MqAlarm) {
                session.sendAlarm(from, entity.dataAsString());
            } else {
                entity.putMeta("mq.ack", isOk ? "1" : "0");
                session.replyEnd(from, entity);
            }
        }
    }

    protected String diversionOrNull(String fullTopic, MqMessage message) {
        if (message.isTransaction()) {
            return message.getTmid();
        }
        if (message.isSequence()) {
            if (StrUtils.isEmpty((String)message.getSequenceSharding())) {
                return fullTopic;
            }
            return message.getSequenceSharding();
        }
        return null;
    }

    protected MqSubscription getSubscription(String fullTopic, String consumerGroup) {
        String queueName = fullTopic + "#" + consumerGroup;
        return this.subscriptionMap.get(queueName);
    }

    protected Collection<MqSubscription> getSubscriptionAll() {
        return this.subscriptionMap.values();
    }

    protected int getSubscriptionSize() {
        return this.subscriptionMap.size();
    }
}

