/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.virtualhost;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.FilterSupport;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
import org.apache.qpid.util.ByteBufferInputStream;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class VirtualHostConfigRecoveryHandler
implements ConfigurationRecoveryHandler,
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
TransactionLogRecoveryHandler.QueueEntryRecoveryHandler,
TransactionLogRecoveryHandler.DtxRecordRecoveryHandler {
    /** log4j logger shared by all instances of this handler. */
    private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
    /** Virtual host whose queue registry and DTX registry are consulted and populated during recovery. */
    private final VirtualHost _virtualHost;
    /** Number of messages recovered per queue, keyed by queue name; a TreeMap, so iteration is ordered by name. */
    private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
    /** Every message reported by the message store, keyed by message number. */
    private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
    /** Stored messages not yet referenced by any queue entry or DTX record; leftovers are removed from the store. */
    private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
    /** Configured-object attributes collected during configuration recovery: type name -> object id -> attributes. */
    private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>();
    /** Registry in which recovered exchanges are looked up and registered. */
    private final ExchangeRegistry _exchangeRegistry;
    /** Factory used to create exchanges that are not yet registered. */
    private final ExchangeFactory _exchangeFactory;
    /** Log subject for operational messages; reassigned by both beginConfigurationRecovery and begin(MessageStore). */
    private MessageStoreLogSubject _logSubject;
    /** Message store supplied to begin(MessageStore); used to create transactions that discard orphaned entries. */
    private MessageStore _store;
    /** Configuration version reported by the store at the start of configuration recovery. */
    private int _currentConfigVersion;
    /** Configuration store supplied to beginConfigurationRecovery; receives records rewritten by upgrade(). */
    private DurableConfigurationStore _configStore;

    /**
     * Creates a recovery handler for the given virtual host.
     *
     * @param virtualHost      host whose queues, bindings and transactions are recovered
     * @param exchangeRegistry registry used to look up and register exchanges
     * @param exchangeFactory  factory used to create exchanges missing from the registry
     */
    public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, ExchangeRegistry exchangeRegistry, ExchangeFactory exchangeFactory) {
        _exchangeFactory = exchangeFactory;
        _exchangeRegistry = exchangeRegistry;
        _virtualHost = virtualHost;
    }

    /**
     * Marks the start of configuration recovery: remembers the store and its reported
     * configuration version, and emits the config-store RECOVERY_START operational message.
     *
     * @param store         configuration store being recovered
     * @param configVersion configuration version reported for that store
     */
    @Override
    public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion) {
        final String storeTypeName = store.getClass().getSimpleName();
        _logSubject = new MessageStoreLogSubject(_virtualHost, storeTypeName);
        _configStore = store;
        _currentConfigVersion = configVersion;

        CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START());
    }

    /**
     * Marks the start of message-store recovery: remembers the store and emits the
     * transaction-log RECOVERY_START operational message.
     *
     * @param store message store being recovered
     * @return this handler
     */
    @Override
    public VirtualHostConfigRecoveryHandler begin(MessageStore store) {
        final String storeTypeName = store.getClass().getSimpleName();
        _logSubject = new MessageStoreLogSubject(_virtualHost, storeTypeName);
        _store = store;

        CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
        return this;
    }

    /**
     * Recovers a single queue: creates and registers it if no queue with that name is
     * registered yet, optionally attaches its alternate exchange, and records the queue
     * as a recovery target with an initial recovered-message count of zero.
     *
     * <p>An unknown alternate exchange id is logged as an error and the queue is left
     * without an alternate exchange. Because the queue has already been registered at
     * that point, recovery bookkeeping (RECOVERY_START message and the
     * {@code _queueRecoveries} entry) is still performed so the queue's recovery log
     * stays consistent with that of every other queue.
     *
     * @param id                  id of the queue
     * @param queueName           name of the queue
     * @param owner               owner passed through to the queue factory
     * @param exclusive           exclusivity flag passed through to the queue factory
     * @param arguments           queue arguments, converted to a map for the factory
     * @param alternateExchangeId id of the alternate exchange, or {@code null} for none
     * @throws RuntimeException wrapping any {@link AMQException} raised while recovering the queue
     */
    public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId) {
        try {
            AMQQueue q = this._virtualHost.getQueueRegistry().getQueue(queueName);
            if (q == null) {
                // NOTE(review): the literal true/false flags are presumably durable/autoDelete - confirm against AMQQueueFactory.
                q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, this._virtualHost, FieldTable.convertToMap(arguments));
                this._virtualHost.getQueueRegistry().registerQueue(q);
                if (alternateExchangeId != null) {
                    org.apache.qpid.server.exchange.Exchange altExchange = this._exchangeRegistry.getExchange(alternateExchangeId);
                    if (altExchange == null) {
                        // Do not return here: the queue is already registered and must still be tracked below.
                        _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
                    } else {
                        q.setAlternateExchange(altExchange);
                    }
                }
            }
            CurrentActor.get().message(this._logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
            // Record that this queue takes part in recovery, starting with zero recovered messages.
            this._queueRecoveries.put(queueName, 0);
        }
        catch (AMQException e) {
            throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
        }
    }

    /**
     * Recovers a single exchange: if no exchange with the given name is registered,
     * one is created through the exchange factory and registered.
     *
     * @param id           id of the exchange
     * @param exchangeName name of the exchange
     * @param type         exchange type name handed to the factory
     * @param autoDelete   auto-delete flag handed to the factory
     * @throws RuntimeException wrapping any {@link AMQException} raised while recovering the exchange
     */
    public void exchange(UUID id, String exchangeName, String type, boolean autoDelete) {
        try {
            if (_exchangeRegistry.getExchange(exchangeName) != null) {
                // Already present, nothing to recover.
                return;
            }
            org.apache.qpid.server.exchange.Exchange created = _exchangeFactory.createExchange(id, exchangeName, type, true, autoDelete);
            _exchangeRegistry.registerExchange(created);
        } catch (AMQException e) {
            throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e);
        }
    }

    /**
     * Starts stored-message recovery.
     *
     * @return this handler, which itself implements the stored-message recovery callbacks
     */
    @Override
    public MessageStoreRecoveryHandler.StoredMessageRecoveryHandler begin() {
        return this;
    }

    /**
     * Registers a message found in the store. The message is materialised through the
     * type of its metadata and remembered both as recovered and as (so far) unused.
     *
     * @param message stored message reported by the message store
     */
    @Override
    public void message(StoredMessage message) {
        ServerMessage recovered = message.getMetaData().getType().createMessage(message);
        Long messageNumber = message.getMessageNumber();
        _recoveredMessages.put(messageNumber, recovered);
        // Stays in this map until a queue entry or DTX record references it.
        _unusedMessages.put(messageNumber, message);
    }

    /**
     * Called when stored-message recovery has finished. Intentionally does nothing.
     */
    @Override
    public void completeMessageRecovery() {
    }

    /**
     * Recovers one prepared distributed (XA) transaction.
     *
     * <p>The branch identified by the Xid is looked up in the virtual host's DTX registry
     * and created and registered if absent. Each recorded enqueue and dequeue is re-attached
     * to the branch together with a post-transaction action; finally the branch is put into
     * the PREPARED state and {@code prePrepareTransaction()} is invoked.
     *
     * <p>Records that reference an unknown queue or an unknown message are reported through
     * the XA_INCOMPLETE_QUEUE / XA_INCOMPLETE_MESSAGE operational messages and skipped.
     *
     * @param format   Xid format
     * @param globalId Xid global transaction id
     * @param branchId Xid branch id
     * @param enqueues enqueue records belonging to the transaction
     * @param dequeues dequeue records belonging to the transaction
     * @throws RuntimeException wrapping an {@link AMQStoreException} raised while preparing the branch
     */
    @Override
    public void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues) {
        final Xid id = new Xid(format, globalId, branchId);
        final String xidText = VirtualHostConfigRecoveryHandler.xidAsString(id).toString();
        final DtxRegistry dtxRegistry = this._virtualHost.getDtxRegistry();
        DtxBranch branch = dtxRegistry.getBranch(id);
        if (branch == null) {
            branch = new DtxBranch(id, this._store, this._virtualHost);
            dtxRegistry.registerBranch(branch);
        }

        for (Transaction.Record record : enqueues) {
            // Declared final per iteration: these locals are captured by the anonymous action below.
            final AMQQueue queue = this._virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
            if (queue == null) {
                CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidText, record.getQueue().getId().toString()));
                continue;
            }
            final long messageId = record.getMessage().getMessageNumber();
            final ServerMessage message = this._recoveredMessages.get(messageId);
            // The message is referenced by a transaction, so it must not be purged as unused.
            this._unusedMessages.remove(messageId);
            if (message == null) {
                CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidText, Long.toString(messageId)));
                continue;
            }
            final MessageReference ref = message.newReference();
            branch.enqueue(queue, message);
            branch.addPostTransactionAcion(new ServerTransaction.Action(){

                public void postCommit() {
                    try {
                        queue.enqueue(message, true, null);
                        ref.release();
                    }
                    catch (AMQException e) {
                        _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into " + "queue " + queue.getName() + " (from XA transaction)", e);
                        throw new RuntimeException(e);
                    }
                }

                public void onRollback() {
                    ref.release();
                }
            });
        }

        for (Transaction.Record record : dequeues) {
            final AMQQueue queue = this._virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
            if (queue == null) {
                CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidText, record.getQueue().getId().toString()));
                continue;
            }
            final long messageId = record.getMessage().getMessageNumber();
            final ServerMessage message = this._recoveredMessages.get(messageId);
            this._unusedMessages.remove(messageId);
            if (message == null) {
                CurrentActor.get().message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidText, Long.toString(messageId)));
                continue;
            }
            final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
            if (entry == null) {
                // NOTE(review): assumes getMessageOnTheQueue returns null when the message is not
                // on the queue - confirm. Skipping avoids a NullPointerException aborting recovery.
                _logger.warn("Message id " + messageId + " referenced by recovered XA transaction " + xidText + " is not on queue " + queue.getName() + ", dequeue will be skipped");
                continue;
            }
            // Acquire the entry now so it stays reserved for the branch until the transaction outcome is known.
            entry.acquire();
            branch.dequeue(queue, message);
            branch.addPostTransactionAcion(new ServerTransaction.Action(){

                public void postCommit() {
                    entry.discard();
                }

                public void onRollback() {
                    entry.release();
                }
            });
        }

        try {
            branch.setState(DtxBranch.State.PREPARED);
            branch.prePrepareTransaction();
        }
        catch (AMQStoreException e) {
            _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " + xidText, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Renders an Xid as {@code (format,globalId,branchId)}, with both id byte arrays
     * formatted by {@code Functions.str}.
     *
     * @param id transaction id to render
     * @return builder holding the textual form
     */
    private static StringBuilder xidAsString(Xid id) {
        StringBuilder text = new StringBuilder("(");
        text.append(id.getFormat());
        text.append(',');
        text.append(Functions.str((byte[]) id.getGlobalId()));
        text.append(',');
        text.append(Functions.str((byte[]) id.getBranchId()));
        text.append(')');
        return text;
    }

    /**
     * Finishes transaction-log recovery. Every stored message that was never referenced
     * by a queue entry or DTX record is logged and removed from the store, after which
     * the overall RECOVERY_COMPLETE operational message is emitted.
     */
    @Override
    public void completeDtxRecordRecovery() {
        for (StoredMessage orphan : _unusedMessages.values()) {
            String warning = "Message id " + orphan.getMessageNumber() + " in store, but not in any queue - removing....";
            _logger.warn(warning);
            orphan.remove();
        }

        CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
    }

    /**
     * Restores a single binding between an exchange and a queue.
     *
     * <p>If either the exchange or the queue cannot be found, an error is logged and the
     * binding is skipped. Otherwise the encoded arguments (if any) are decoded into a
     * field table and the binding is restored unless the exchange already has an
     * equivalent binding.
     *
     * @param bindingId  id of the binding
     * @param exchangeId id of the exchange to bind to
     * @param queueId    id of the queue to bind
     * @param bindingKey binding (routing) key
     * @param buf        encoded binding arguments, or {@code null} when there are none
     * @throws RuntimeException wrapping an {@link AMQException} from the exchange, or an
     *                          {@link IOException} raised while decoding the arguments
     */
    private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf) {
        try {
            org.apache.qpid.server.exchange.Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
            if (exchange == null) {
                _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId);
                return;
            }

            AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
            if (queue == null) {
                _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName());
                return;
            }

            FieldTable argumentsFT = null;
            if (buf != null) {
                try {
                    InputStream rawStream = new ByteBufferInputStream(buf);
                    DataInput encoded = new DataInputStream(rawStream);
                    long encodedLength = buf.limit();
                    argumentsFT = new FieldTable(encoded, encodedLength);
                } catch (IOException e) {
                    throw new RuntimeException("IOException should not be thrown here", e);
                }
            }

            Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT);
            if (exchange.getBinding(bindingKey, queue, argumentMap) != null) {
                // An equivalent binding already exists; nothing to restore.
                return;
            }

            _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName() + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
            exchange.restoreBinding(bindingId, bindingKey, queue, argumentMap);
        } catch (AMQException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Completion callback with no work to do; the body is intentionally empty.
     */
    public void complete() {
    }

    /**
     * Recovers one queue entry from the transaction log.
     *
     * <p>When both the queue and the message are known, the message is enqueued and the
     * per-queue recovery counter is incremented. When the queue is missing from the
     * configuration, or the message is unknown, a warning is logged and the entry is
     * dequeued from the store in a new transaction that is committed asynchronously.
     *
     * @param queueId   id of the queue named by the log entry
     * @param messageId number of the message named by the log entry
     * @throws RuntimeException wrapping any {@link AMQException} raised while enqueuing or discarding
     */
    @Override
    public void queueEntry(final UUID queueId, long messageId) {
        AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
        try {
            if (queue == null) {
                _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
                Transaction discardTxn = _store.newTransaction();
                // Stand-in resource carrying only the id of the queue that no longer exists.
                TransactionLogResource missingQueue = new TransactionLogResource() {

                    public UUID getId() {
                        return queueId;
                    }
                };
                discardTxn.dequeueMessage(missingQueue, new DummyMessage(messageId));
                discardTxn.commitTranAsync();
                return;
            }

            String queueName = queue.getName();
            ServerMessage message = _recoveredMessages.get(messageId);
            _unusedMessages.remove(messageId);

            if (message == null) {
                _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
                Transaction discardTxn = _store.newTransaction();
                discardTxn.dequeueMessage(queue, new DummyMessage(messageId));
                discardTxn.commitTranAsync();
                return;
            }

            if (_logger.isDebugEnabled()) {
                _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
            }
            Integer previous = _queueRecoveries.get(queueName);
            int recoveredSoFar = (previous == null) ? 0 : previous;
            queue.enqueue(message);
            _queueRecoveries.put(queueName, recoveredSoFar + 1);
        } catch (AMQException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Finishes queue-entry recovery by emitting, for every tracked queue, a RECOVERED
     * message carrying its recovered-message count followed by a RECOVERY_COMPLETE message.
     *
     * @return this handler, which also handles the DTX records that follow
     */
    @Override
    public TransactionLogRecoveryHandler.DtxRecordRecoveryHandler completeQueueEntryRecovery() {
        for (Map.Entry<String, Integer> recovery : _queueRecoveries.entrySet()) {
            String queueName = recovery.getKey();
            Integer recoveredCount = recovery.getValue();
            CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(recoveredCount, queueName));
            CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(queueName, true));
        }
        return this;
    }

    /**
     * Collects one configured object reported by the configuration store, grouping
     * objects by type name. Nothing is materialised here; that happens in
     * {@link #completeConfigurationRecovery()}.
     *
     * @param id         id of the configured object
     * @param type       type name of the configured object
     * @param attributes attribute map of the configured object
     */
    @Override
    public void configuredObject(UUID id, String type, Map<String, Object> attributes) {
        Map<UUID, Map<String, Object>> objectsOfType = _configuredObjects.get(type);
        if (objectsOfType != null) {
            objectsOfType.put(id, attributes);
            return;
        }
        // First object of this type: create its bucket.
        objectsOfType = new HashMap<UUID, Map<String, Object>>();
        objectsOfType.put(id, attributes);
        _configuredObjects.put(type, objectsOfType);
    }

    /**
     * Finishes configuration recovery.
     *
     * <p>If the store's configuration version differs from 1 (the version this method
     * returns), the collected records are upgraded first. The collected objects are then
     * materialised in dependency order - exchanges, then queues, then bindings - and the
     * config-store RECOVERY_COMPLETE operational message is emitted.
     *
     * @return the configuration version after recovery, always 1
     * @throws IllegalArgumentException if the upgrade fails with an {@link AMQStoreException};
     *                                  the store exception is attached as the cause
     */
    @Override
    public int completeConfigurationRecovery() {
        Map<UUID, Map<String, Object>> bindingObjects;
        Map<UUID, Map<String, Object>> queueObjects;
        Map<UUID, Map<String, Object>> exchangeObjects;
        if (1 != this._currentConfigVersion) {
            try {
                this.upgrade();
            }
            catch (AMQStoreException e) {
                // Keep the store exception as the cause so the root failure is not lost.
                throw new IllegalArgumentException("Unable to upgrade configuration from version " + this._currentConfigVersion + " to version " + 1, e);
            }
        }
        // Exchanges first: queues may name an alternate exchange and bindings need both ends.
        if ((exchangeObjects = this._configuredObjects.remove(Exchange.class.getName())) != null) {
            this.recoverExchanges(exchangeObjects);
        }
        if ((queueObjects = this._configuredObjects.remove(Queue.class.getName())) != null) {
            this.recoverQueues(queueObjects);
        }
        if ((bindingObjects = this._configuredObjects.remove(Binding.class.getName())) != null) {
            this.recoverBindings(bindingObjects);
        }
        CurrentActor.get().message(this._logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
        return 1;
    }

    /**
     * Upgrades the collected configuration records from the stored version.
     *
     * <p>From version 0, selector (filter) arguments are stripped from every binding
     * whose exchange is not a topic exchange; the rewritten records replace the collected
     * ones and are then written back to the configuration store. Version 1 has nothing to
     * rewrite. Any other version is rejected.
     *
     * @throws AMQStoreException     if the configuration store rejects the updated records
     * @throws IllegalStateException if the stored configuration version is not recognised
     */
    private void upgrade() throws AMQStoreException {
        final String bindingType = Binding.class.getName();
        final Map<UUID, String> pendingUpdates = new HashMap<UUID, String>();

        switch (_currentConfigVersion) {
            case 0: {
                Map<UUID, Map<String, Object>> storedBindings = _configuredObjects.get(bindingType);
                if (storedBindings != null) {
                    for (Map.Entry<UUID, Map<String, Object>> bindingEntry : storedBindings.entrySet()) {
                        Map<String, Object> current = bindingEntry.getValue();
                        if (hasSelectorArguments(current) && !isTopicExchange(current)) {
                            // Work on a copy so the map handed over by the store is not mutated.
                            Map<String, Object> stripped = new LinkedHashMap<String, Object>(current);
                            removeSelectorArguments(stripped);
                            bindingEntry.setValue(stripped);
                            pendingUpdates.put(bindingEntry.getKey(), bindingType);
                        }
                    }
                }
            }
            // Deliberate fall-through: version 0 continues into the write-back step below.
            case 1: {
                if (!pendingUpdates.isEmpty()) {
                    ConfiguredObjectRecord[] records = new ConfiguredObjectRecord[pendingUpdates.size()];
                    int next = 0;
                    for (Map.Entry<UUID, String> pending : pendingUpdates.entrySet()) {
                        UUID objectId = pending.getKey();
                        String objectType = pending.getValue();
                        records[next++] = new ConfiguredObjectRecord(objectId, objectType, _configuredObjects.get(objectType).get(objectId));
                    }
                    _configStore.update(records);
                }
                break;
            }
            default: {
                throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?");
            }
        }
    }

    /**
     * Replaces the binding's {@code "arguments"} entry with a copy from which
     * {@code FilterSupport.removeFilters} has removed the filter arguments.
     *
     * @param binding mutable binding attribute map; its {@code "arguments"} entry must be a non-null map
     */
    private void removeSelectorArguments(Map<String, Object> binding) {
        Map storedArguments = (Map) binding.get("arguments");
        LinkedHashMap<String, Object> withoutFilters = new LinkedHashMap<String, Object>(storedArguments);
        FilterSupport.removeFilters(withoutFilters);
        binding.put("arguments", withoutFilters);
    }

    /**
     * Tells whether the exchange referenced by a binding is a topic exchange.
     *
     * <p>The exchange is first looked for among the exchange records collected during
     * this recovery, where its {@code "type"} attribute is compared with {@code "topic"}.
     * If it is not among them, the exchange registry is consulted and the registered
     * exchange's type is compared with {@code TopicExchange.TYPE}.
     *
     * @param binding binding attribute map; its {@code "exchange"} entry holds the exchange id as a string
     * @return {@code true} if the referenced exchange is a topic exchange, {@code false}
     *         if it is of another type or cannot be found
     */
    private boolean isTopicExchange(Map<String, Object> binding) {
        UUID exchangeId = UUID.fromString((String)binding.get("exchange"));
        Map<UUID, Map<String, Object>> exchanges = this._configuredObjects.get(Exchange.class.getName());
        if (exchanges != null && exchanges.containsKey(exchangeId)) {
            return "topic".equals(exchanges.get(exchangeId).get("type"));
        }
        // Look the exchange up once: avoids a redundant registry call and a possible
        // null dereference should a second lookup return a different result.
        org.apache.qpid.server.exchange.Exchange registered = this._exchangeRegistry.getExchange(exchangeId);
        return registered != null && registered.getType() == TopicExchange.TYPE;
    }

    /**
     * Tells whether the binding has arguments in which
     * {@code FilterSupport.argumentsContainFilter} finds a filter.
     *
     * @param binding binding attribute map
     * @return {@code false} when the binding has no {@code "arguments"} entry, otherwise the FilterSupport result
     */
    private boolean hasSelectorArguments(Map<String, Object> binding) {
        Map arguments = (Map) binding.get("arguments");
        if (arguments == null) {
            return false;
        }
        return FilterSupport.argumentsContainFilter(arguments);
    }

    /**
     * Recovers every collected exchange record by handing its name, type and
     * auto-delete flag to {@link #exchange(UUID, String, String, boolean)}.
     * A record without a {@code "lifetimePolicy"} attribute is treated as auto-delete.
     *
     * @param exchangeObjects exchange attribute maps keyed by exchange id
     */
    private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects) {
        for (Map.Entry<UUID, Map<String, Object>> record : exchangeObjects.entrySet()) {
            Map<String, Object> attributes = record.getValue();
            String exchangeName = (String) attributes.get("name");
            String exchangeType = (String) attributes.get("type");
            String lifetimePolicy = (String) attributes.get("lifetimePolicy");

            boolean autoDelete = true;
            if (lifetimePolicy != null) {
                autoDelete = LifetimePolicy.valueOf(lifetimePolicy) == LifetimePolicy.AUTO_DELETE;
            }
            exchange(record.getKey(), exchangeName, exchangeType, autoDelete);
        }
    }

    /**
     * Recovers every collected queue record by extracting its attributes and delegating
     * to {@link #queue(UUID, String, String, boolean, FieldTable, UUID)}.
     *
     * @param queueObjects queue attribute maps keyed by queue id
     */
    private void recoverQueues(Map<UUID, Map<String, Object>> queueObjects) {
        for (Map.Entry<UUID, Map<String, Object>> record : queueObjects.entrySet()) {
            Map<String, Object> attributes = record.getValue();
            String queueName = (String) attributes.get("name");
            String owner = (String) attributes.get("owner");
            boolean exclusive = (Boolean) attributes.get("exclusive");

            // The alternate exchange is optional and stored as the string form of its id.
            UUID alternateExchangeId = null;
            if (attributes.get("alternateExchange") != null) {
                alternateExchangeId = UUID.fromString((String) attributes.get("alternateExchange"));
            }

            Map storedArguments = (Map) attributes.get("arguments");
            FieldTable arguments = storedArguments == null ? null : FieldTable.convertToFieldTable(storedArguments);

            queue(record.getKey(), queueName, owner, exclusive, arguments, alternateExchangeId);
        }
    }

    /**
     * Recovers every collected binding record. The binding arguments, when present, are
     * converted to a field table and passed on in encoded form to
     * {@link #binding(UUID, UUID, UUID, String, ByteBuffer)}.
     *
     * @param bindingObjects binding attribute maps keyed by binding id
     */
    private void recoverBindings(Map<UUID, Map<String, Object>> bindingObjects) {
        for (Map.Entry<UUID, Map<String, Object>> record : bindingObjects.entrySet()) {
            Map<String, Object> attributes = record.getValue();
            UUID exchangeId = UUID.fromString((String) attributes.get("exchange"));
            UUID queueId = UUID.fromString((String) attributes.get("queue"));
            String bindingName = (String) attributes.get("name");

            Map storedArguments = (Map) attributes.get("arguments");
            ByteBuffer encodedArguments = null;
            if (storedArguments != null) {
                FieldTable arguments = FieldTable.convertToFieldTable(storedArguments);
                if (arguments != null) {
                    encodedArguments = ByteBuffer.wrap(arguments.getDataAsBytes());
                }
            }

            binding(record.getKey(), exchangeId, queueId, bindingName, encodedArguments);
        }
    }

    /**
     * Minimal {@link EnqueableMessage} that carries only a message number. It is used to
     * dequeue store entries whose real message could not be recovered.
     */
    private static class DummyMessage implements EnqueableMessage {

        /** Message number of the entry this placeholder stands for. */
        private final long _messageId;

        /**
         * @param messageId message number to report
         */
        public DummyMessage(long messageId) {
            _messageId = messageId;
        }

        /** @return the message number given at construction */
        public long getMessageNumber() {
            return _messageId;
        }

        /** @return always {@code true} */
        public boolean isPersistent() {
            return true;
        }

        /** @return always {@code null}; no stored message backs this placeholder */
        public StoredMessage getStoredMessage() {
            return null;
        }
    }
}

