/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.kubernetes.discovery.provider;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.micronaut.discovery.ServiceInstance;
import io.micronaut.kubernetes.KubernetesConfiguration;
import io.micronaut.kubernetes.client.reactor.CoreV1ApiReactorClient;
import io.micronaut.kubernetes.discovery.AbstractKubernetesServiceInstanceProvider;
import io.micronaut.kubernetes.discovery.KubernetesServiceConfiguration;
import io.micronaut.kubernetes.util.KubernetesUtils;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

@Singleton
public class KubernetesServiceInstanceEndpointProvider
extends AbstractKubernetesServiceInstanceProvider {
    public static final String MODE = "endpoint";
    protected static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceInstanceEndpointProvider.class);
    private final CoreV1ApiReactorClient client;
    private final KubernetesConfiguration.KubernetesDiscoveryConfiguration discoveryConfiguration;

    public KubernetesServiceInstanceEndpointProvider(CoreV1ApiReactorClient client, KubernetesConfiguration.KubernetesDiscoveryConfiguration discoveryConfiguration) {
        this.client = client;
        this.discoveryConfiguration = discoveryConfiguration;
    }

    @Override
    public String getMode() {
        return MODE;
    }

    @Override
    public Publisher<List<ServiceInstance>> getInstances(KubernetesServiceConfiguration serviceConfiguration) {
        String serviceName = serviceConfiguration.getName().orElseThrow(() -> new IllegalArgumentException("KubernetesServiceConfiguration is missing service name."));
        String serviceNamespace = serviceConfiguration.getNamespace().orElseThrow(() -> new IllegalArgumentException("KubernetesServiceConfiguration is missing namespace."));
        AtomicReference metadata = new AtomicReference();
        Predicate<KubernetesObject> globalFilter = !serviceConfiguration.isManual() ? this.compositePredicate(KubernetesUtils.getIncludesFilter(this.discoveryConfiguration.getIncludes()), KubernetesUtils.getExcludesFilter(this.discoveryConfiguration.getExcludes()), KubernetesUtils.getLabelsFilter(this.discoveryConfiguration.getLabels())) : f -> true;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Fetching Endpoints {}", (Object)serviceConfiguration);
        }
        return this.client.readNamespacedEndpoints(serviceName, serviceNamespace, null, null, null).doOnError(ApiException.class, throwable -> LOG.error("Failed to list Endpoints [ " + serviceName + "] from namespace [" + serviceNamespace + "]: " + throwable.getResponseBody(), (Throwable)throwable)).filter(globalFilter).filter(v1Endpoints -> v1Endpoints.getSubsets() != null).doOnNext(endpoints -> metadata.set(endpoints.getMetadata())).flatMapIterable(V1Endpoints::getSubsets).filter(subset -> this.hasValidPortConfiguration(((List)Optional.ofNullable(subset.getPorts()).orElse(new ArrayList())).stream().map(AbstractKubernetesServiceInstanceProvider.PortBinder::fromEndpointPort).collect(Collectors.toList()), serviceConfiguration)).filter(subset -> subset.getAddresses() != null && !subset.getAddresses().isEmpty()).map(subset -> subset.getPorts().stream().filter(port -> !serviceConfiguration.getPort().isPresent() || port.getName().equals(serviceConfiguration.getPort().get())).flatMap(port -> subset.getAddresses().stream().map(address -> this.buildServiceInstance(serviceConfiguration.getServiceId(), AbstractKubernetesServiceInstanceProvider.PortBinder.fromEndpointPort(port), address.getIp(), (V1ObjectMeta)metadata.get()))).collect(Collectors.toList())).onErrorResume(throwable -> {
            if (LOG.isErrorEnabled()) {
                LOG.error("Error while processing discovered endpoints [" + serviceName + "]", throwable);
            }
            return Flux.just(Collections.emptyList());
        }).defaultIfEmpty(new ArrayList());
    }
}

