/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.instance;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.logging.Logger;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.common.utils.logging.ErrorReportLoggingHandler;
import org.apache.heron.common.utils.metrics.JVMMetrics;
import org.apache.heron.common.utils.metrics.MetricsCollector;
import org.apache.heron.instance.InstanceControlMsg;
import org.apache.heron.metrics.GatewayMetrics;
import org.apache.heron.network.MetricsManagerClient;
import org.apache.heron.network.StreamManagerClient;
import org.apache.heron.proto.system.Metrics;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.shaded.com.google.protobuf.Message;

public class Gateway
implements Runnable,
AutoCloseable {
    private static final Logger LOG = Logger.getLogger(Gateway.class.getName());
    private static final String STREAM_MGR_HOST = "127.0.0.1";
    private static final String METRICS_MGR_HOST = "127.0.0.1";
    private final MetricsManagerClient metricsManagerClient;
    private final StreamManagerClient streamManagerClient;
    private final NIOLooper gatewayLooper;
    private final MetricsCollector gatewayMetricsCollector;
    private final JVMMetrics jvmMetrics;
    private final GatewayMetrics gatewayMetrics;
    private final SystemConfig systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);

    public Gateway(String topologyName, String topologyId, PhysicalPlans.Instance instance, int streamPort, int metricsPort, final NIOLooper gatewayLooper, final Communicator<Message> inStreamQueue, final Communicator<Message> outStreamQueue, Communicator<InstanceControlMsg> inControlQueue, List<Communicator<Metrics.MetricPublisherPublishMessage>> outMetricsQueues) throws IOException {
        this.gatewayLooper = gatewayLooper;
        this.gatewayMetricsCollector = new MetricsCollector(gatewayLooper, outMetricsQueues.get(0));
        this.jvmMetrics = new JVMMetrics();
        this.jvmMetrics.registerMetrics(this.gatewayMetricsCollector);
        this.gatewayMetrics = new GatewayMetrics();
        this.gatewayMetrics.registerMetrics(this.gatewayMetricsCollector);
        ErrorReportLoggingHandler.init(this.gatewayMetricsCollector, this.systemConfig.getHeronMetricsExportInterval(), this.systemConfig.getHeronMetricsMaxExceptionsPerMessageCount());
        HeronSocketOptions socketOptions = new HeronSocketOptions(this.systemConfig.getInstanceNetworkWriteBatchSize(), this.systemConfig.getInstanceNetworkWriteBatchTime(), this.systemConfig.getInstanceNetworkReadBatchSize(), this.systemConfig.getInstanceNetworkReadBatchTime(), this.systemConfig.getInstanceNetworkOptionsSocketSendBufferSize(), this.systemConfig.getInstanceNetworkOptionsSocketReceivedBufferSize(), this.systemConfig.getInstanceNetworkOptionsMaximumPacketSize());
        this.streamManagerClient = new StreamManagerClient(gatewayLooper, "127.0.0.1", streamPort, topologyName, topologyId, instance, inStreamQueue, outStreamQueue, inControlQueue, socketOptions, this.gatewayMetrics);
        this.metricsManagerClient = new MetricsManagerClient(gatewayLooper, "127.0.0.1", metricsPort, instance, outMetricsQueues, socketOptions, this.gatewayMetrics);
        this.gatewayMetricsCollector.registerMetricSampleRunnable(this.jvmMetrics.getJVMSampleRunnable(), this.systemConfig.getInstanceMetricsSystemSampleInterval());
        Runnable sampleStreamQueuesSize = new Runnable(){

            @Override
            public void run() {
                Gateway.this.gatewayMetrics.setInStreamQueueSize(inStreamQueue.size());
                Gateway.this.gatewayMetrics.setOutStreamQueueSize(outStreamQueue.size());
                Gateway.this.gatewayMetrics.setInStreamQueueExpectedCapacity(inStreamQueue.getExpectedAvailableCapacity());
                Gateway.this.gatewayMetrics.setOutStreamQueueExpectedCapacity(outStreamQueue.getExpectedAvailableCapacity());
            }
        };
        this.gatewayMetricsCollector.registerMetricSampleRunnable(sampleStreamQueuesSize, this.systemConfig.getInstanceMetricsSystemSampleInterval());
        final Duration instanceTuningInterval = this.systemConfig.getInstanceTuningInterval();
        Runnable tuningStreamQueueSize = new Runnable(){

            @Override
            public void run() {
                inStreamQueue.updateExpectedAvailableCapacity();
                outStreamQueue.updateExpectedAvailableCapacity();
                gatewayLooper.registerTimerEvent(instanceTuningInterval, this);
            }
        };
        gatewayLooper.registerTimerEvent(this.systemConfig.getInstanceMetricsSystemSampleInterval(), tuningStreamQueueSize);
    }

    @Override
    public void run() {
        Thread.currentThread().setName("GatewayThread");
        this.streamManagerClient.start();
        this.metricsManagerClient.start();
        this.gatewayLooper.loop();
    }

    @Override
    public void close() {
        LOG.info("Closing the Gateway thread");
        this.gatewayMetricsCollector.forceGatherAllMetrics();
        this.metricsManagerClient.sendAllMessage();
        this.streamManagerClient.sendAllMessage();
        this.metricsManagerClient.stop();
        this.streamManagerClient.stop();
    }
}

