/*
 * Decompiled with CFR 0.152.
 */
package com.orientechnologies.orient.server.distributed.impl;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.orientechnologies.common.concur.OTimeoutException;
import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.metadata.security.OSecurityUser;
import com.orientechnologies.orient.core.metadata.security.OUser;
import com.orientechnologies.orient.server.distributed.ODistributedException;
import com.orientechnologies.orient.server.distributed.ODistributedRequest;
import com.orientechnologies.orient.server.distributed.ODistributedRequestId;
import com.orientechnologies.orient.server.distributed.ODistributedResponse;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
import com.orientechnologies.orient.server.distributed.ORemoteServerController;
import com.orientechnologies.orient.server.distributed.impl.ODistributedDatabaseImpl;
import com.orientechnologies.orient.server.distributed.impl.ODistributedMessageServiceImpl;
import com.orientechnologies.orient.server.distributed.task.ODistributedOperationException;
import com.orientechnologies.orient.server.distributed.task.ORemoteTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ODistributedWorker
extends Thread {
    protected final ODistributedDatabaseImpl distributed;
    protected final ODistributedServerManager manager;
    protected final ODistributedMessageServiceImpl msgService;
    protected final String localNodeName;
    protected final String databaseName;
    protected final ArrayBlockingQueue<ODistributedRequest> localQueue;
    protected final int id;
    private final boolean acceptsWhileNotOnline;
    protected volatile ODatabaseDocumentInternal database;
    protected volatile OUser lastUser;
    protected volatile boolean running = true;
    private AtomicLong processedRequests = new AtomicLong(0L);
    private AtomicBoolean waitingForNextRequest = new AtomicBoolean(true);
    private static final long MAX_SHUTDOWN_TIMEOUT = 5000L;
    private volatile ODistributedRequest currentExecuting;

    public ODistributedWorker(ODistributedDatabaseImpl iDistributed, String iDatabaseName, int i, boolean acceptsWhileNotOnline) {
        this.id = i;
        this.setName("OrientDB DistributedWorker node=" + iDistributed.getLocalNodeName() + " db=" + iDatabaseName + " id=" + i);
        this.distributed = iDistributed;
        this.localQueue = new ArrayBlockingQueue(OGlobalConfiguration.DISTRIBUTED_LOCAL_QUEUESIZE.getValueAsInteger());
        this.databaseName = iDatabaseName;
        this.manager = this.distributed.getManager();
        this.msgService = this.distributed.msgService;
        this.localNodeName = this.manager.getLocalNodeName();
        this.acceptsWhileNotOnline = acceptsWhileNotOnline;
    }

    public boolean processRequest(ODistributedRequest request) {
        if (!this.acceptsWhileNotOnline && this.manager.isOffline()) {
            ODistributedServerLog.debug((Object)this, this.manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, "Discard request '%s' for database '%s' because the server is not online", request, this.databaseName);
            return false;
        }
        if (!this.localQueue.offer(request)) {
            ODistributedServerLog.info((Object)this, this.manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, "Local queue for database '%s' is full, cannot process further requests", this.databaseName);
            try {
                this.localQueue.put(request);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    @Override
    public void run() {
        long processedMessages = 0L;
        while (this.running) {
            ODistributedRequestId reqId = null;
            ODistributedRequest message = null;
            try {
                this.currentExecuting = message = this.readRequest();
                if (message != null) {
                    this.manager.messageProcessStart(message);
                    message.getId();
                    reqId = message.getId();
                    this.onMessage(message);
                }
                this.currentExecuting = null;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            catch (DistributedObjectDestroyedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            catch (HazelcastInstanceNotActiveException e) {
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                try {
                    if (e.getCause() instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    } else {
                        ODistributedServerLog.error((Object)this, this.localNodeName, reqId != null ? this.manager.getNodeNameById(reqId.getNodeId()) : "?", ODistributedServerLog.DIRECTION.IN, "Error on executing distributed request %s: (%s) worker=%d", e, message != null ? message.getId() : Integer.valueOf(-1), message != null ? message.getTask() : "-", this.id);
                    }
                }
                catch (Exception t) {
                    ODistributedServerLog.error((Object)this, this.localNodeName, "?", ODistributedServerLog.DIRECTION.IN, "Error on executing distributed request %s: (%s) worker=%d", e, message != null ? message.getId() : Integer.valueOf(-1), message != null ? message.getTask() : "-", this.id);
                }
            }
            ++processedMessages;
        }
        ODistributedServerLog.debug((Object)this, this.localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "End of reading requests for database %s", this.databaseName);
    }

    public void initDatabaseInstance() {
        if (this.database == null) {
            for (int retry = 0; retry < 100; ++retry) {
                try {
                    this.database = this.distributed.getDatabaseInstance();
                    break;
                }
                catch (OStorageException e) {
                    if (this.dbNotAvailable(retry)) continue;
                    return;
                }
                catch (OConfigurationException e) {
                    if (this.dbNotAvailable(retry)) continue;
                    return;
                }
            }
            if (this.database == null) {
                ODistributedServerLog.info((Object)this, this.manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, "Database '%s' not present, shutting down database manager", this.databaseName);
                this.distributed.shutdown();
                throw new ODistributedException("Cannot open database '" + this.databaseName + "'");
            }
        } else if (this.database.isClosed()) {
            this.database.activateOnCurrentThread();
            this.database.close();
            this.database = this.distributed.getDatabaseInstance();
        }
    }

    protected boolean dbNotAvailable(int retry) {
        try {
            ODistributedServerLog.info((Object)this, this.manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, "Database '%s' not present, waiting for it (retry=%d/%d)...", this.databaseName, retry, 100);
            Thread.sleep(300L);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public void shutdown() {
        this.running = false;
        int pendingMsgs = this.localQueue.size();
        if (pendingMsgs > 0) {
            ODistributedServerLog.info((Object)this, this.localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "Received shutdown signal, waiting for distributed worker queue is empty (pending msgs=%d)...", pendingMsgs);
        }
        this.interrupt();
        try {
            if (pendingMsgs > 0) {
                try {
                    this.join(5000L);
                }
                catch (Exception e) {
                    ODistributedServerLog.debug((Object)this, this.localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "Interrupted shutdown of distributed worker thread", new Object[0]);
                }
            }
            ODistributedServerLog.debug((Object)this, this.localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "Shutdown distributed worker '%s' completed", this.getName());
            this.localQueue.clear();
            if (this.database != null) {
                this.database.activateOnCurrentThread();
                this.database.close();
            }
        }
        catch (Exception e) {
            ODistributedServerLog.warn((Object)this, this.localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "Error on shutting down distributed worker '%s'", e, this.getName());
        }
    }

    public ODatabaseDocumentInternal getDatabase() {
        return this.database;
    }

    protected ODistributedRequest readRequest() throws InterruptedException {
        ODistributedRequest req = this.nextMessage();
        if (req == null) {
            return null;
        }
        if (this.manager.isOffline()) {
            this.waitNodeIsOnline();
        }
        if (ODistributedServerLog.isDebugEnabled()) {
            String senderNodeName = this.manager.getNodeNameById(req.getId().getNodeId());
            ODistributedServerLog.debug((Object)this, this.localNodeName, senderNodeName, ODistributedServerLog.DIRECTION.IN, "Processing request=(%s) sourceNode=%s worker=%d", req, senderNodeName, this.id);
        }
        return req;
    }

    public boolean isWaitingForNextRequest() {
        return this.waitingForNextRequest.get();
    }

    protected ODistributedRequest nextMessage() throws InterruptedException {
        this.waitingForNextRequest.set(true);
        ODistributedRequest req = this.localQueue.poll(1000L, TimeUnit.MILLISECONDS);
        this.waitingForNextRequest.set(false);
        this.processedRequests.incrementAndGet();
        return req;
    }

    protected void onMessage(ODistributedRequest iRequest) {
        String senderNodeName = null;
        for (int retry = 0; retry < 10 && (senderNodeName = this.manager.getNodeNameById(iRequest.getId().getNodeId())) == null; ++retry) {
            try {
                Thread.sleep(200L);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw OException.wrapException(new ODistributedException("Execution has been interrupted"), e);
            }
        }
        if (senderNodeName == null) {
            ODistributedServerLog.warn((Object)this, this.localNodeName, senderNodeName, ODistributedServerLog.DIRECTION.IN, "Sender server id %d is not registered in the cluster configuration, discard the request: (%s) (worker=%d)", iRequest.getId().getNodeId(), iRequest, this.id);
            this.sendResponseBack(iRequest, new ODistributedException("Sender server id " + iRequest.getId().getNodeId() + " is not registered in the cluster configuration, discard the request"));
            return;
        }
        ORemoteTask task = iRequest.getTask();
        if (ODistributedServerLog.isDebugEnabled()) {
            ODistributedServerLog.debug((Object)this, this.localNodeName, senderNodeName, ODistributedServerLog.DIRECTION.IN, "Received request: (%s) (worker=%d)", iRequest, this.id);
        }
        Object responsePayload = null;
        OSecurityUser origin = null;
        try {
            this.waitNodeIsOnline();
            this.distributed.waitIsReady(task);
            if (task.isUsingDatabase()) {
                this.initDatabaseInstance();
                if (this.database == null) {
                    throw new ODistributedOperationException("Error on executing remote request because the database '" + this.databaseName + "' is not available");
                }
            }
            if (this.database != null) {
                this.database.activateOnCurrentThread();
                origin = this.database.getUser();
                try {
                    if (iRequest.getUserRID() != null && iRequest.getUserRID().isValid() && (this.lastUser == null || !this.lastUser.getIdentity().equals(iRequest.getUserRID()))) {
                        this.lastUser = this.database.getMetadata().getSecurity().getUser(iRequest.getUserRID());
                        this.database.setUser(this.lastUser);
                    } else {
                        origin = null;
                    }
                }
                catch (Exception ex) {
                    OLogManager.instance().error(this, "Failed on user switching database. ", ex, new Object[0]);
                }
            }
            int retry = 1;
            while (this.running) {
                responsePayload = this.manager.executeOnLocalNode(iRequest.getId(), iRequest.getTask(), this.database);
                if (!(responsePayload instanceof OModificationOperationProhibitedException)) {
                    if (retry > 1) {
                        ODistributedServerLog.info((Object)this, this.localNodeName, senderNodeName, ODistributedServerLog.DIRECTION.IN, "Request %s succeed after retry=%d", iRequest, retry);
                    }
                    break;
                }
                try {
                    ODistributedServerLog.info((Object)this, this.localNodeName, senderNodeName, ODistributedServerLog.DIRECTION.IN, "Database is frozen, waiting and retrying. Request %s (retry=%d, worker=%d)", iRequest, retry, this.id);
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {}
                ++retry;
            }
        }
        catch (RuntimeException e) {
            if (task.hasResponse()) {
                this.sendResponseBack(iRequest, e);
            }
            throw e;
        }
        finally {
            if (this.database != null && !this.database.isClosed()) {
                this.database.activateOnCurrentThread();
                if (!this.database.isClosed()) {
                    this.database.rollback();
                    this.database.getLocalCache().clear();
                    if (origin != null) {
                        this.database.setUser(origin);
                    }
                }
            }
        }
        if (task.hasResponse() && !this.sendResponseBack(iRequest, responsePayload)) {
            this.handleError(iRequest, responsePayload);
        }
        this.manager.messageProcessEnd(iRequest, responsePayload);
    }

    protected void handleError(ODistributedRequest iRequest, Object responsePayload) {
    }

    protected String getLocalNodeName() {
        return this.localNodeName;
    }

    private boolean sendResponseBack(ODistributedRequest iRequest, Object responsePayload) {
        return ODistributedWorker.sendResponseBack(this, this.manager, iRequest, responsePayload);
    }

    static boolean sendResponseBack(Object current, ODistributedServerManager manager, ODistributedRequest iRequest, Object responsePayload) {
        if (iRequest.getId().getMessageId() < 0L) {
            return true;
        }
        String localNodeName = manager.getLocalNodeName();
        String senderNodeName = manager.getNodeNameById(iRequest.getId().getNodeId());
        ODistributedResponse response = new ODistributedResponse(null, iRequest.getId(), localNodeName, senderNodeName, responsePayload);
        try {
            ORemoteServerController remoteSenderServer = manager.getRemoteServer(senderNodeName);
            ODistributedServerLog.debug(current, localNodeName, senderNodeName, ODistributedServerLog.DIRECTION.OUT, "Sending response %s back (reqId=%s)", response, iRequest);
            remoteSenderServer.sendResponse(response);
        }
        catch (Exception e) {
            ODistributedServerLog.debug(current, localNodeName, senderNodeName, ODistributedServerLog.DIRECTION.OUT, "Error on sending response '%s' back (reqId=%s err=%s)", response, iRequest.getId(), e.toString());
            return false;
        }
        return true;
    }

    private void waitNodeIsOnline() throws OTimeoutException {
        ODistributedServerManager mgr = this.manager.getServerInstance().getDistributedManager();
        if (mgr != null && mgr.isEnabled() && mgr.isOffline()) {
            int retry = 0;
            while (this.running) {
                if (mgr != null && mgr.isOffline()) {
                    ODistributedServerLog.info((Object)this, this.localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "Node is not online yet (status=%s), blocking the command until it is online (retry=%d, queue=%d worker=%d)", new Object[]{mgr.getNodeStatus(), retry + 1, this.localQueue.size(), this.id});
                    if (this.localQueue.size() >= this.manager.getServerInstance().getContextConfiguration().getValueAsInteger(OGlobalConfiguration.DISTRIBUTED_LOCAL_QUEUESIZE)) {
                        ODistributedServerLog.warn((Object)this, this.localNodeName, null, ODistributedServerLog.DIRECTION.NONE, "Replication queue is full (retry=%d, queue=%d worker=%d), replication could be delayed", retry + 1, this.localQueue.size(), this.id);
                    }
                    try {
                        Thread.sleep(2000L);
                    }
                    catch (InterruptedException interruptedException) {}
                } else {
                    return;
                }
                ++retry;
            }
        }
    }

    public long getProcessedRequests() {
        return this.processedRequests.get();
    }

    public void reset() {
        this.localQueue.clear();
        if (this.database != null) {
            this.database.activateOnCurrentThread();
            this.database.close();
            this.database = null;
        }
    }

    public void sendShutdown() {
        this.running = false;
    }

    public ODistributedRequest getProcessing() {
        return this.currentExecuting;
    }
}

