/*
 * Decompiled with CFR 0.152.
 */
package org.granite.client.messaging.channel.amf;

import flex.messaging.messages.AcknowledgeMessage;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.granite.client.messaging.Consumer;
import org.granite.client.messaging.ResponseListener;
import org.granite.client.messaging.channel.AsyncToken;
import org.granite.client.messaging.channel.MessagingChannel;
import org.granite.client.messaging.channel.ResponseMessageFuture;
import org.granite.client.messaging.channel.amf.AbstractAMFChannel;
import org.granite.client.messaging.codec.MessagingCodec;
import org.granite.client.messaging.messages.Message;
import org.granite.client.messaging.messages.RequestMessage;
import org.granite.client.messaging.messages.ResponseMessage;
import org.granite.client.messaging.messages.requests.DisconnectMessage;
import org.granite.client.messaging.messages.requests.LoginMessage;
import org.granite.client.messaging.messages.responses.AbstractResponseMessage;
import org.granite.client.messaging.messages.responses.FaultMessage;
import org.granite.client.messaging.messages.responses.ResultMessage;
import org.granite.client.messaging.transport.DefaultTransportMessage;
import org.granite.client.messaging.transport.Transport;
import org.granite.client.messaging.transport.TransportMessage;
import org.granite.logging.Logger;
import org.granite.util.UUIDUtil;

