/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.impl;

import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokersBase
extends PulsarWebResource {
    private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
    private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10L);

    @GET
    @Path(value="/{cluster}")
    @ApiOperation(value="Get the list of active brokers (web service addresses) in the cluster.If authorization is not enabled, any cluster name is valid.", response=String.class, responseContainer="Set")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve this cluster"), @ApiResponse(code=401, message="Authentication required"), @ApiResponse(code=403, message="This operation requires super-user access"), @ApiResponse(code=404, message="Cluster does not exist: cluster={clustername}")})
    public Set<String> getActiveBrokers(@PathParam(value="cluster") String cluster) throws Exception {
        this.validateSuperUserAccess();
        this.validateClusterOwnership(cluster);
        try {
            return new HashSet<String>(this.dynamicConfigurationResources().getChildren("/loadbalance/brokers"));
        }
        catch (Exception e) {
            LOG.error("[{}] Failed to get active broker list: cluster={}", new Object[]{this.clientAppId(), cluster, e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/leaderBroker")
    @ApiOperation(value="Get the information of the leader broker.", response=BrokerInfo.class)
    @ApiResponses(value={@ApiResponse(code=401, message="Authentication required"), @ApiResponse(code=403, message="This operation requires super-user access"), @ApiResponse(code=404, message="Leader broker not found")})
    public BrokerInfo getLeaderBroker() throws Exception {
        this.validateSuperUserAccess();
        try {
            LeaderBroker leaderBroker = this.pulsar().getLeaderElectionService().getCurrentLeader().orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Couldn't find leader broker"));
            return BrokerInfo.builder().serviceUrl(leaderBroker.getServiceUrl()).build();
        }
        catch (Exception e) {
            LOG.error("[{}] Failed to get the information of the leader broker.", (Object)this.clientAppId(), (Object)e);
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
    @ApiOperation(value="Get the list of namespaces served by the specific broker", response=NamespaceOwnershipStatus.class, responseContainer="Map")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the cluster"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Cluster doesn't exist")})
    public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam(value="clusterName") String cluster, @PathParam(value="broker-webserviceurl") String broker) throws Exception {
        this.validateSuperUserAccess();
        this.validateClusterOwnership(cluster);
        this.validateBrokerName(broker);
        try {
            return this.pulsar().getNamespaceService().getOwnedNameSpacesStatus();
        }
        catch (Exception e) {
            LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", new Object[]{this.clientAppId(), cluster, broker});
            throw new RestException(e);
        }
    }

    @POST
    @Path(value="/configuration/{configName}/{configValue}")
    @ApiOperation(value="Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
    @ApiResponses(value={@ApiResponse(code=204, message="Service configuration updated successfully"), @ApiResponse(code=403, message="You don't have admin permission to update service-configuration"), @ApiResponse(code=404, message="Configuration not found"), @ApiResponse(code=412, message="Invalid dynamic-config value"), @ApiResponse(code=500, message="Internal server error")})
    public void updateDynamicConfiguration(@PathParam(value="configName") String configName, @PathParam(value="configValue") String configValue) throws Exception {
        this.validateSuperUserAccess();
        this.persistDynamicConfiguration(configName, configValue);
    }

    @DELETE
    @Path(value="/configuration/{configName}")
    @ApiOperation(value="Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
    @ApiResponses(value={@ApiResponse(code=204, message="Service configuration updated successfully"), @ApiResponse(code=403, message="You don't have admin permission to update service-configuration"), @ApiResponse(code=412, message="Invalid dynamic-config value"), @ApiResponse(code=500, message="Internal server error")})
    public void deleteDynamicConfiguration(@PathParam(value="configName") String configName) throws Exception {
        this.validateSuperUserAccess();
        this.deleteDynamicConfigurationOnZk(configName);
    }

    @GET
    @Path(value="/configuration/values")
    @ApiOperation(value="Get value of all dynamic configurations' value overridden on local config")
    @ApiResponses(value={@ApiResponse(code=403, message="You don't have admin permission to view configuration"), @ApiResponse(code=404, message="Configuration not found"), @ApiResponse(code=500, message="Internal server error")})
    public Map<String, String> getAllDynamicConfigurations() throws Exception {
        this.validateSuperUserAccess();
        try {
            return (Map)this.dynamicConfigurationResources().get("/admin/configuration").orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Couldn't find configuration in zk"));
        }
        catch (RestException e) {
            LOG.error("[{}] couldn't find any configuration in zk {}", new Object[]{this.clientAppId(), e.getMessage(), e});
            throw e;
        }
        catch (Exception e) {
            LOG.error("[{}] Failed to retrieve configuration from zk {}", new Object[]{this.clientAppId(), e.getMessage(), e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/configuration")
    @ApiOperation(value="Get all updatable dynamic configurations's name")
    @ApiResponses(value={@ApiResponse(code=403, message="You don't have admin permission to get configuration")})
    public List<String> getDynamicConfigurationName() {
        this.validateSuperUserAccess();
        return BrokerService.getDynamicConfiguration();
    }

    @GET
    @Path(value="/configuration/runtime")
    @ApiOperation(value="Get all runtime configurations. This operation requires Pulsar super-user privileges.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public Map<String, String> getRuntimeConfiguration() {
        this.validateSuperUserAccess();
        return this.pulsar().getBrokerService().getRuntimeConfiguration();
    }

    private synchronized void persistDynamicConfiguration(String configName, String configValue) {
        try {
            if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, " Invalid dynamic-config value");
            }
            if (!BrokerService.isDynamicConfiguration(configName)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", new Object[]{this.clientAppId(), configName, configValue});
                }
                throw new RestException(Response.Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
            }
            this.dynamicConfigurationResources().setWithCreate("/admin/configuration", old -> {
                HashMap configurationMap = old.isPresent() ? (Map)old.get() : Maps.newHashMap();
                configurationMap.put(configName, configValue);
                return configurationMap;
            });
            LOG.info("[{}] Updated Service configuration {}/{}", new Object[]{this.clientAppId(), configName, configValue});
        }
        catch (RestException re) {
            throw re;
        }
        catch (Exception ie) {
            LOG.error("[{}] Failed to update configuration {}/{}, {}", new Object[]{this.clientAppId(), configName, configValue, ie.getMessage(), ie});
            throw new RestException(ie);
        }
    }

    @GET
    @Path(value="/internal-configuration")
    @ApiOperation(value="Get the internal configuration data", response=InternalConfigurationData.class)
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public InternalConfigurationData getInternalConfigurationData() {
        this.validateSuperUserAccess();
        return this.pulsar().getInternalConfigurationData();
    }

    @GET
    @Path(value="/backlog-quota-check")
    @ApiOperation(value="An REST endpoint to trigger backlogQuotaCheck")
    @ApiResponses(value={@ApiResponse(code=200, message="Everything is OK"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=500, message="Internal server error")})
    public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
        this.validateSuperUserAccess();
        this.pulsar().getBrokerService().executor().execute(() -> {
            try {
                this.pulsar().getBrokerService().monitorBacklogQuota();
                asyncResponse.resume((Object)Response.noContent().build());
            }
            catch (Exception e) {
                LOG.error("trigger backlogQuotaCheck fail", (Throwable)e);
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
            }
        });
    }

    @GET
    @Path(value="/ready")
    @ApiOperation(value="Check if the broker is fully initialized")
    @ApiResponses(value={@ApiResponse(code=200, message="Broker is ready"), @ApiResponse(code=500, message="Broker is not ready")})
    public void isReady(@Suspended AsyncResponse asyncResponse) {
        if (this.pulsar().getState() == PulsarService.State.Started) {
            asyncResponse.resume((Object)Response.ok((Object)"ok").build());
        } else {
            asyncResponse.resume((Object)Response.serverError().build());
        }
    }

    @GET
    @Path(value="/health")
    @ApiOperation(value="Run a healthcheck against the broker")
    @ApiResponses(value={@ApiResponse(code=200, message="Everything is OK"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Cluster doesn't exist"), @ApiResponse(code=500, message="Internal server error")})
    public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception {
        this.validateSuperUserAccess();
        String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(this.pulsar().getAdvertisedAddress(), this.pulsar().getConfiguration());
        String topic = String.format("persistent://%s/healthcheck", heartbeatNamespace);
        PulsarClient client = this.pulsar().getClient();
        String messageStr = UUID.randomUUID().toString();
        try {
            this.pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> t.getSubscriptions().forEach((__, value) -> {
                try {
                    value.deleteForcefully();
                }
                catch (Exception e) {
                    LOG.warn("Failed to delete previous subscription {} for health check", (Object)value.getName(), (Object)e);
                }
            }));
        }
        catch (Exception e) {
            LOG.warn("Failed to try to delete subscriptions for health check", (Throwable)e);
        }
        CompletableFuture producerFuture = client.newProducer(Schema.STRING).topic(topic).createAsync();
        CompletableFuture readerFuture = client.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.latest).createAsync();
        CompletableFuture completePromise = new CompletableFuture();
        CompletableFuture.allOf(producerFuture, readerFuture).whenComplete((ignore, exception) -> {
            if (exception != null) {
                completePromise.completeExceptionally((Throwable)exception);
            } else {
                ((CompletableFuture)producerFuture.thenCompose(producer -> producer.sendAsync((Object)messageStr))).whenComplete((ignore2, exception2) -> {
                    if (exception2 != null) {
                        completePromise.completeExceptionally((Throwable)exception2);
                    }
                });
                this.healthcheckReadLoop(readerFuture, completePromise, messageStr);
                FutureUtil.addTimeoutHandling((CompletableFuture)completePromise, (Duration)HEALTHCHECK_READ_TIMEOUT, (ScheduledExecutorService)this.pulsar().getExecutor(), () -> FutureUtil.createTimeoutException((String)"Timed out reading", this.getClass(), (String)"healthcheck(...)"));
            }
        });
        completePromise.whenComplete((ignore, exception) -> {
            producerFuture.thenAccept(producer -> producer.closeAsync().whenComplete((ignore2, exception2) -> {
                if (exception2 != null) {
                    LOG.warn("Error closing producer for healthcheck", exception2);
                }
            }));
            readerFuture.thenAccept(reader -> reader.closeAsync().whenComplete((ignore2, exception2) -> {
                if (exception2 != null) {
                    LOG.warn("Error closing reader for healthcheck", exception2);
                }
            }));
            if (exception != null) {
                asyncResponse.resume((Throwable)((Object)new RestException((Throwable)exception)));
            } else {
                asyncResponse.resume((Object)"ok");
            }
        });
    }

    private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture, CompletableFuture<?> completablePromise, String messageStr) {
        readerFuture.thenAccept(reader -> {
            CompletionStage readFuture = reader.readNextAsync().whenComplete((m, exception) -> {
                if (exception != null) {
                    completablePromise.completeExceptionally((Throwable)exception);
                } else if (((String)m.getValue()).equals(messageStr)) {
                    completablePromise.complete(null);
                } else {
                    this.healthcheckReadLoop(readerFuture, completablePromise, messageStr);
                }
            });
        });
    }

    private synchronized void deleteDynamicConfigurationOnZk(String configName) {
        try {
            if (!BrokerService.isDynamicConfiguration(configName)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Can't update non-dynamic configuration {}", (Object)this.clientAppId(), (Object)configName);
                }
                throw new RestException(Response.Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
            }
            this.dynamicConfigurationResources().set("/admin/configuration", old -> {
                if (old != null) {
                    old.remove(configName);
                }
                return old;
            });
            LOG.info("[{}] Deleted Service configuration {}", (Object)this.clientAppId(), (Object)configName);
        }
        catch (RestException re) {
            throw re;
        }
        catch (Exception ie) {
            LOG.error("[{}] Failed to update configuration {}, {}", new Object[]{this.clientAppId(), configName, ie.getMessage(), ie});
            throw new RestException(ie);
        }
    }

    @GET
    @Path(value="/version")
    @ApiOperation(value="Get version of current broker")
    @ApiResponses(value={@ApiResponse(code=200, message="Everything is OK"), @ApiResponse(code=500, message="Internal server error")})
    public String version() throws Exception {
        return PulsarVersion.getVersion();
    }
}

