/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ProtocolMessageEnum;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.scm.server.SCMPolicyProvider;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SCMDatanodeProtocolServer
implements StorageContainerDatanodeProtocol,
Auditor {
    private static final Logger LOG = LoggerFactory.getLogger(SCMDatanodeProtocolServer.class);
    private static final AuditLogger AUDIT = new AuditLogger(AuditLoggerType.SCMLOGGER);
    private RPC.Server datanodeRpcServer;
    private final OzoneStorageContainerManager scm;
    private final InetSocketAddress datanodeRpcAddress;
    private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
    private final EventPublisher eventPublisher;
    private ProtocolMessageMetrics<ProtocolMessageEnum> protocolMessageMetrics;

    public SCMDatanodeProtocolServer(OzoneConfiguration conf, OzoneStorageContainerManager scm, EventPublisher eventPublisher) throws IOException {
        Preconditions.checkNotNull((Object)scm, (Object)"SCM cannot be null");
        Preconditions.checkNotNull((Object)eventPublisher, (Object)"EventPublisher cannot be null");
        this.scm = scm;
        this.eventPublisher = eventPublisher;
        this.heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher(scm.getScmNodeManager(), eventPublisher);
        InetSocketAddress datanodeRpcAddr = this.getDataNodeBindAddress(conf);
        this.protocolMessageMetrics = this.getProtocolMessageMetrics();
        int handlerCount = conf.getInt("ozone.scm.handler.count.key", 100);
        RPC.setProtocolEngine((Configuration)conf, (Class)this.getProtocolClass(), ProtobufRpcEngine.class);
        BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService.newReflectiveBlockingService((StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService.BlockingInterface)new StorageContainerDatanodeProtocolServerSideTranslatorPB((StorageContainerDatanodeProtocol)this, this.protocolMessageMetrics));
        this.datanodeRpcServer = StorageContainerManager.startRpcServer(conf, datanodeRpcAddr, this.getProtocolClass(), dnProtoPbService, handlerCount);
        this.datanodeRpcAddress = ServerUtils.updateRPCListenAddress((OzoneConfiguration)conf, (String)this.getDatanodeAddressKey(), (InetSocketAddress)datanodeRpcAddr, (RPC.Server)this.datanodeRpcServer);
        if (conf.getBoolean("hadoop.security.authorization", false)) {
            this.datanodeRpcServer.refreshServiceAcl((Configuration)conf, this.getPolicyProvider());
        }
    }

    public void start() {
        LOG.info(StorageContainerManager.buildRpcServerStartMessage("RPC server for DataNodes", this.datanodeRpcAddress));
        this.protocolMessageMetrics.register();
        this.datanodeRpcServer.start();
    }

    public InetSocketAddress getDatanodeRpcAddress() {
        return this.datanodeRpcAddress;
    }

    public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto getVersion(StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto versionRequest) throws IOException {
        boolean auditSuccess = true;
        try {
            StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto sCMVersionResponseProto = this.scm.getScmNodeManager().getVersion(versionRequest).getProtobufMessage();
            return sCMVersionResponseProto;
        }
        catch (Exception ex) {
            auditSuccess = false;
            AUDIT.logReadFailure(this.buildAuditMessageForFailure((AuditAction)SCMAction.GET_VERSION, null, ex));
            throw ex;
        }
        finally {
            if (auditSuccess) {
                AUDIT.logReadSuccess(this.buildAuditMessageForSuccess((AuditAction)SCMAction.GET_VERSION, null));
            }
        }
    }

    public StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto register(HddsProtos.ExtendedDatanodeDetailsProto extendedDatanodeDetailsProto, StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport, StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReportsProto, StorageContainerDatanodeProtocolProtos.PipelineReportsProto pipelineReportsProto) throws IOException {
        DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf((HddsProtos.ExtendedDatanodeDetailsProto)extendedDatanodeDetailsProto);
        boolean auditSuccess = true;
        HashMap auditMap = Maps.newHashMap();
        auditMap.put("datanodeDetails", datanodeDetails.toString());
        RegisteredCommand registeredCommand = this.scm.getScmNodeManager().register(datanodeDetails, nodeReport, pipelineReportsProto);
        if (registeredCommand.getError() == StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success) {
            this.eventPublisher.fireEvent(SCMEvents.CONTAINER_REPORT, (Object)new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(datanodeDetails, containerReportsProto));
            this.eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, (Object)new NodeRegistrationContainerReport(datanodeDetails, containerReportsProto));
            this.eventPublisher.fireEvent(SCMEvents.PIPELINE_REPORT, (Object)new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(datanodeDetails, pipelineReportsProto));
        }
        try {
            StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto sCMRegisteredResponseProto = SCMDatanodeProtocolServer.getRegisteredResponse(registeredCommand);
            return sCMRegisteredResponseProto;
        }
        catch (Exception ex) {
            auditSuccess = false;
            AUDIT.logWriteFailure(this.buildAuditMessageForFailure((AuditAction)SCMAction.REGISTER, auditMap, ex));
            throw ex;
        }
        finally {
            if (auditSuccess) {
                AUDIT.logWriteSuccess(this.buildAuditMessageForSuccess((AuditAction)SCMAction.REGISTER, auditMap));
            }
        }
    }

    @VisibleForTesting
    public static StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto getRegisteredResponse(RegisteredCommand cmd) {
        return cmd.getProtoBufMessage();
    }

    public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto sendHeartbeat(StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto heartbeat) throws IOException {
        ArrayList<StorageContainerDatanodeProtocolProtos.SCMCommandProto> cmdResponses = new ArrayList<StorageContainerDatanodeProtocolProtos.SCMCommandProto>();
        for (SCMCommand cmd : this.heartbeatDispatcher.dispatch(heartbeat)) {
            cmdResponses.add(this.getCommandResponse(cmd));
        }
        boolean auditSuccess = true;
        HashMap auditMap = Maps.newHashMap();
        auditMap.put("datanodeUUID", heartbeat.getDatanodeDetails().getUuid());
        auditMap.put("command", SCMDatanodeProtocolServer.flatten(((Object)cmdResponses).toString()));
        try {
            StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto sCMHeartbeatResponseProto = StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto.newBuilder().setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid()).addAllCommands(cmdResponses).build();
            return sCMHeartbeatResponseProto;
        }
        catch (Exception ex) {
            auditSuccess = false;
            AUDIT.logWriteFailure(this.buildAuditMessageForFailure((AuditAction)SCMAction.SEND_HEARTBEAT, auditMap, ex));
            throw ex;
        }
        finally {
            if (auditSuccess) {
                AUDIT.logWriteSuccess(this.buildAuditMessageForSuccess((AuditAction)SCMAction.SEND_HEARTBEAT, auditMap));
            }
        }
    }

    @VisibleForTesting
    public StorageContainerDatanodeProtocolProtos.SCMCommandProto getCommandResponse(SCMCommand cmd) throws IOException {
        StorageContainerDatanodeProtocolProtos.SCMCommandProto.Builder builder = StorageContainerDatanodeProtocolProtos.SCMCommandProto.newBuilder();
        switch (cmd.getType()) {
            case reregisterCommand: {
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand).setReregisterCommandProto(StorageContainerDatanodeProtocolProtos.ReregisterCommandProto.getDefaultInstance()).build();
            }
            case deleteBlocksCommand: {
                List<Long> txs = ((DeleteBlocksCommand)cmd).blocksTobeDeleted().stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
                this.scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand).setDeleteBlocksCommandProto(((DeleteBlocksCommand)cmd).getProto()).build();
            }
            case closeContainerCommand: {
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closeContainerCommand).setCloseContainerCommandProto(((CloseContainerCommand)cmd).getProto()).build();
            }
            case deleteContainerCommand: {
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand).setDeleteContainerCommandProto(((DeleteContainerCommand)cmd).getProto()).build();
            }
            case replicateContainerCommand: {
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand).setReplicateContainerCommandProto(((ReplicateContainerCommand)cmd).getProto()).build();
            }
            case createPipelineCommand: {
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.createPipelineCommand).setCreatePipelineCommandProto(((CreatePipelineCommand)cmd).getProto()).build();
            }
            case closePipelineCommand: {
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closePipelineCommand).setClosePipelineCommandProto(((ClosePipelineCommand)cmd).getProto()).build();
            }
            case setNodeOperationalStateCommand: {
                return builder.setCommandType(StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.setNodeOperationalStateCommand).setSetNodeOperationalStateCommandProto(((SetNodeOperationalStateCommand)cmd).getProto()).build();
            }
        }
        throw new IllegalArgumentException("Scm command " + cmd.getType().toString() + " is not implemented");
    }

    public void join() throws InterruptedException {
        LOG.trace("Join RPC server for DataNodes");
        this.datanodeRpcServer.join();
    }

    public void stop() {
        try {
            LOG.info("Stopping the RPC server for DataNodes");
            this.datanodeRpcServer.stop();
        }
        catch (Exception ex) {
            LOG.error(" datanodeRpcServer stop failed.", (Throwable)ex);
        }
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{this.scm.getScmNodeManager()});
        this.protocolMessageMetrics.unregister();
    }

    public AuditMessage buildAuditMessageForSuccess(AuditAction op, Map<String, String> auditMap) {
        return new AuditMessage.Builder().setUser(ServerUtils.getRemoteUserName()).atIp(Server.getRemoteAddress()).forOperation(op).withParams(auditMap).withResult(AuditEventStatus.SUCCESS).build();
    }

    public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String, String> auditMap, Throwable throwable) {
        return new AuditMessage.Builder().setUser(ServerUtils.getRemoteUserName()).atIp(Server.getRemoteAddress()).forOperation(op).withParams(auditMap).withResult(AuditEventStatus.FAILURE).withException(throwable).build();
    }

    private static String flatten(String input) {
        return input.replaceAll(System.lineSeparator(), " ").trim().replaceAll(" +", " ");
    }

    protected ProtocolMessageMetrics<ProtocolMessageEnum> getProtocolMessageMetrics() {
        return ProtocolMessageMetrics.create((String)"SCMDatanodeProtocol", (String)"SCM Datanode protocol", (Object[])StorageContainerDatanodeProtocolProtos.Type.values());
    }

    protected String getDatanodeAddressKey() {
        return "ozone.scm.datanode.address";
    }

    protected InetSocketAddress getDataNodeBindAddress(OzoneConfiguration conf) {
        return HddsServerUtil.getScmDataNodeBindAddress((ConfigurationSource)conf);
    }

    protected PolicyProvider getPolicyProvider() {
        return SCMPolicyProvider.getInstance();
    }

    protected Class getProtocolClass() {
        return StorageContainerDatanodeProtocolPB.class;
    }

    public static class NodeRegistrationContainerReport
    extends SCMDatanodeHeartbeatDispatcher.ReportFromDatanode<StorageContainerDatanodeProtocolProtos.ContainerReportsProto> {
        public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails, StorageContainerDatanodeProtocolProtos.ContainerReportsProto report) {
            super(datanodeDetails, report);
        }
    }
}