public class BaseAMFMessagingChannel
extends AbstractAMFChannel
implements MessagingChannel {
    private static final Logger log = Logger.getLogger(BaseAMFMessagingChannel.class);

    /** Codec handed to every transport message built by this channel. */
    protected final MessagingCodec<Message[]> codec;

    /** Session id last set through {@link #setSessionId(String)}; {@code null} until then. */
    protected String sessionId = null;

    /** Registered consumers, keyed by their subscription id. */
    protected final ConcurrentMap<String, Consumer> consumersMap = new ConcurrentHashMap<String, Consumer>();

    /** Id of the connect message currently considered in flight, or {@code null} when there is none. */
    protected final AtomicReference<String> connectMessageId = new AtomicReference<String>(null);

    /** Id of the login message issued after a successful ping, or {@code null} when there is none. */
    protected final AtomicReference<String> loginMessageId = new AtomicReference<String>(null);

    /** Most recently created reconnect task, or {@code null} once it has been cancelled. */
    protected final AtomicReference<ReconnectTimerTask> reconnectTimerTask = new AtomicReference<ReconnectTimerTask>();

    /** Listeners notified with the head of each decoded response chain. */
    protected final List<MessagingChannel.ChannelResponseListener> responseListeners = new ArrayList<MessagingChannel.ChannelResponseListener>();

    /** Delay before a reconnect attempt; defaults to 30 seconds, replaced by the "reconnect-interval-ms" advice. */
    protected volatile long reconnectIntervalMillis = TimeUnit.SECONDS.toMillis(30L);

    /** Upper bound on scheduled reconnect attempts; defaults to 60, replaced by the "reconnect-max-attempts" advice. */
    protected volatile long reconnectMaxAttempts = 60L;

    /** Number of reconnect attempts scheduled since the counter was last reset. */
    protected volatile long reconnectAttempts = 0L;

    /**
     * Creates a messaging channel that uses the given codec for its AMF message arrays.
     *
     * @param codec     codec attached to every transport message this channel creates
     * @param transport transport passed through to the parent channel
     * @param id        channel id passed through to the parent channel
     * @param uri       endpoint URI passed through to the parent channel
     */
    public BaseAMFMessagingChannel(MessagingCodec<Message[]> codec, Transport transport, String id, URI uri) {
        // NOTE(review): the meaning of the literal 1 is defined by the parent constructor, not visible here.
        super(transport, id, uri, 1);

        this.codec = codec;
    }

    /**
     * Stores the session id and logs the change; does nothing when the value is
     * equal to the one already held (including both being {@code null}).
     *
     * @param sessionId new session id, may be {@code null}
     */
    @Override
    public void setSessionId(String sessionId) {
        final String current = this.sessionId;
        final boolean unchanged = sessionId == null ? current == null : sessionId.equals(current);
        if (unchanged) {
            return;
        }
        this.sessionId = sessionId;
        log.info("Messaging channel %s set sessionId %s", this.clientId, sessionId);
    }

    /**
     * Sends a connect command for this channel if at least one consumer is
     * registered and no other connect message is currently in flight.
     *
     * <p>Any pending reconnect task is cancelled first. If the transport rejects
     * the message, the failure is logged, the connect/login message ids are
     * released and a reconnect is scheduled.
     *
     * @return {@code true} if the connect message was handed to the transport,
     *         {@code false} if there was nothing to do or the send failed
     */
    protected boolean connect() {
        this.cancelReconnectTimerTask();
        if (this.consumersMap.isEmpty()) {
            return false;
        }
        String id = UUIDUtil.randomUUID();
        // Only one connect message may be outstanding at a time.
        if (!this.connectMessageId.compareAndSet(null, id)) {
            return false;
        }
        log.debug("Connecting channel with clientId %s", this.clientId);
        CommandMessage connectMessage = new CommandMessage();
        // NOTE(review): 20 is presumably the "connect" command operation code - confirm against CommandMessage.
        connectMessage.setOperation(20);
        connectMessage.setMessageId(id);
        connectMessage.setTimestamp(System.currentTimeMillis());
        connectMessage.setClientId(this.clientId);
        try {
            this.transport.send(this, new DefaultTransportMessage<Message[]>(id, true, false, this.clientId, this.sessionId, new Message[]{connectMessage}, this.codec));
            return true;
        }
        catch (Exception e) {
            // Do not swallow the cause: without it a failing endpoint is invisible in the logs.
            log.warn(e, "Channel %s could not send connect message, scheduling reconnect", this.id);
            this.connectMessageId.set(null);
            this.loginMessageId.set(null);
            this.scheduleReconnectTimerTask();
            return false;
        }
    }

    /**
     * Registers the consumer under its subscription id (keeping any consumer
     * already registered for that id) and then attempts to connect the channel.
     *
     * @param consumer consumer to register
     */
    @Override
    public void addConsumer(Consumer consumer) {
        final String subscriptionId = consumer.getSubscriptionId();
        this.consumersMap.putIfAbsent(subscriptionId, consumer);
        this.connect();
    }

    /**
     * Unregisters a consumer.
     *
     * <p>The consumer's own subscription id is used when available; otherwise the
     * map is searched for the same consumer instance (identity comparison).
     *
     * @param consumer consumer to remove
     * @return {@code true} if a mapping was removed, {@code false} if none was found
     */
    @Override
    public boolean removeConsumer(Consumer consumer) {
        String key = consumer.getSubscriptionId();
        if (key == null) {
            // No id on the consumer itself: look up the id it was registered under.
            for (Map.Entry<String, Consumer> registration : this.consumersMap.entrySet()) {
                if (registration.getValue() == consumer) {
                    key = registration.getKey();
                    break;
                }
            }
        }
        if (key != null) {
            return this.consumersMap.remove(key) != null;
        }
        log.warn("Trying to remove unexisting consumer for destination %s", consumer.getDestination());
        return false;
    }

    /**
     * Adds a listener that is called with each decoded response chain.
     *
     * @param listener listener to add
     */
    @Override
    public void addListener(MessagingChannel.ChannelResponseListener listener) {
        responseListeners.add(listener);
    }

    /**
     * Removes a previously added response listener; a no-op if it was never added.
     *
     * @param listener listener to remove
     */
    @Override
    public void removeListener(MessagingChannel.ChannelResponseListener listener) {
        responseListeners.remove(listener);
    }

    /**
     * Logs out through the parent channel and then disconnects this channel.
     *
     * @param sendLogout forwarded unchanged to the parent {@code logout}
     * @param listeners  forwarded unchanged to the parent {@code logout}
     * @return the future returned by the parent {@code logout} (not the disconnect future)
     */
    @Override
    public ResponseMessageFuture logout(boolean sendLogout, ResponseListener ... listeners) {
        log.info("Logging out channel %s", this.clientId);
        final ResponseMessageFuture logoutFuture = super.logout(sendLogout, listeners);
        // The disconnect is issued without listeners; its future is deliberately not returned.
        final ResponseListener[] noListeners = new ResponseListener[0];
        this.disconnect(noListeners);
        return logoutFuture;
    }

    /**
     * Tears down the local messaging state and sends a disconnect message.
     *
     * <p>In order: cancels any pending reconnect, notifies and forgets every
     * consumer, clears the connect/login message ids, resets the reconnect
     * counter, then sends the {@link DisconnectMessage}.
     *
     * @param listeners listeners attached to the disconnect request
     * @return the future of the disconnect request
     */
    @Override
    public synchronized ResponseMessageFuture disconnect(ResponseListener ... listeners) {
        this.cancelReconnectTimerTask();

        // Tell every consumer before dropping the registrations.
        for (Consumer registered : this.consumersMap.values()) {
            registered.onDisconnect();
        }
        this.consumersMap.clear();

        this.connectMessageId.set(null);
        this.loginMessageId.set(null);
        this.reconnectAttempts = 0L;

        final DisconnectMessage request = new DisconnectMessage(this.clientId);
        return this.send(request, listeners);
    }

    /**
     * Wraps the token's request, converted to AMF messages, in a transport message.
     *
     * @param token token whose request is to be sent
     * @return transport message carrying the converted request, this channel's
     *         client id and session id, and the channel codec
     * @throws UnsupportedEncodingException declared by the overridden method
     */
    @Override
    protected TransportMessage createTransportMessage(AsyncToken token) throws UnsupportedEncodingException {
        final Message[] amfMessages = this.convertToAmf(token.getRequest());
        final String tokenId = token.getId();
        final boolean disconnecting = token.isDisconnectRequest();
        return new DefaultTransportMessage<Message[]>(tokenId, false, disconnecting, this.clientId, this.sessionId, amfMessages, this.codec);
    }

    /**
     * Decodes one batch of AMF messages from the stream and dispatches them.
     *
     * <p>When the stream reports available bytes, the batch is decoded with the
     * channel codec and processed in two passes:
     * <ol>
     *   <li>every {@code AcknowledgeMessage} is converted to a response, used to
     *       update the ping/login state of the channel, and linked into a chain
     *       via {@code setNext}; the head of the chain is then passed to every
     *       registered response listener;</li>
     *   <li>every other message must be an {@code AsyncMessage} and is delivered
     *       to the consumer registered under its "DSDstClientId" header.</li>
     * </ol>
     * When the stream reports no available bytes, or the batch contained at least
     * one non-acknowledge message, a reconnect is triggered in the {@code finally}
     * block if the transport asks for it.
     *
     * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself -
     * possible behaviour change" for this method; compare with the original
     * source if exception behaviour matters.
     *
     * @param is stream holding the encoded response
     * @return the head of the response chain, or {@code null} if the batch held no
     *         acknowledge messages
     * @throws IOException declared by the overridden method
     * @throws RuntimeException if a non-acknowledge message is not an {@code AsyncMessage}
     */
    @Override
    protected ResponseMessage decodeResponse(InputStream is) throws IOException {
        boolean reconnect = false;
        // Head and tail of the linked list of responses built from acknowledge messages.
        AbstractResponseMessage responseChain = null;
        AbstractResponseMessage currentResponse = null;
        try {
            if (is.available() > 0) {
                Message[] messages = this.codec.decode(is);
                log.debug("Channel %s: received %d messages", this.clientId, messages.length);
                // Pass 1: acknowledge messages (results and faults for earlier requests).
                for (Message message : messages) {
                    RequestMessage request;
                    Message.Type requestType;
                    if (!(message instanceof AcknowledgeMessage)) continue;
                    AbstractResponseMessage response = this.convertFromAmf((AcknowledgeMessage)message);
                    if (response instanceof ResultMessage) {
                        // Work out which kind of request this result answers: a tracked request,
                        // or the channel's own connect (treated as PING) or login message.
                        requestType = null;
                        request = this.getRequest(response.getCorrelationId());
                        if (request != null) {
                            requestType = request.getType();
                        } else if (response.getCorrelationId().equals(this.connectMessageId.get())) {
                            requestType = Message.Type.PING;
                        } else if (response.getCorrelationId().equals(this.loginMessageId.get())) {
                            requestType = Message.Type.LOGIN;
                        }
                        if (requestType != null) {
                            ResultMessage result = (ResultMessage)response;
                            switch (requestType) {
                                case PING: {
                                    // NOTE(review): the advices and headers are read from messages[0],
                                    // not from the message being iterated - confirm this is intended.
                                    if (messages[0].getBody() instanceof Map) {
                                        Object reconnectMaxAttempts;
                                        Map advices = (Map)messages[0].getBody();
                                        // Server-provided reconnect settings override the defaults.
                                        Object reconnectIntervalMillis = advices.get("reconnect-interval-ms");
                                        if (reconnectIntervalMillis instanceof Number) {
                                            this.reconnectIntervalMillis = ((Number)reconnectIntervalMillis).longValue();
                                        }
                                        if ((reconnectMaxAttempts = advices.get("reconnect-max-attempts")) instanceof Number) {
                                            this.reconnectMaxAttempts = ((Number)reconnectMaxAttempts).longValue();
                                        }
                                    }
                                    if (messages[0].getHeaders().containsKey("JSESSIONID")) {
                                        this.setSessionId((String)messages[0].getHeader("JSESSIONID"));
                                    }
                                    if (this.clientId != null && !this.clientId.equals(result.getClientId())) {
                                        // NOTE(review): result was already dereferenced in the condition above,
                                        // so the "(no request)" branch of the ternary cannot be reached.
                                        log.warn("Channel %s pinged new clientId %s  requested %s", this.id, result != null ? result.getClientId() : "(no request)", this.clientId);
                                    } else {
                                        log.debug("Channel %s pinged clientId %s", this.id, this.clientId);
                                    }
                                    // Adopt the client id returned with the result and mark the channel pinged.
                                    this.clientId = result.getClientId();
                                    this.setPinged(true);
                                    // Remember the id of the login message, if authenticate produced one,
                                    // so that its response can be recognised as LOGIN above.
                                    LoginMessage loginMessage = this.authenticate(null);
                                    if (loginMessage == null) break;
                                    this.loginMessageId.set(loginMessage.getId());
                                    break;
                                }
                                case LOGIN: {
                                    this.setAuthenticated(true);
                                    break;
                                }
                                case SUBSCRIBE: {
                                    // The subscription result carries the "DSDstClientId" header of messages[0].
                                    result.setResult(messages[0].getHeader("DSDstClientId"));
                                    break;
                                }
                            }
                        }
                    } else if (response instanceof FaultMessage) {
                        // Same request-type resolution as for results.
                        requestType = null;
                        request = this.getRequest(response.getCorrelationId());
                        if (request != null) {
                            requestType = request.getType();
                        } else if (response.getCorrelationId().equals(this.connectMessageId.get())) {
                            requestType = Message.Type.PING;
                        } else if (response.getCorrelationId().equals(this.loginMessageId.get())) {
                            requestType = Message.Type.LOGIN;
                        }
                        if (requestType != null) {
                            switch (requestType) {
                                case PING: {
                                    this.clientId = null;
                                    this.setPinged(false);
                                    // NOTE(review): there is no break here, so a failed ping falls through
                                    // into the LOGIN handling below - confirm this is intended.
                                }
                                case LOGIN: {
                                    this.setAuthenticated(false);
                                    if (!this.transport.isDisconnectAfterAuthenticationFailure()) break;
                                    this.disconnect(new ResponseListener[0]);
                                    break;
                                }
                            }
                        }
                        this.dispatchFault((FaultMessage)response);
                    }
                    // Append the response to the chain (the first response becomes the head).
                    if (responseChain == null) {
                        responseChain = currentResponse = response;
                        continue;
                    }
                    currentResponse.setNext(response);
                    currentResponse = response;
                }
                if (responseChain != null) {
                    for (MessagingChannel.ChannelResponseListener listener : this.responseListeners) {
                        listener.onResponse(responseChain);
                    }
                }
                // Pass 2: pushed messages, delivered to the consumer of the matching subscription.
                for (Message message : messages) {
                    if (message instanceof AcknowledgeMessage) continue;
                    // Set before validation, so the finally block sees it even if the check below throws.
                    reconnect = this.transport.isReconnectAfterReceive();
                    if (!(message instanceof AsyncMessage)) {
                        throw new RuntimeException("Message should be an AsyncMessage: " + message);
                    }
                    String subscriptionId = (String)message.getHeader("DSDstClientId");
                    Consumer consumer = (Consumer)this.consumersMap.get(subscriptionId);
                    if (consumer != null) {
                        consumer.onMessage(this.convertFromAmf((AsyncMessage)message));
                        continue;
                    }
                    log.warn("Channel %s: no consumer for subscriptionId: %s", this.clientId, subscriptionId);
                }
            } else {
                // Nothing available on the stream: reconnect only if the transport asks for it.
                reconnect = this.transport.isReconnectAfterReceive();
            }
        }
        finally {
            if (reconnect) {
                // Release the message ids so that connect() can claim a fresh connect id.
                this.connectMessageId.set(null);
                this.loginMessageId.set(null);
                this.connect();
            }
        }
        return responseChain;
    }

    /**
     * Stops the parent channel first, then cancels any pending reconnect task.
     */
    @Override
    protected void internalStop() {
        super.internalStop();

        // No reconnect may fire once the channel has been stopped.
        cancelReconnectTimerTask();
    }

    /**
     * Handles a transport error while the channel is started.
     *
     * <p>The error is forwarded to the parent; if the failed message is the
     * connect message currently in flight, its id is released and a reconnect is
     * scheduled. Errors received while the channel is not started are ignored.
     *
     * @param message transport message that failed, may be {@code null}
     * @param e       the failure
     */
    @Override
    public void onError(TransportMessage message, Exception e) {
        if (this.isStarted()) {
            super.onError(message, e);
            final boolean connectFailed = message != null
                    && this.connectMessageId.compareAndSet(message.getId(), null);
            if (connectFailed) {
                this.scheduleReconnectTimerTask();
            }
        }
    }

    /**
     * Clears the stored reconnect task and cancels it; the reconnect attempt
     * counter is reset only when {@code cancel()} reports {@code true}.
     */
    protected void cancelReconnectTimerTask() {
        final ReconnectTimerTask pending = this.reconnectTimerTask.getAndSet(null);
        if (pending == null) {
            return;
        }
        if (pending.cancel()) {
            this.reconnectAttempts = 0L;
        }
    }

    /**
     * Marks the channel as neither pinged nor authenticated and schedules a
     * reconnect attempt after {@code reconnectIntervalMillis}.
     *
     * <p>Any previously stored reconnect task is cancelled and replaced. Once
     * {@code reconnectMaxAttempts} attempts have been scheduled, no further
     * attempt is made and a warning is logged so the give-up is visible.
     */
    protected void scheduleReconnectTimerTask() {
        log.info("Channel %s schedule reconnect", this.getId());
        this.setPinged(false);
        this.setAuthenticated(false);
        ReconnectTimerTask task = new ReconnectTimerTask();
        ReconnectTimerTask previousTask = this.reconnectTimerTask.getAndSet(task);
        if (previousTask != null) {
            previousTask.cancel();
        }
        if (this.reconnectAttempts < this.reconnectMaxAttempts) {
            // NOTE(review): increment of a volatile field is not atomic; concurrent callers could lose a count.
            ++this.reconnectAttempts;
            this.schedule(task, this.reconnectIntervalMillis);
        } else {
            // Previously this path was silent even though "schedule reconnect" had just been logged.
            log.warn("Channel %s giving up reconnection after %d attempts", this.getId(), this.reconnectAttempts);
        }
    }

    /**
     * Timer task that re-attempts the channel connection when it fires.
     */
    class ReconnectTimerTask
    extends TimerTask {
        /** Delegates to the enclosing channel's {@code connect()}; its result is ignored. */
        @Override
        public void run() {
            connect();
        }
    }
}

