/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.store.berkeleydb;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.OperationFailureException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.State;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class BDBHAMessageStore
extends AbstractBDBMessageStore
implements HAMessageStore {
    /** Class-wide log4j logger. */
    private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
    /** Durability applied by {@code configure} when "highAvailability.durability" is not set. */
    private static final Durability DEFAULT_DURABILITY = new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY);
    /** Key of the "host:port" entry in each map returned by {@code getGroupMembers}. */
    public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
    /** Key of the node-name entry in each map returned by {@code getGroupMembers}. */
    public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
    /**
     * Default replication parameters. {@code configure} passes this map to {@code getConfigMap}
     * together with the "repConfig" key; the resulting map is later applied entry by entry to the
     * {@code ReplicationConfig}. The map is wrapped as unmodifiable.
     */
    private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>(){
        {
            this.put("je.rep.repStreamTimeout", "1 h");
            this.put("je.rep.replicaAckTimeout", "15 s");
            this.put("je.rep.insufficientReplicasTimeout", "20 s");
            this.put("je.rep.envSetupTimeout", "15 min");
            this.put("je.rep.protocolOldStringEncoding", Boolean.FALSE.toString());
            this.put("je.rep.logFlushTaskInterval", "1 min");
        }
    });
    /** Value returned by {@code getStoreType}. */
    public static final String BDB_HA_STORE_TYPE = "BDB-HA";
    /** Replication group name, read from "highAvailability.groupName". */
    private String _groupName;
    /** Name of this node, read from "highAvailability.nodeName". */
    private String _nodeName;
    /** Host/port setting of this node, read from "highAvailability.nodeHostPort". */
    private String _nodeHostPort;
    /** Helper host/port setting, read from "highAvailability.helperHostPort". */
    private String _helperHostPort;
    /** Durability parsed from "highAvailability.durability", or {@code DEFAULT_DURABILITY}. */
    private Durability _durability;
    /** Store name as passed to {@code configure}. */
    private String _name;
    /** Created and started in {@code setupStore}, only when coalescing sync is enabled. */
    private CommitThreadWrapper _commitThreadWrapper;
    /** Value of "highAvailability.coalescingSync"; defaults to true. */
    private boolean _coalescingSync;
    /** Value of "highAvailability.designatedPrimary"; defaults to false. */
    private boolean _designatedPrimary;
    /** Replication parameters applied to the {@code ReplicationConfig} when the environment is created. */
    private Map<String, String> _repConfig;

    /**
     * Reads the high-availability settings from the store configuration, rejects the unsupported
     * combination of coalescing sync with a local SYNC policy, then delegates to the superclass.
     *
     * @param name        store name, remembered for later use
     * @param storeConfig configuration holding the "highAvailability.*" and "repConfig" keys
     * @throws Exception  a {@link ConfigurationException} when a mandatory key is missing or the
     *                    coalescing-sync/durability combination is invalid; anything thrown by
     *                    the superclass is propagated unchanged
     */
    @Override
    public void configure(String name, Configuration storeConfig) throws Exception {
        // Mandatory settings: each lookup fails fast when its key is absent.
        _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig);
        _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig);
        _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig);
        _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig);
        _name = name;

        // Optional settings fall back to defaults.
        final String configuredDurability = storeConfig.getString("highAvailability.durability");
        if (configuredDurability == null) {
            _durability = DEFAULT_DURABILITY;
        } else {
            _durability = Durability.parse(configuredDurability);
        }
        _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE);
        _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE);
        _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig");

        if (_coalescingSync) {
            if (_durability.getLocalSync() == Durability.SyncPolicy.SYNC) {
                throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + Durability.SyncPolicy.SYNC + "! Please set highAvailability.coalescingSync to false in store configuration.");
            }
        }

        super.configure(name, storeConfig);
    }

    /**
     * Performs the superclass store setup and, when coalescing sync is enabled, creates and
     * starts the commit thread for this store.
     *
     * @param storePath location handed to the superclass setup
     * @param name      store name; also used in the commit thread's name
     */
    @Override
    protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException {
        super.setupStore(storePath, name);
        if (!_coalescingSync) {
            return;
        }
        _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
        _commitThreadWrapper.startCommitThread();
    }

    /**
     * Builds the replicated environment for this node from the configured group, node and
     * helper settings.
     * <p>
     * If opening the environment fails with an {@link InsufficientLogException}, a network
     * restore (not retaining log files) is executed and the environment is opened once more.
     *
     * @param environmentPath directory of the environment
     * @return the opened {@link ReplicatedEnvironment}
     */
    @Override
    protected Environment createEnvironment(File environmentPath) throws DatabaseException {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Environment path " + environmentPath.getAbsolutePath());
            LOGGER.info("Group name " + _groupName);
            LOGGER.info("Node name " + _nodeName);
            LOGGER.info("Node host port " + _nodeHostPort);
            LOGGER.info("Helper host port " + _helperHostPort);
            LOGGER.info("Durability " + _durability);
            LOGGER.info("Coalescing sync " + _coalescingSync);
            LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
        }

        final ReplicationConfig repConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
        repConfig.setHelperHosts(_helperHostPort);
        repConfig.setDesignatedPrimary(_designatedPrimary);
        setReplicationConfigProperties(repConfig);

        final EnvironmentConfig environmentConfig = createEnvironmentConfig();
        environmentConfig.setDurability(_durability);

        try {
            return new ReplicatedEnvironment(environmentPath, repConfig, environmentConfig);
        }
        catch (InsufficientLogException insufficientLog) {
            LOGGER.info("InsufficientLogException thrown and so full network restore required", insufficientLog);
            final NetworkRestore networkRestore = new NetworkRestore();
            final NetworkRestoreConfig restoreConfig = new NetworkRestoreConfig();
            restoreConfig.setRetainLogFiles(false);
            networkRestore.execute(insufficientLog, restoreConfig);
            // Second attempt, now that the restore has completed.
            return new ReplicatedEnvironment(environmentPath, repConfig, environmentConfig);
        }
    }

    /**
     * Delegates to the superclass and then registers a state change listener on the
     * replicated environment so that replication state transitions drive this store.
     */
    @Override
    public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception {
        super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config);
        getReplicatedEnvironment().setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
    }

    /**
     * Flushes the environment log (with the flag set to {@code true}) and then performs the
     * superclass activation.
     */
    @Override
    public synchronized void activate() throws Exception {
        final Environment environment = getEnvironment();
        environment.flushLog(true);
        super.activate();
    }

    /**
     * Moves the store to the {@code INITIALISED} state; does nothing when the state manager
     * reports it is already in that state.
     */
    public synchronized void passivate() {
        if (!_stateManager.isNotInState(State.INITIALISED)) {
            return;
        }
        LOGGER.debug("Store becoming passive");
        _stateManager.attainState(State.INITIALISED);
    }

    /** @return the store name supplied to {@code configure} */
    public String getName() {
        return _name;
    }

    /** @return the configured replication group name */
    public String getGroupName() {
        return _groupName;
    }

    /** @return the configured name of this node */
    public String getNodeName() {
        return _nodeName;
    }

    /** @return the configured host/port setting of this node */
    public String getNodeHostPort() {
        return _nodeHostPort;
    }

    /** @return the configured helper host/port setting */
    public String getHelperHostPort() {
        return _helperHostPort;
    }

    /** @return the string form of the durability in effect for this store */
    public String getDurability() {
        final Durability durability = _durability;
        return durability.toString();
    }

    /** @return {@code true} when coalescing sync was enabled in the configuration */
    public boolean isCoalescingSync() {
        return _coalescingSync;
    }

    /** @return the string form of the replicated environment's current state */
    public String getNodeState() {
        return getReplicatedEnvironment().getState().toString();
    }

    /**
     * @return the designated-primary flag as currently held in the environment's mutable
     *         replication configuration (not the value cached at configure time)
     */
    public Boolean isDesignatedPrimary() {
        final ReplicationMutableConfig mutableConfig = getReplicatedEnvironment().getRepMutableConfig();
        return mutableConfig.getDesignatedPrimary();
    }

    /**
     * Lists the nodes of the replication group.
     *
     * @return one map per node, holding the node name under {@link #GRP_MEM_COL_NODE_NAME}
     *         and "host:port" under {@link #GRP_MEM_COL_NODE_HOST_PORT}
     */
    public List<Map<String, String>> getGroupMembers() {
        final List<Map<String, String>> rows = new ArrayList<Map<String, String>>();
        for (ReplicationNode member : getReplicatedEnvironment().getGroup().getNodes()) {
            final String memberName = member.getName();
            final String memberAddress = member.getHostName() + ":" + member.getPort();

            final Map<String, String> row = new HashMap<String, String>();
            row.put(GRP_MEM_COL_NODE_NAME, memberName);
            row.put(GRP_MEM_COL_NODE_HOST_PORT, memberAddress);
            rows.add(row);
        }
        return rows;
    }

    /**
     * Removes the named node from the replication group.
     *
     * @param nodeName name of the node to remove
     * @throws AMQStoreException wrapping any database failure reported by the group admin
     */
    public void removeNodeFromGroup(String nodeName) throws AMQStoreException {
        final ReplicationGroupAdmin groupAdmin;
        try {
            groupAdmin = createReplicationGroupAdmin();
            groupAdmin.removeMember(nodeName);
        }
        catch (DatabaseException failure) {
            // OperationFailureException is a DatabaseException, so a single handler covers both.
            throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + failure.getMessage(), failure);
        }
    }

    /**
     * Sets or clears the designated-primary flag in the environment's mutable replication
     * configuration.
     * <p>
     * The read-modify-write of the mutable configuration is performed while holding the
     * monitor of the replicated environment.
     *
     * @param isPrimary {@code true} to mark this node as designated primary, {@code false} to clear it
     * @throws AMQStoreException wrapping any {@link DatabaseException} raised while applying the change
     */
    public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException {
        // Name the actual transition so that clearing the flag is not reported as setting it.
        final String action = isPrimary ? "set" : "unset";
        try {
            final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
            synchronized (replicatedEnvironment) {
                final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig();
                final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
                replicatedEnvironment.setRepMutableConfig(newConfig);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Node " + _nodeName + " successfully " + action + " as designated primary for group");
            }
        }
        catch (DatabaseException e) {
            throw new AMQStoreException("Failed to " + action + " '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e);
        }
    }

    /** @return the store's environment, cast to {@link ReplicatedEnvironment} */
    ReplicatedEnvironment getReplicatedEnvironment() {
        final Environment environment = getEnvironment();
        return (ReplicatedEnvironment) environment;
    }

    /**
     * Changes the recorded host and port of a group member.
     *
     * @param nodeName    node whose address is updated
     * @param newHostName new host name
     * @param newPort     new port
     * @throws AMQStoreException wrapping any database failure reported by the group admin
     */
    public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException {
        final ReplicationGroupAdmin groupAdmin;
        try {
            groupAdmin = createReplicationGroupAdmin();
            groupAdmin.updateAddress(nodeName, newHostName, newPort);
        }
        catch (DatabaseException failure) {
            // OperationFailureException is a DatabaseException, so a single handler covers both.
            throw new AMQStoreException("Failed to update address for '" + nodeName + "' with new host " + newHostName + " and new port " + newPort + ". " + failure.getMessage(), failure);
        }
    }

    /**
     * Commits the transaction. If the commit fails, the error is logged, the environment is
     * closed via {@code closeEnvironmentSafely()} and the exception is rethrown.
     *
     * @param tx         transaction to commit
     * @param syncCommit passed through to the commit thread wrapper when coalescing sync is on
     * @return the commit thread wrapper's future when coalescing sync is enabled, otherwise
     *         {@code StoreFuture.IMMEDIATE_FUTURE}
     */
    @Override
    protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException {
        try {
            tx.commit();
        }
        catch (DatabaseException commitFailure) {
            LOGGER.error("Got DatabaseException on commit, closing environment", commitFailure);
            closeEnvironmentSafely();
            throw commitFailure;
        }

        final StoreFuture result;
        if (_coalescingSync) {
            result = _commitThreadWrapper.commit(tx, syncCommit);
        } else {
            result = StoreFuture.IMMEDIATE_FUTURE;
        }
        return result;
    }

    /**
     * Closes the store: replaces the state change listener with a no-op, stops the commit
     * thread (when one was started) and finally performs the superclass close.
     * <p>
     * Each step runs even if an earlier one throws, so a failure while swapping the listener
     * can no longer leave the commit thread running or skip the superclass close. The failure
     * itself is still propagated to the caller.
     *
     * @throws Exception any failure raised by one of the close steps
     */
    @Override
    protected void closeInternal() throws Exception {
        try {
            substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
        }
        finally {
            try {
                // The wrapper only exists once setupStore has completed with coalescing sync on.
                if (_coalescingSync && _commitThreadWrapper != null) {
                    _commitThreadWrapper.stopCommitThread();
                }
            }
            finally {
                super.closeInternal();
            }
        }
    }

    /**
     * Installs a listener that ignores all events, replacing whichever listener was set before.
     *
     * @param replicatedEnvironment environment whose listener is replaced
     */
    private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment) {
        LOGGER.debug("Substituting no-op state change listener for environment close");
        final StateChangeListener ignoreEverything = new NoOpStateChangeListener();
        replicatedEnvironment.setStateChangeListener(ignoreEverything);
    }

    /**
     * Creates a group admin for this store's replication group, using the environment's
     * helper sockets plus this node's own (unresolved) address as helpers.
     */
    private ReplicationGroupAdmin createReplicationGroupAdmin() {
        final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig();

        final HashSet<InetSocketAddress> helperAddresses = new HashSet<InetSocketAddress>(repConfig.getHelperSockets());
        final InetSocketAddress ownAddress = InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort());
        helperAddresses.add(ownAddress);

        return new ReplicationGroupAdmin(_groupName, helperAddresses);
    }

    /**
     * Copies every entry of the configured replication parameter map onto the given
     * replication config, logging each assignment at debug level.
     *
     * @param replicationConfig config receiving the parameters
     */
    private void setReplicationConfigProperties(ReplicationConfig replicationConfig) {
        for (Map.Entry<String, String> parameter : _repConfig.entrySet()) {
            final String paramName = parameter.getKey();
            final String paramValue = parameter.getValue();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Setting ReplicationConfig key " + paramName + " to '" + paramValue + "'");
            }
            replicationConfig.setConfigParam(paramName, paramValue);
        }
    }

    /**
     * Returns the string value of a mandatory configuration key.
     *
     * @param key    configuration key (dot separated)
     * @param config configuration to read
     * @return the value stored under {@code key}
     * @throws ConfigurationException when the key is not present; the message shows the key
     *                                with dots replaced by slashes
     */
    private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException {
        if (config.containsKey(key)) {
            return config.getString(key);
        }
        throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " + key.replace('.', '/'));
    }

    /** @return the store type identifier, {@link #BDB_HA_STORE_TYPE} */
    public String getStoreType() {
        final String storeType = BDB_HA_STORE_TYPE;
        return storeType;
    }

    private class NoOpStateChangeListener
    implements StateChangeListener {
        private NoOpStateChangeListener() {
        }

        public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException {
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class BDBHAMessageStoreStateChangeListener
    implements StateChangeListener {
        private final Executor _executor = Executors.newSingleThreadExecutor();

        private BDBHAMessageStoreStateChangeListener() {
        }

        public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException {
            ReplicatedEnvironment.State state = stateChangeEvent.getState();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info((Object)("Received BDB event indicating transition to state " + state));
            }
            switch (state) {
                case MASTER: {
                    this.activateStoreAsync();
                    break;
                }
                case REPLICA: {
                    this.passivateStoreAsync();
                    break;
                }
                case DETACHED: {
                    LOGGER.error((Object)"BDB replicated node in detached state, therefore passivating.");
                    this.passivateStoreAsync();
                    break;
                }
                case UNKNOWN: {
                    LOGGER.warn((Object)"BDB replicated node in unknown state (hopefully temporarily)");
                    break;
                }
                default: {
                    LOGGER.error((Object)("Unexpected state change: " + state));
                    throw new IllegalStateException("Unexpected state change: " + state);
                }
            }
        }

        private void activateStoreAsync() {
            String threadName = "BDBHANodeActivationThread-" + BDBHAMessageStore.this._name;
            this.executeStateChangeAsync(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    try {
                        BDBHAMessageStore.this.activate();
                    }
                    catch (Exception e) {
                        LOGGER.error((Object)"Failed to activate on hearing MASTER change event", (Throwable)e);
                        throw e;
                    }
                    return null;
                }
            }, threadName);
        }

        private void passivateStoreAsync() {
            String threadName = "BDBHANodePassivationThread-" + BDBHAMessageStore.this._name;
            this.executeStateChangeAsync(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    try {
                        BDBHAMessageStore.this.passivate();
                    }
                    catch (Exception e) {
                        LOGGER.error((Object)"Failed to passivate on hearing REPLICA or DETACHED change event", (Throwable)e);
                        throw e;
                    }
                    return null;
                }
            }, threadName);
        }

        private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) {
            final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
            this._executor.execute(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void run() {
                    String originalThreadName = Thread.currentThread().getName();
                    Thread.currentThread().setName(threadName);
                    try {
                        CurrentActor.set((LogActor)new AbstractActor(_rootLogger){

                            public String getLogMessage() {
                                return threadName;
                            }
                        });
                        try {
                            callable.call();
                        }
                        catch (Exception e) {
                            LOGGER.error((Object)"Exception during state change", (Throwable)e);
                        }
                    }
                    finally {
                        Thread.currentThread().setName(originalThreadName);
                    }
                }
            });
        }
    }
}

