/*
 * Decompiled with CFR 0.152.
 */
package com.orientechnologies.orient.server;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.common.thread.OThreadPoolExecutors;
import com.orientechnologies.orient.client.remote.message.OBinaryPushRequest;
import com.orientechnologies.orient.client.remote.message.OBinaryPushResponse;
import com.orientechnologies.orient.client.remote.message.OPushDistributedConfigurationRequest;
import com.orientechnologies.orient.client.remote.message.OPushFunctionsRequest;
import com.orientechnologies.orient.client.remote.message.OPushIndexManagerRequest;
import com.orientechnologies.orient.client.remote.message.OPushSchemaRequest;
import com.orientechnologies.orient.client.remote.message.OPushSequencesRequest;
import com.orientechnologies.orient.client.remote.message.OPushStorageConfigurationRequest;
import com.orientechnologies.orient.core.config.OContextConfiguration;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.config.OStorageConfiguration;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.db.OMetadataUpdateListener;
import com.orientechnologies.orient.core.index.OIndexManagerAbstract;
import com.orientechnologies.orient.core.index.OIndexManagerShared;
import com.orientechnologies.orient.core.metadata.schema.OSchemaShared;
import com.orientechnologies.orient.server.OClientConnection;
import com.orientechnologies.orient.server.OClientConnectionManager;
import com.orientechnologies.orient.server.OPushEventType;
import com.orientechnologies.orient.server.OPushInfo;
import com.orientechnologies.orient.server.network.protocol.binary.ONetworkProtocolBinary;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class OPushManager
implements OMetadataUpdateListener {
    protected final Set<OPushInfo> distributedConfigPush = Collections.newSetFromMap(new ConcurrentHashMap());
    protected final OPushEventType storageConfigurations = new OPushEventType();
    protected final OPushEventType schema = new OPushEventType();
    protected final OPushEventType indexManager = new OPushEventType();
    protected final OPushEventType functions = new OPushEventType();
    protected final OPushEventType sequences = new OPushEventType();
    private final Set<String> registerDatabase = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ExecutorService executor;
    private final OClientConnectionManager sessions;

    public OPushManager(OContextConfiguration contextConfiguration, OClientConnectionManager sessions) {
        int timeout = contextConfiguration.getValueAsInteger(OGlobalConfiguration.NETWORK_REQUEST_TIMEOUT);
        this.executor = OThreadPoolExecutors.newScalingThreadPool((String)"Push Requests", (int)0, (int)5, (int)2000, (long)timeout, (TimeUnit)TimeUnit.MILLISECONDS);
        this.sessions = sessions;
    }

    public synchronized void pushDistributedConfig(String database, List<String> hosts) {
        Iterator<OPushInfo> iter = this.distributedConfigPush.iterator();
        while (iter.hasNext()) {
            OPushInfo ref = iter.next();
            ONetworkProtocolBinary protocolBinary = (ONetworkProtocolBinary)((Object)ref.protocol().get());
            if (protocolBinary != null) {
                OPushDistributedConfigurationRequest request = new OPushDistributedConfigurationRequest(hosts);
                try {
                    OBinaryPushResponse oBinaryPushResponse = protocolBinary.push((OBinaryPushRequest)request);
                }
                catch (IOException e) {
                    this.close(ref);
                    iter.remove();
                }
                continue;
            }
            this.close(ref);
            iter.remove();
        }
    }

    public synchronized void subscribeDistributeConfig(ONetworkProtocolBinary channel, OClientConnection session) {
        this.distributedConfigPush.add(new OPushInfo(session, new WeakReference<ONetworkProtocolBinary>(channel)));
    }

    public synchronized void cleanPushSockets() {
        Iterator<OPushInfo> iter = this.distributedConfigPush.iterator();
        while (iter.hasNext()) {
            OPushInfo ref = iter.next();
            if (ref.protocol().get() != null) continue;
            this.close(ref);
            iter.remove();
        }
        this.storageConfigurations.cleanListeners(this);
        this.schema.cleanListeners(this);
        this.indexManager.cleanListeners(this);
        this.functions.cleanListeners(this);
        this.sequences.cleanListeners(this);
    }

    public void shutdown() {
        this.executor.shutdownNow();
    }

    private void genericSubscribe(OPushEventType context, ODatabaseDocumentInternal database, ONetworkProtocolBinary protocol, OClientConnection session) {
        if (!this.registerDatabase.contains(database.getName())) {
            database.getSharedContext().registerListener((Object)this);
            this.registerDatabase.add(database.getName());
        }
        context.subscribe(database.getName(), new OPushInfo(session, new WeakReference<ONetworkProtocolBinary>(protocol)));
    }

    public synchronized void subscribeStorageConfiguration(ODatabaseDocumentInternal database, ONetworkProtocolBinary protocol, OClientConnection session) {
        this.genericSubscribe(this.storageConfigurations, database, protocol, session);
    }

    public synchronized void subscribeSchema(ODatabaseDocumentInternal database, ONetworkProtocolBinary protocol, OClientConnection session) {
        this.genericSubscribe(this.schema, database, protocol, session);
    }

    public synchronized void subscribeIndexManager(ODatabaseDocumentInternal database, ONetworkProtocolBinary protocol, OClientConnection session) {
        this.genericSubscribe(this.indexManager, database, protocol, session);
    }

    public synchronized void subscribeFunctions(ODatabaseDocumentInternal database, ONetworkProtocolBinary protocol, OClientConnection session) {
        this.genericSubscribe(this.functions, database, protocol, session);
    }

    public synchronized void subscribeSequences(ODatabaseDocumentInternal database, ONetworkProtocolBinary protocol, OClientConnection session) {
        this.genericSubscribe(this.sequences, database, protocol, session);
    }

    public void onSchemaUpdate(String database, OSchemaShared schema) {
        OPushSchemaRequest request = new OPushSchemaRequest(schema.toNetworkStream());
        this.schema.send(database, (OBinaryPushRequest<?>)request, this);
    }

    public void onIndexManagerUpdate(String database, OIndexManagerAbstract indexManager) {
        OPushIndexManagerRequest request = new OPushIndexManagerRequest(((OIndexManagerShared)indexManager).toNetworkStream());
        this.indexManager.send(database, (OBinaryPushRequest<?>)request, this);
    }

    public void onFunctionLibraryUpdate(String database) {
        OPushFunctionsRequest request = new OPushFunctionsRequest();
        this.functions.send(database, (OBinaryPushRequest<?>)request, this);
    }

    public void onSequenceLibraryUpdate(String database) {
        OPushSequencesRequest request = new OPushSequencesRequest();
        this.sequences.send(database, (OBinaryPushRequest<?>)request, this);
    }

    public void onStorageConfigurationUpdate(String database, OStorageConfiguration update) {
        OPushStorageConfigurationRequest request = new OPushStorageConfigurationRequest(update);
        this.storageConfigurations.send(database, (OBinaryPushRequest<?>)request, this);
    }

    public void close(OPushInfo info) {
        ONetworkProtocolBinary protocol = (ONetworkProtocolBinary)((Object)info.protocol().get());
        if (protocol != null) {
            protocol.shutdown();
        }
        this.sessions.disconnect(info.connection());
    }

    public void genericNotify(Map<String, Set<OPushInfo>> context, String database, OPushEventType pack) {
        try {
            this.executor.submit(() -> {
                Set clients = (Set)context.get(database);
                if (clients != null) {
                    Iterator iter = clients.iterator();
                    while (iter.hasNext()) {
                        OPushInfo ref = (OPushInfo)iter.next();
                        ONetworkProtocolBinary protocolBinary = (ONetworkProtocolBinary)((Object)((Object)ref.protocol().get()));
                        if (protocolBinary != null) {
                            try {
                                OBinaryPushRequest<?> request = pack.getRequest(database);
                                if (request == null) continue;
                                OBinaryPushResponse oBinaryPushResponse = protocolBinary.push(request);
                            }
                            catch (IOException e) {
                                this.close(ref);
                                iter.remove();
                            }
                            continue;
                        }
                        this.close(ref);
                        iter.remove();
                    }
                }
            });
        }
        catch (RejectedExecutionException e) {
            OLogManager.instance().info((Object)this, "Cannot send push request to client for database '%s'", new Object[]{database});
        }
    }
}

