/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.util.ByteSequence;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TempMessageDatabase {
    private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class);
    public static final int CLOSED_STATE = 1;
    public static final int OPEN_STATE = 2;
    protected BTreeIndex<String, StoredDestination> destinations;
    protected PageFile pageFile;
    protected File directory;
    boolean enableIndexWriteAsync = true;
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    protected AtomicBoolean started = new AtomicBoolean();
    protected AtomicBoolean opened = new AtomicBoolean();
    protected final Object indexMutex = new Object();
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet();
    private final HashMap<String, StoredDestination> storedDestinations = new HashMap();
    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap();
    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap();

    public void start() throws Exception {
        if (this.started.compareAndSet(false, true)) {
            this.load();
        }
    }

    public void stop() throws Exception {
        if (this.started.compareAndSet(true, false)) {
            this.unload();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void loadPageFile() throws IOException {
        Object object = this.indexMutex;
        synchronized (object) {
            final PageFile pageFile = this.getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>(){

                @Override
                public void execute(Transaction tx) throws IOException {
                    TempMessageDatabase.this.destinations = new BTreeIndex(pageFile, tx.allocate().getPageId());
                    TempMessageDatabase.this.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                    TempMessageDatabase.this.destinations.setValueMarshaller(new StoredDestinationMarshaller());
                    TempMessageDatabase.this.destinations.load(tx);
                }
            });
            pageFile.flush();
            this.storedDestinations.clear();
        }
    }

    public void open() throws IOException {
        if (this.opened.compareAndSet(false, true)) {
            this.loadPageFile();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void load() throws IOException {
        Object object = this.indexMutex;
        synchronized (object) {
            this.open();
            this.pageFile.unload();
            this.pageFile.delete();
            this.loadPageFile();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws IOException, InterruptedException {
        if (this.opened.compareAndSet(true, false)) {
            Object object = this.indexMutex;
            synchronized (object) {
                this.pageFile.unload();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unload() throws IOException, InterruptedException {
        Object object = this.indexMutex;
        synchronized (object) {
            if (this.pageFile.isLoaded()) {
                this.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
        if (txid != null) {
            Object object = this.indexMutex;
            synchronized (object) {
                ArrayList<Operation> inflightTx = this.getInflightTx(txid);
                inflightTx.add(new AddOpperation(command, data));
            }
        }
        Object object = this.indexMutex;
        synchronized (object) {
            this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                @Override
                public void execute(Transaction tx) throws IOException {
                    TempMessageDatabase.this.upadateIndex(tx, command, data);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
        if (txid != null) {
            Object object = this.indexMutex;
            synchronized (object) {
                ArrayList<Operation> inflightTx = this.getInflightTx(txid);
                inflightTx.add(new RemoveOpperation(command));
            }
        }
        Object object = this.indexMutex;
        synchronized (object) {
            this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                @Override
                public void execute(Transaction tx) throws IOException {
                    TempMessageDatabase.this.updateIndex(tx, command);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(final KahaRemoveDestinationCommand command) throws IOException {
        Object object = this.indexMutex;
        synchronized (object) {
            this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                @Override
                public void execute(Transaction tx) throws IOException {
                    TempMessageDatabase.this.updateIndex(tx, command);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(final KahaSubscriptionCommand command) throws IOException {
        Object object = this.indexMutex;
        synchronized (object) {
            this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                @Override
                public void execute(Transaction tx) throws IOException {
                    TempMessageDatabase.this.updateIndex(tx, command);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processCommit(TransactionId key) throws IOException {
        Object object = this.indexMutex;
        synchronized (object) {
            ArrayList inflightTx = (ArrayList)this.inflightTransactions.remove(key);
            if (inflightTx == null) {
                inflightTx = (ArrayList)this.preparedTransactions.remove(key);
            }
            if (inflightTx == null) {
                return;
            }
            final ArrayList messagingTx = inflightTx;
            this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Operation op : messagingTx) {
                        op.execute(tx);
                    }
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processPrepare(TransactionId key) {
        Object object = this.indexMutex;
        synchronized (object) {
            ArrayList tx = (ArrayList)this.inflightTransactions.remove(key);
            if (tx != null) {
                this.preparedTransactions.put(key, tx);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processRollback(TransactionId key) {
        Object object = this.indexMutex;
        synchronized (object) {
            ArrayList tx = (ArrayList)this.inflightTransactions.remove(key);
            if (tx == null) {
                this.preparedTransactions.remove(key);
            }
        }
    }

    private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
        StoredDestination sd = this.getStoredDestination(command.getDestination(), tx);
        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
            return;
        }
        long id = sd.nextMessageId++;
        Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
        if (previous == null) {
            sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
        } else {
            sd.messageIdIndex.put(tx, command.getMessageId(), previous);
        }
    }

    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
        StoredDestination sd = this.getStoredDestination(command.getDestination(), tx);
        if (!command.hasSubscriptionKey()) {
            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
            if (sequenceId != null) {
                sd.orderIndex.remove(tx, sequenceId);
            }
        } else {
            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
            if (sequence != null) {
                String subscriptionKey = command.getSubscriptionKey();
                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
                this.removeAckByteSequence(tx, sd, subscriptionKey, prev);
                this.addAckByteSequence(sd, sequence, subscriptionKey);
            }
        }
    }

    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
        StoredDestination sd = this.getStoredDestination(command.getDestination(), tx);
        sd.orderIndex.clear(tx);
        sd.orderIndex.unload(tx);
        tx.free(sd.orderIndex.getPageId());
        sd.messageIdIndex.clear(tx);
        sd.messageIdIndex.unload(tx);
        tx.free(sd.messageIdIndex.getPageId());
        if (sd.subscriptions != null) {
            sd.subscriptions.clear(tx);
            sd.subscriptions.unload(tx);
            tx.free(sd.subscriptions.getPageId());
            sd.subscriptionAcks.clear(tx);
            sd.subscriptionAcks.unload(tx);
            tx.free(sd.subscriptionAcks.getPageId());
        }
        String key = this.key(command.getDestination());
        this.storedDestinations.remove(key);
        this.destinations.remove(tx, key);
    }

    private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
        StoredDestination sd = this.getStoredDestination(command.getDestination(), tx);
        if (command.hasSubscriptionInfo()) {
            String subscriptionKey = command.getSubscriptionKey();
            sd.subscriptions.put(tx, subscriptionKey, command);
            long ackByteSequence = -1L;
            if (!command.getRetroactive()) {
                ackByteSequence = sd.nextMessageId - 1L;
            }
            sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
            this.addAckByteSequence(sd, ackByteSequence, subscriptionKey);
        } else {
            String subscriptionKey = command.getSubscriptionKey();
            sd.subscriptions.remove(tx, subscriptionKey);
            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
            if (prev != null) {
                this.removeAckByteSequence(tx, sd, subscriptionKey, prev);
            }
        }
    }

    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return this.journalFilesBeingReplicated;
    }

    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
        String key = this.key(destination);
        StoredDestination rc = this.storedDestinations.get(key);
        if (rc == null) {
            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
            rc = this.loadStoredDestination(tx, key, topic);
            this.storedDestinations.put(key, rc);
        }
        return rc;
    }

    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
        StoredDestination rc = this.destinations.get(tx, key);
        if (rc == null) {
            rc = new StoredDestination();
            rc.orderIndex = new BTreeIndex(this.pageFile, tx.allocate());
            rc.messageIdIndex = new BTreeIndex(this.pageFile, tx.allocate());
            if (topic) {
                rc.subscriptions = new BTreeIndex(this.pageFile, tx.allocate());
                rc.subscriptionAcks = new BTreeIndex(this.pageFile, tx.allocate());
            }
            this.destinations.put(tx, key, rc);
        }
        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
        rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
        rc.orderIndex.load(tx);
        Map.Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
        if (lastEntry != null) {
            rc.nextMessageId = lastEntry.getKey() + 1L;
        }
        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
        rc.messageIdIndex.load(tx);
        if (topic) {
            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
            rc.subscriptions.load(tx);
            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
            rc.subscriptionAcks.load(tx);
            rc.ackPositions = new TreeMap();
            rc.subscriptionCursors = new HashMap();
            Iterator<Map.Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx);
            while (iterator.hasNext()) {
                Map.Entry<String, Long> entry = iterator.next();
                this.addAckByteSequence(rc, entry.getValue(), entry.getKey());
            }
        }
        return rc;
    }

    private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
        HashSet<String> hs = sd.ackPositions.get(messageSequence);
        if (hs == null) {
            hs = new HashSet();
            sd.ackPositions.put(messageSequence, hs);
        }
        hs.add(subscriptionKey);
    }

    private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
        HashSet<String> hs;
        if (sequenceId != null && (hs = sd.ackPositions.get(sequenceId)) != null) {
            hs.remove(subscriptionKey);
            if (hs.isEmpty()) {
                HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
                sd.ackPositions.remove(sequenceId);
                if (hs == firstSet) {
                    ArrayList<Map.Entry> deletes = new ArrayList<Map.Entry>();
                    Iterator<Map.Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx);
                    while (iterator.hasNext()) {
                        Map.Entry entry = iterator.next();
                        if (entry.getKey().compareTo(sequenceId) > 0) continue;
                        deletes.add(entry);
                    }
                    for (Map.Entry entry : deletes) {
                        sd.messageIdIndex.remove(tx, ((MessageRecord)entry.getValue()).messageId);
                        sd.orderIndex.remove(tx, (Long)entry.getKey());
                    }
                }
            }
        }
    }

    private String key(KahaDestination destination) {
        return destination.getType().getNumber() + ":" + destination.getName();
    }

    private ArrayList<Operation> getInflightTx(TransactionId key) {
        ArrayList<Operation> tx = this.inflightTransactions.get(key);
        if (tx == null) {
            tx = new ArrayList();
            this.inflightTransactions.put(key, tx);
        }
        return tx;
    }

    private PageFile createPageFile() {
        PageFile index = new PageFile(this.directory, "temp-db");
        index.setEnableWriteThread(this.isEnableIndexWriteAsync());
        index.setWriteBatchSize(this.getIndexWriteBatchSize());
        index.setEnableDiskSyncs(false);
        index.setEnableRecoveryFile(false);
        return index;
    }

    public File getDirectory() {
        return this.directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }

    public int getIndexWriteBatchSize() {
        return this.setIndexWriteBatchSize;
    }

    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }

    boolean isEnableIndexWriteAsync() {
        return this.enableIndexWriteAsync;
    }

    public PageFile getPageFile() {
        if (this.pageFile == null) {
            this.pageFile = this.createPageFile();
        }
        return this.pageFile;
    }

    class RemoveOpperation
    extends Operation {
        final KahaRemoveMessageCommand command;

        public RemoveOpperation(KahaRemoveMessageCommand command) {
            this.command = command;
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            TempMessageDatabase.this.updateIndex(tx, this.command);
        }

        public KahaRemoveMessageCommand getCommand() {
            return this.command;
        }
    }

    class AddOpperation
    extends Operation {
        final KahaAddMessageCommand command;
        private final ByteSequence data;

        public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
            this.command = command;
            this.data = location;
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            TempMessageDatabase.this.upadateIndex(tx, this.command, this.data);
        }

        public KahaAddMessageCommand getCommand() {
            return this.command;
        }
    }

    abstract class Operation {
        Operation() {
        }

        public abstract void execute(Transaction var1) throws IOException;
    }

    static class KahaSubscriptionCommandMarshaller
    extends VariableMarshaller<KahaSubscriptionCommand> {
        static final KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();

        KahaSubscriptionCommandMarshaller() {
        }

        @Override
        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
            rc.mergeFramed((InputStream)((Object)dataIn));
            return rc;
        }

        @Override
        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
            object.writeFramed((OutputStream)((Object)dataOut));
        }
    }

    static class ByteSequenceMarshaller
    extends VariableMarshaller<ByteSequence> {
        static final ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();

        ByteSequenceMarshaller() {
        }

        @Override
        public ByteSequence readPayload(DataInput dataIn) throws IOException {
            byte[] data = new byte[dataIn.readInt()];
            dataIn.readFully(data);
            return new ByteSequence(data);
        }

        @Override
        public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
            dataOut.writeInt(object.getLength());
            dataOut.write(object.getData(), object.getOffset(), object.getLength());
        }
    }

    protected class StoredDestinationMarshaller
    extends VariableMarshaller<StoredDestination> {
        protected StoredDestinationMarshaller() {
        }

        public Class<StoredDestination> getType() {
            return StoredDestination.class;
        }

        @Override
        public StoredDestination readPayload(DataInput dataIn) throws IOException {
            StoredDestination value = new StoredDestination();
            value.orderIndex = new BTreeIndex(TempMessageDatabase.this.pageFile, dataIn.readLong());
            value.messageIdIndex = new BTreeIndex(TempMessageDatabase.this.pageFile, dataIn.readLong());
            if (dataIn.readBoolean()) {
                value.subscriptions = new BTreeIndex(TempMessageDatabase.this.pageFile, dataIn.readLong());
                value.subscriptionAcks = new BTreeIndex(TempMessageDatabase.this.pageFile, dataIn.readLong());
            }
            return value;
        }

        @Override
        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            dataOut.writeLong(value.orderIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
            } else {
                dataOut.writeBoolean(false);
            }
        }
    }

    static class StoredDestination {
        long nextMessageId;
        BTreeIndex<Long, MessageRecord> orderIndex;
        BTreeIndex<String, Long> messageIdIndex;
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        BTreeIndex<String, Long> subscriptionAcks;
        HashMap<String, Long> subscriptionCursors;
        TreeMap<Long, HashSet<String>> ackPositions;

        StoredDestination() {
        }
    }

    protected static class MessageKeysMarshaller
    extends VariableMarshaller<MessageRecord> {
        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();

        protected MessageKeysMarshaller() {
        }

        @Override
        public MessageRecord readPayload(DataInput dataIn) throws IOException {
            return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
        }

        @Override
        public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
            dataOut.writeUTF(object.messageId);
            ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
        }
    }

    static class MessageRecord {
        final String messageId;
        final ByteSequence data;

        public MessageRecord(String messageId, ByteSequence location) {
            this.messageId = messageId;
            this.data = location;
        }

        public String toString() {
            return "[" + this.messageId + "," + this.data + "]";
        }
    }

    class StoredSubscription {
        SubscriptionInfo subscriptionInfo;
        String lastAckId;
        ByteSequence lastAckByteSequence;
        ByteSequence cursor;

        StoredSubscription() {
        }
    }
}

