/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.TempMessageDatabase;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class TempKahaDBStore
extends TempMessageDatabase
implements PersistenceAdapter {
    private WireFormat wireFormat = new OpenWireFormat();

    @Override
    public void setBrokerName(String brokerName) {
    }

    @Override
    public void setUsageManager(SystemUsage usageManager) {
    }

    @Override
    public TransactionStore createTransactionStore() throws IOException {
        return new TransactionStore(){

            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
                TempKahaDBStore.this.processCommit(txid);
            }

            public void prepare(TransactionId txid) throws IOException {
                TempKahaDBStore.this.processPrepare(txid);
            }

            public void rollback(TransactionId txid) throws IOException {
                TempKahaDBStore.this.processRollback(txid);
            }

            public void recover(TransactionRecoveryListener listener) throws IOException {
                for (Map.Entry entry : TempKahaDBStore.this.preparedTransactions.entrySet()) {
                    XATransactionId xid = (XATransactionId)entry.getKey();
                    ArrayList<Message> messageList = new ArrayList<Message>();
                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
                    for (TempMessageDatabase.Operation op : (ArrayList)entry.getValue()) {
                        if (op.getClass() == TempMessageDatabase.AddOpperation.class) {
                            TempMessageDatabase.AddOpperation addOp = (TempMessageDatabase.AddOpperation)op;
                            Message msg = (Message)TempKahaDBStore.this.wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage().newInput()));
                            messageList.add(msg);
                            continue;
                        }
                        TempMessageDatabase.RemoveOpperation rmOp = (TempMessageDatabase.RemoveOpperation)op;
                        MessageAck ack = (MessageAck)TempKahaDBStore.this.wireFormat.unmarshal(new DataInputStream(rmOp.getCommand().getAck().newInput()));
                        ackList.add(ack);
                    }
                    Message[] addedMessages = new Message[messageList.size()];
                    MessageAck[] acks = new MessageAck[ackList.size()];
                    messageList.toArray(addedMessages);
                    ackList.toArray(acks);
                    listener.recover(xid, addedMessages, acks);
                }
            }

            public void start() throws Exception {
            }

            public void stop() throws Exception {
            }
        };
    }

    String subscriptionKey(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }

    @Override
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        return new KahaDBMessageStore(destination);
    }

    @Override
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        return new KahaDBTopicMessageStore(destination);
    }

    @Override
    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }

    @Override
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }

    @Override
    public void deleteAllMessages() throws IOException {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Set<ActiveMQDestination> getDestinations() {
        try {
            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
            Object object = this.indexMutex;
            synchronized (object) {
                this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                    @Override
                    public void execute(Transaction tx) throws IOException {
                        Iterator iterator = TempKahaDBStore.this.destinations.iterator(tx);
                        while (iterator.hasNext()) {
                            Map.Entry entry = iterator.next();
                            rc.add(TempKahaDBStore.this.convert((String)entry.getKey()));
                        }
                    }
                });
            }
            return rc;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0L;
    }

    @Override
    public long size() {
        if (!this.started.get()) {
            return 0L;
        }
        try {
            return this.pageFile.getDiskSize();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    @Override
    public void checkpoint(boolean sync) throws IOException {
    }

    KahaLocation convert(Location location) {
        KahaLocation rc = new KahaLocation();
        rc.setLogId(location.getDataFileId());
        rc.setOffset(location.getOffset());
        return rc;
    }

    KahaDestination convert(ActiveMQDestination dest) {
        KahaDestination rc = new KahaDestination();
        rc.setName(dest.getPhysicalName());
        switch (dest.getDestinationType()) {
            case 1: {
                rc.setType(KahaDestination.DestinationType.QUEUE);
                return rc;
            }
            case 2: {
                rc.setType(KahaDestination.DestinationType.TOPIC);
                return rc;
            }
            case 5: {
                rc.setType(KahaDestination.DestinationType.TEMP_QUEUE);
                return rc;
            }
            case 6: {
                rc.setType(KahaDestination.DestinationType.TEMP_TOPIC);
                return rc;
            }
        }
        return null;
    }

    ActiveMQDestination convert(String dest) {
        int p = dest.indexOf(":");
        if (p < 0) {
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        int type = Integer.parseInt(dest.substring(0, p));
        String name = dest.substring(p + 1);
        switch (KahaDestination.DestinationType.valueOf(type)) {
            case QUEUE: {
                return new ActiveMQQueue(name);
            }
            case TOPIC: {
                return new ActiveMQTopic(name);
            }
            case TEMP_QUEUE: {
                return new ActiveMQTempQueue(name);
            }
            case TEMP_TOPIC: {
                return new ActiveMQTempTopic(name);
            }
        }
        throw new IllegalArgumentException("Not in the valid destination format");
    }

    class KahaDBTopicMessageStore
    extends KahaDBMessageStore
    implements TopicMessageStore {
        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
            super(destination);
        }

        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(this.dest);
            command.setSubscriptionKey(TempKahaDBStore.this.subscriptionKey(clientId, subscriptionName));
            command.setMessageId(messageId.toString());
            TempKahaDBStore.this.processRemove(command, null);
        }

        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
            String subscriptionKey = TempKahaDBStore.this.subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(this.dest);
            command.setSubscriptionKey(subscriptionKey);
            command.setRetroactive(retroactive);
            ByteSequence packet = TempKahaDBStore.this.wireFormat.marshal(subscriptionInfo);
            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            TempKahaDBStore.this.process(command);
        }

        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(this.dest);
            command.setSubscriptionKey(TempKahaDBStore.this.subscriptionKey(clientId, subscriptionName));
            TempKahaDBStore.this.process(command);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
            final ArrayList subscriptions = new ArrayList();
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                TempKahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                    @Override
                    public void execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        Iterator<Map.Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx);
                        while (iterator.hasNext()) {
                            Map.Entry<String, KahaSubscriptionCommand> entry = iterator.next();
                            SubscriptionInfo info = (SubscriptionInfo)TempKahaDBStore.this.wireFormat.unmarshal(new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()));
                            subscriptions.add(info);
                        }
                    }
                });
            }
            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
            subscriptions.toArray(rc);
            return rc;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = TempKahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                return TempKahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){

                    @Override
                    public SubscriptionInfo execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
                        if (command == null) {
                            return null;
                        }
                        return (SubscriptionInfo)TempKahaDBStore.this.wireFormat.unmarshal(new DataInputStream(command.getSubscriptionInfo().newInput()));
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = TempKahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                return TempKahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){

                    @Override
                    public Integer execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                        if (cursorPos == null) {
                            return 0;
                        }
                        cursorPos = cursorPos + 1L;
                        int counter = 0;
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos);
                        while (iterator.hasNext()) {
                            iterator.next();
                            ++counter;
                        }
                        return counter;
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = TempKahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                TempKahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                        cursorPos = cursorPos + 1L;
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos);
                        while (iterator.hasNext()) {
                            Map.Entry<Long, TempMessageDatabase.MessageRecord> entry = iterator.next();
                            listener.recoverMessage((Message)TempKahaDBStore.this.wireFormat.unmarshal(entry.getValue().data));
                        }
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = TempKahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                TempKahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
                        if (cursorPos == null) {
                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                            cursorPos = cursorPos + 1L;
                        }
                        Map.Entry<Long, TempMessageDatabase.MessageRecord> entry = null;
                        int counter = 0;
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos);
                        while (iterator.hasNext()) {
                            entry = iterator.next();
                            listener.recoverMessage((Message)TempKahaDBStore.this.wireFormat.unmarshal(entry.getValue().data));
                            if (++counter < maxReturned) continue;
                        }
                        if (entry != null) {
                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1L);
                        }
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void resetBatching(String clientId, String subscriptionName) {
            try {
                final String subscriptionKey = TempKahaDBStore.this.subscriptionKey(clientId, subscriptionName);
                Object object = TempKahaDBStore.this.indexMutex;
                synchronized (object) {
                    TempKahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                        @Override
                        public void execute(Transaction tx) throws IOException {
                            TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                            sd.subscriptionCursors.remove(subscriptionKey);
                        }
                    });
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public class KahaDBMessageStore
    extends AbstractMessageStore {
        protected KahaDestination dest;
        long cursorPos;

        public KahaDBMessageStore(ActiveMQDestination destination) {
            super(destination);
            this.cursorPos = 0L;
            this.dest = TempKahaDBStore.this.convert(destination);
        }

        public ActiveMQDestination getDestination() {
            return this.destination;
        }

        public void addMessage(ConnectionContext context, Message message) throws IOException {
            KahaAddMessageCommand command = new KahaAddMessageCommand();
            command.setDestination(this.dest);
            command.setMessageId(message.getMessageId().toString());
            TempKahaDBStore.this.processAdd(command, message.getTransactionId(), TempKahaDBStore.this.wireFormat.marshal(message));
        }

        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(this.dest);
            command.setMessageId(ack.getLastMessageId().toString());
            TempKahaDBStore.this.processRemove(command, ack.getTransactionId());
        }

        public void removeAllMessages(ConnectionContext context) throws IOException {
            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
            command.setDestination(this.dest);
            TempKahaDBStore.this.process(command);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Message getMessage(MessageId identity) throws IOException {
            ByteSequence data;
            final String key = identity.toString();
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                data = TempKahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){

                    @Override
                    public ByteSequence execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        Long sequence = sd.messageIdIndex.get(tx, key);
                        if (sequence == null) {
                            return null;
                        }
                        return sd.orderIndex.get((Transaction)tx, (Long)sequence).data;
                    }
                });
            }
            if (data == null) {
                return null;
            }
            Message msg = (Message)TempKahaDBStore.this.wireFormat.unmarshal(data);
            return msg;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int getMessageCount() throws IOException {
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                return TempKahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){

                    @Override
                    public Integer execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        int rc = 0;
                        Iterator<Map.Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx);
                        while (iterator.hasNext()) {
                            iterator.next();
                            ++rc;
                        }
                        return rc;
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recover(final MessageRecoveryListener listener) throws Exception {
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                TempKahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> iterator = sd.orderIndex.iterator(tx);
                        while (iterator.hasNext()) {
                            Map.Entry<Long, TempMessageDatabase.MessageRecord> entry = iterator.next();
                            listener.recoverMessage((Message)TempKahaDBStore.this.wireFormat.unmarshal(entry.getValue().data));
                        }
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                TempKahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        Map.Entry<Long, TempMessageDatabase.MessageRecord> entry = null;
                        int counter = 0;
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> iterator = sd.orderIndex.iterator(tx, KahaDBMessageStore.this.cursorPos);
                        while (iterator.hasNext()) {
                            entry = iterator.next();
                            listener.recoverMessage((Message)TempKahaDBStore.this.wireFormat.unmarshal(entry.getValue().data));
                            if (++counter < maxReturned) continue;
                        }
                        if (entry != null) {
                            KahaDBMessageStore.this.cursorPos = entry.getKey() + 1L;
                        }
                    }
                });
            }
        }

        public void resetBatching() {
            this.cursorPos = 0L;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void setBatch(MessageId identity) throws IOException {
            Long location;
            final String key = identity.toString();
            Object object = TempKahaDBStore.this.indexMutex;
            synchronized (object) {
                location = TempKahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){

                    @Override
                    public Long execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = TempKahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        return sd.messageIdIndex.get(tx, key);
                    }
                });
            }
            if (location != null) {
                this.cursorPos = location + 1L;
            }
        }

        public void setMemoryUsage(MemoryUsage memoeyUSage) {
        }

        public void start() throws Exception {
        }

        public void stop() throws Exception {
        }
    }
}

