/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeStates;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VolumeManagerImpl
extends AbstractService
implements VolumeManager {
    private static final Logger LOG = LoggerFactory.getLogger(VolumeManagerImpl.class);
    private final VolumeStates volumeStates = new VolumeStates();
    private ScheduledExecutorService provisioningExecutor;
    private Map<String, CsiAdaptorProtocol> csiAdaptorMap = new ConcurrentHashMap<String, CsiAdaptorProtocol>();
    private static final int PROVISIONING_TASK_THREAD_POOL_SIZE = 10;

    public VolumeManagerImpl() {
        super(VolumeManagerImpl.class.getName());
        this.provisioningExecutor = Executors.newScheduledThreadPool(10);
    }

    private void initCsiAdaptorCache(Map<String, CsiAdaptorProtocol> adaptorMap, Configuration conf) throws IOException, YarnException {
        LOG.info("Initializing cache for csi-driver-adaptors");
        String[] addresses = conf.getStrings("yarn.nodemanager.csi-driver-adaptor.addresses");
        if (addresses != null && addresses.length > 0) {
            for (String addr : addresses) {
                LOG.info("Found csi-driver-adaptor socket address: " + addr);
                InetSocketAddress address = NetUtils.createSocketAddr((String)addr);
                YarnRPC rpc = YarnRPC.create((Configuration)conf);
                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
                CsiAdaptorProtocol adaptorClient = (CsiAdaptorProtocol)NMProxy.createNMProxy((Configuration)conf, CsiAdaptorProtocol.class, (UserGroupInformation)currentUser, (YarnRPC)rpc, (InetSocketAddress)address);
                LOG.info("Retrieving info from csi-driver-adaptor on address " + addr);
                GetPluginInfoResponse response = adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance());
                if (Strings.isNullOrEmpty((String)response.getDriverName())) continue;
                String driverName = response.getDriverName();
                if (adaptorMap.containsKey(driverName)) {
                    throw new YarnException("Duplicate driver adaptor found, driver name: " + driverName);
                }
                adaptorMap.put(driverName, adaptorClient);
                LOG.info("CSI Adaptor added to the cache, adaptor name: " + driverName + ", driver version: " + response.getVersion());
            }
        }
    }

    @Override
    public CsiAdaptorProtocol getAdaptorByDriverName(String driverName) {
        return this.csiAdaptorMap.get(driverName);
    }

    @Override
    @VisibleForTesting
    public void registerCsiDriverAdaptor(String driverName, CsiAdaptorProtocol client) {
        this.csiAdaptorMap.put(driverName, client);
    }

    protected void serviceInit(Configuration conf) throws Exception {
        this.initCsiAdaptorCache(this.csiAdaptorMap, conf);
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        this.provisioningExecutor.shutdown();
        super.serviceStop();
    }

    @Override
    public VolumeStates getVolumeStates() {
        return this.volumeStates;
    }

    @Override
    public Volume addOrGetVolume(Volume volume) {
        if (this.volumeStates.getVolume(volume.getVolumeId()) != null) {
            return this.volumeStates.getVolume(volume.getVolumeId());
        }
        this.volumeStates.addVolumeIfAbsent(volume);
        return volume;
    }

    @Override
    public ScheduledFuture<VolumeProvisioningResults> schedule(VolumeProvisioningTask volumeProvisioningTask, int delaySecond) {
        LOG.info("Scheduling provision volume task (with delay " + delaySecond + "s), handling " + volumeProvisioningTask.getVolumes().size() + " volume provisioning");
        return this.provisioningExecutor.schedule(volumeProvisioningTask, (long)delaySecond, TimeUnit.SECONDS);
    }
}

