/*
 * Decompiled with CFR 0.152.
 */
package org.opends.server.replication.plugin;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.ReplicationMessages;
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.loggers.debug.DebugLogger;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.FakeOperation;
import org.opends.server.replication.plugin.FakeOperationComparator;
import org.opends.server.replication.plugin.HeartbeatMonitor;
import org.opends.server.replication.plugin.Historical;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
import org.opends.server.util.StaticUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ReplicationBroker
implements InternalSearchListener {
    private static final DebugTracer TRACER = DebugLogger.getTracer();
    private boolean shutdown = false;
    private Collection<String> servers;
    private boolean connected = false;
    private String replicationServer = "Not connected";
    private TreeSet<FakeOperation> replayOperations;
    private ProtocolSession session = null;
    private final ServerState state;
    private final DN baseDn;
    private final short serverID;
    private int maxSendDelay;
    private int maxReceiveDelay;
    private int maxSendQueue;
    private int maxReceiveQueue;
    private Semaphore sendWindow;
    private int maxSendWindow;
    private int rcvWindow;
    private int halfRcvWindow;
    private int maxRcvWindow;
    private int timeout = 0;
    private short protocolVersion;
    private long heartbeatInterval = 0L;
    private HeartbeatMonitor heartbeatMonitor = null;
    private int numLostConnections = 0;
    private boolean connectionError = false;
    private Object connectPhaseLock = new Object();

    public ReplicationBroker(ServerState state, DN baseDn, short serverID, int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, long heartbeatInterval) {
        this.baseDn = baseDn;
        this.serverID = serverID;
        this.maxReceiveDelay = maxReceiveDelay;
        this.maxSendDelay = maxSendDelay;
        this.maxReceiveQueue = maxReceiveQueue;
        this.maxSendQueue = maxSendQueue;
        this.state = state;
        this.replayOperations = new TreeSet<FakeOperation>(new FakeOperationComparator());
        this.rcvWindow = window;
        this.maxRcvWindow = window;
        this.halfRcvWindow = window / 2;
        this.heartbeatInterval = heartbeatInterval;
        this.protocolVersion = ProtocolVersion.currentVersion();
    }

    public void start(Collection<String> servers) {
        this.shutdown = false;
        this.servers = servers;
        if (servers.size() < 1) {
            Message message = ReplicationMessages.NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
            ErrorLogger.logError(message);
        }
        this.rcvWindow = this.maxRcvWindow;
        this.connect();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void connect() {
        if (this.heartbeatMonitor != null) {
            this.heartbeatMonitor.shutdown();
            this.heartbeatMonitor = null;
        }
        boolean checkState = true;
        boolean receivedResponse = true;
        Object object = this.connectPhaseLock;
        synchronized (object) {
            Message message;
            while (!this.connected && !this.shutdown && receivedResponse) {
                receivedResponse = false;
                for (String server : this.servers) {
                    Message message2;
                    int separator = server.lastIndexOf(58);
                    String port = server.substring(separator + 1);
                    String hostname = server.substring(0, separator);
                    try {
                        ChangeNumber ourMaxChangeNumber;
                        InetSocketAddress ServerAddr = new InetSocketAddress(InetAddress.getByName(hostname), Integer.parseInt(port));
                        Socket socket = new Socket();
                        socket.setReceiveBufferSize(1000000);
                        socket.setTcpNoDelay(true);
                        socket.connect(ServerAddr, 500);
                        this.session = new SocketSession(socket);
                        ServerStartMessage msg = new ServerStartMessage(this.serverID, this.baseDn, this.maxReceiveDelay, this.maxReceiveQueue, this.maxSendDelay, this.maxSendQueue, this.halfRcvWindow * 2, this.heartbeatInterval, this.state, this.protocolVersion);
                        this.session.publish(msg);
                        this.session.setSoTimeout(1000);
                        ReplServerStartMessage startMsg = (ReplServerStartMessage)this.session.receive();
                        receivedResponse = true;
                        this.protocolVersion = ProtocolVersion.minWithCurrent(startMsg.getVersion());
                        this.session.setSoTimeout(this.timeout);
                        ChangeNumber replServerMaxChangeNumber = startMsg.getServerState().getMaxChangeNumber(this.serverID);
                        if (replServerMaxChangeNumber == null) {
                            replServerMaxChangeNumber = new ChangeNumber(0L, 0, this.serverID);
                        }
                        if ((ourMaxChangeNumber = this.state.getMaxChangeNumber(this.serverID)) == null || ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber).booleanValue()) {
                            this.replicationServer = ServerAddr.toString();
                            this.maxSendWindow = startMsg.getWindowSize();
                            this.connected = true;
                            this.startHeartBeat();
                            break;
                        }
                        if (checkState) {
                            Message message3 = ReplicationMessages.NOTE_CHANGELOG_MISSING_CHANGES.get(server);
                            ErrorLogger.logError(message3);
                            continue;
                        }
                        this.replayOperations.clear();
                        ErrorLogger.logError(Message.raw("going to search for changes", new Object[0]));
                        InternalClientConnection conn = InternalClientConnection.getRootConnection();
                        LDAPFilter filter = LDAPFilter.decode("(ds-sync-hist>=dummy:" + replServerMaxChangeNumber + ")");
                        LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
                        attrs.add("ds-sync-hist");
                        InternalSearchOperation op = conn.processSearch(new ASN1OctetString(this.baseDn.toString()), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, filter, attrs, (InternalSearchListener)this);
                        if (op.getResultCode() != ResultCode.SUCCESS) {
                            Message message4 = ReplicationMessages.ERR_CANNOT_RECOVER_CHANGES.get(this.baseDn.toNormalizedString());
                            ErrorLogger.logError(message4);
                            continue;
                        }
                        this.replicationServer = ServerAddr.toString();
                        this.maxSendWindow = startMsg.getWindowSize();
                        this.connected = true;
                        for (FakeOperation replayOp : this.replayOperations) {
                            ErrorLogger.logError(Message.raw("sendingChange", new Object[0]));
                            this.session.publish(replayOp.generateMessage());
                        }
                        this.startHeartBeat();
                        ErrorLogger.logError(Message.raw("changes sent", new Object[0]));
                        break;
                    }
                    catch (ConnectException e) {
                        if (this.connectionError) continue;
                        message2 = ReplicationMessages.NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
                        ErrorLogger.logError(message2);
                    }
                    catch (Exception e) {
                        message2 = ReplicationMessages.NOTE_EXCEPTION_STARTING_SESSION.get();
                        ErrorLogger.logError(message2);
                    }
                    finally {
                        if (this.connected || this.session == null) continue;
                        try {
                            this.session.close();
                        }
                        catch (IOException e) {}
                        this.session = null;
                    }
                }
                if (this.connected || !checkState || !receivedResponse) continue;
                message = ReplicationMessages.NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get();
                ErrorLogger.logError(message);
                checkState = false;
            }
            if (this.connected) {
                this.connectionError = false;
                if (this.sendWindow != null) {
                    this.sendWindow.release(Integer.MAX_VALUE);
                }
                this.sendWindow = new Semaphore(this.maxSendWindow);
                this.connectPhaseLock.notify();
                message = ReplicationMessages.NOTE_NOW_FOUND_CHANGELOG.get(this.replicationServer, this.baseDn.toString());
                ErrorLogger.logError(message);
            } else if (!this.connectionError) {
                checkState = false;
                this.connectionError = true;
                this.connectPhaseLock.notify();
                message = ReplicationMessages.NOTE_COULD_NOT_FIND_CHANGELOG.get(this.baseDn.toString());
                ErrorLogger.logError(message);
            }
        }
    }

    private void startHeartBeat() {
        if (this.heartbeatInterval > 0L) {
            this.heartbeatMonitor = new HeartbeatMonitor("Replication Heartbeat Monitor", this.session, this.heartbeatInterval);
            this.heartbeatMonitor.start();
        }
    }

    private void reStart() {
        this.reStart(null);
    }

    private void reStart(ProtocolSession failingSession) {
        try {
            if (failingSession != null) {
                failingSession.close();
                ++this.numLostConnections;
            }
        }
        catch (IOException e1) {
            // empty catch block
        }
        if (failingSession == this.session) {
            this.connected = false;
        }
        while (!this.connected && !this.shutdown) {
            try {
                this.connect();
            }
            catch (Exception e) {
                MessageBuilder mb = new MessageBuilder();
                mb.append(ReplicationMessages.NOTE_EXCEPTION_STARTING_SESSION.get());
                mb.append(StaticUtils.stackTraceToSingleLineString(e));
                ErrorLogger.logError(mb.toMessage());
            }
            if (this.connected || this.shutdown) continue;
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void publish(ReplicationMessage msg) {
        boolean done = false;
        while (!done) {
            if (this.connectionError) {
                return;
            }
            try {
                Semaphore currentWindowSemaphore;
                ProtocolSession current_session;
                Object object = this.connectPhaseLock;
                synchronized (object) {
                    current_session = this.session;
                    currentWindowSemaphore = this.sendWindow;
                }
                boolean credit = msg instanceof UpdateMessage ? currentWindowSemaphore.tryAcquire(500L, TimeUnit.MILLISECONDS) : true;
                if (credit) {
                    object = this.connectPhaseLock;
                    synchronized (object) {
                        if (this.session == current_session) {
                            this.session.publish(msg);
                            done = true;
                        }
                    }
                }
                if (credit) continue;
                this.session.publish(new WindowProbe());
            }
            catch (IOException e) {
                Object object = this.connectPhaseLock;
                synchronized (object) {
                    try {
                        this.connectPhaseLock.wait(100L);
                    }
                    catch (InterruptedException e1) {
                        // empty catch block
                    }
                }
            }
            catch (InterruptedException interruptedException) {
            }
        }
    }

    public ReplicationMessage receive() throws SocketTimeoutException {
        while (!this.shutdown) {
            if (!this.connected) {
                this.reStart();
            }
            ProtocolSession failingSession = this.session;
            try {
                ReplicationMessage msg = this.session.receive();
                if (msg instanceof WindowMessage) {
                    WindowMessage windowMsg = (WindowMessage)msg;
                    this.sendWindow.release(windowMsg.getNumAck());
                    continue;
                }
                if (msg instanceof UpdateMessage) {
                    --this.rcvWindow;
                    if (this.rcvWindow < this.halfRcvWindow) {
                        this.session.publish(new WindowMessage(this.halfRcvWindow));
                        this.rcvWindow += this.halfRcvWindow;
                    }
                }
                return msg;
            }
            catch (SocketTimeoutException e) {
                throw e;
            }
            catch (Exception e) {
                if (this.shutdown) continue;
                Message message = ReplicationMessages.NOTE_DISCONNECTED_FROM_CHANGELOG.get(this.replicationServer);
                ErrorLogger.logError(message);
                this.reStart(failingSession);
            }
        }
        return null;
    }

    public void stop() {
        this.replicationServer = "stopped";
        this.shutdown = true;
        this.connected = false;
        try {
            if (DebugLogger.debugEnabled()) {
                TRACER.debugInfo("ReplicationBroker Stop Closing session");
            }
            if (this.session != null) {
                this.session.close();
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    public void setSoTimeout(int timeout) throws SocketException {
        this.timeout = timeout;
        if (this.session != null) {
            this.session.setSoTimeout(timeout);
        }
    }

    public String getReplicationServer() {
        return this.replicationServer;
    }

    @Override
    public void handleInternalSearchEntry(InternalSearchOperation searchOperation, SearchResultEntry searchEntry) {
        Iterable<FakeOperation> updates = Historical.generateFakeOperations(searchEntry);
        for (FakeOperation op : updates) {
            this.replayOperations.add(op);
        }
    }

    @Override
    public void handleInternalSearchReference(InternalSearchOperation searchOperation, SearchResultReference searchReference) {
    }

    public int getMaxRcvWindow() {
        return this.maxRcvWindow;
    }

    public int getCurrentRcvWindow() {
        return this.rcvWindow;
    }

    public int getMaxSendWindow() {
        return this.maxSendWindow;
    }

    public int getCurrentSendWindow() {
        if (this.connected) {
            return this.sendWindow.availablePermits();
        }
        return 0;
    }

    public int getNumLostConnections() {
        return this.numLostConnections;
    }

    public void changeConfig(Collection<String> replicationServers, int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, long heartbeatInterval) {
        this.servers = replicationServers;
        this.maxRcvWindow = window;
        this.heartbeatInterval = heartbeatInterval;
        this.maxReceiveDelay = maxReceiveDelay;
        this.maxReceiveQueue = maxReceiveQueue;
        this.maxSendDelay = maxSendDelay;
        this.maxSendQueue = maxSendQueue;
    }

    public short getProtocolVersion() {
        return this.protocolVersion;
    }

    public boolean isConnected() {
        return !this.connectionError;
    }
}

