/*
 * Decompiled with CFR 0.152.
 */
package io.trino.connector;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.MediaType;
import com.google.inject.Inject;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.concurrent.Threads;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
import io.airlift.discovery.client.ServiceType;
import io.airlift.http.client.BodyGenerator;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.JsonBodyGenerator;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.ResponseHandler;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.node.NodeInfo;
import io.airlift.units.Duration;
import io.trino.connector.CatalogPruneTaskConfig;
import io.trino.connector.ConnectorServicesProvider;
import io.trino.metadata.CatalogManager;
import io.trino.metadata.ForNodeManager;
import io.trino.server.InternalCommunicationConfig;
import io.trino.spi.connector.CatalogHandle;
import io.trino.transaction.TransactionManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

/**
 * Background task that, on a fixed delay, sends the list of active catalog handles to every
 * other node announced through the {@code trino} service selector (HTTP POST to
 * {@code /v1/task/pruneCatalogs}) and then asks the local {@link ConnectorServicesProvider}
 * to prune its catalogs.
 *
 * <p>The task only runs when enabled through {@link CatalogPruneTaskConfig}; {@link #start()}
 * schedules it at most once per instance.
 */
public class CatalogPruneTask {
    private static final Logger log = Logger.get(CatalogPruneTask.class);
    private static final JsonCodec<List<CatalogHandle>> CATALOG_HANDLES_CODEC = JsonCodec.listJsonCodec(CatalogHandle.class);
    private final TransactionManager transactionManager;
    private final CatalogManager catalogManager;
    private final ConnectorServicesProvider connectorServicesProvider;
    private final NodeInfo nodeInfo;
    private final ServiceSelector selector;
    private final HttpClient httpClient;
    // Single daemon thread: prune rounds never overlap and never keep the JVM alive.
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, Threads.daemonThreadsNamed("catalog-prune"));
    private final ThreadPoolExecutorMBean executorMBean = new ThreadPoolExecutorMBean(this.executor);
    private final boolean enabled;
    private final Duration updateInterval;
    private final boolean httpsRequired;
    // Guards against scheduling the periodic task more than once.
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Creates the task. Nothing is scheduled until {@link #start()} is called.
     *
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public CatalogPruneTask(TransactionManager transactionManager, CatalogManager catalogManager, ConnectorServicesProvider connectorServicesProvider, NodeInfo nodeInfo, @ServiceType(value="trino") ServiceSelector selector, @ForNodeManager HttpClient httpClient, CatalogPruneTaskConfig catalogPruneTaskConfig, InternalCommunicationConfig internalCommunicationConfig) {
        this.transactionManager = Objects.requireNonNull(transactionManager, "transactionManager is null");
        this.catalogManager = Objects.requireNonNull(catalogManager, "catalogManager is null");
        this.connectorServicesProvider = Objects.requireNonNull(connectorServicesProvider, "connectorServicesProvider is null");
        this.nodeInfo = Objects.requireNonNull(nodeInfo, "nodeInfo is null");
        this.selector = Objects.requireNonNull(selector, "selector is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        Objects.requireNonNull(catalogPruneTaskConfig, "catalogPruneTaskConfig is null");
        Objects.requireNonNull(internalCommunicationConfig, "internalCommunicationConfig is null");
        this.enabled = catalogPruneTaskConfig.isEnabled();
        this.updateInterval = catalogPruneTaskConfig.getUpdateInterval();
        this.httpsRequired = internalCommunicationConfig.isHttpsRequired();
    }

    /**
     * Schedules the periodic prune when the task is enabled. Calls after the first one are no-ops.
     * The first run happens one update interval after this call, and every later run starts one
     * update interval after the previous run finished.
     */
    @PostConstruct
    public void start() {
        if (this.enabled && !this.started.getAndSet(true)) {
            long intervalMillis = this.updateInterval.toMillis();
            this.executor.scheduleWithFixedDelay(() -> {
                try {
                    this.pruneWorkerCatalogs();
                }
                catch (Throwable e) {
                    // An exception escaping a periodic task suppresses all later executions,
                    // so log and carry on instead of propagating.
                    log.warn(e, "Error pruning catalogs");
                }
            }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops the scheduler, interrupting a prune round that may be in progress.
     */
    @PreDestroy
    public void shutdown() {
        this.executor.shutdownNow();
    }

    /**
     * Exposes the scheduler's thread pool statistics for management.
     */
    @Managed
    @Nested
    public ThreadPoolExecutorMBean getExecutor() {
        return this.executorMBean;
    }

    /**
     * Runs one prune round: notifies every other announced node of the currently active
     * catalogs, then prunes catalogs on this node.
     */
    @VisibleForTesting
    public void pruneWorkerCatalogs() {
        // Every announced service except the one running on this node.
        Set<ServiceDescriptor> online = this.selector.selectAllServices().stream()
                .filter(descriptor -> !this.nodeInfo.getNodeId().equals(descriptor.getNodeId()))
                .collect(ImmutableSet.toImmutableSet());
        List<CatalogHandle> activeCatalogs = this.getActiveCatalogs();
        this.pruneWorkerCatalogs(online, activeCatalogs);
        // NOTE(review): an empty set is passed for the local prune; presumably the provider
        // retains still-active catalogs on its own - confirm against ConnectorServicesProvider.
        this.connectorServicesProvider.pruneCatalogs(ImmutableSet.of());
    }

    /**
     * Asynchronously POSTs {@code activeCatalogs} as JSON to {@code /v1/task/pruneCatalogs} on
     * each given service. Services that do not announce a URI for the required scheme are
     * skipped. Responses and failures are only logged at debug level.
     *
     * @param online services to notify
     * @param activeCatalogs catalog handles sent as the request body
     */
    void pruneWorkerCatalogs(Set<ServiceDescriptor> online, List<CatalogHandle> activeCatalogs) {
        for (ServiceDescriptor service : online) {
            URI baseUri = this.getHttpUri(service);
            if (baseUri == null) {
                continue;
            }
            URI uri = HttpUriBuilder.uriBuilderFrom(baseUri).appendPath("/v1/task/pruneCatalogs").build();
            Request request = Request.Builder.preparePost()
                    .setUri(uri)
                    .addHeader("Content-Type", MediaType.JSON_UTF_8.toString())
                    .setBodyGenerator(JsonBodyGenerator.jsonBodyGenerator(CATALOG_HANDLES_CODEC, activeCatalogs))
                    .build();
            // Fire and forget: the returned future is intentionally not awaited.
            this.httpClient.executeAsync(request, new ResponseHandler<Object, Exception>(){

                @Override
                public Exception handleException(Request request, Exception exception) {
                    log.debug(exception, "Error pruning catalogs on server: %s", request.getUri());
                    return exception;
                }

                @Override
                public Object handle(Request request, Response response) {
                    int statusCode = response.getStatusCode();
                    if (statusCode >= 200 && statusCode < 300) {
                        log.debug("Pruned catalogs on server: %s", request.getUri());
                    }
                    else {
                        // Do not report success when the remote node rejected the request.
                        log.debug("Unexpected response status %s pruning catalogs on server: %s", statusCode, request.getUri());
                    }
                    return null;
                }
            });
        }
    }

    /**
     * Collects the distinct catalog handles reported as active by any transaction known to the
     * transaction manager, plus those reported as active by the catalog manager.
     */
    private List<CatalogHandle> getActiveCatalogs() {
        ImmutableSet.Builder<CatalogHandle> activeCatalogs = ImmutableSet.builder();
        this.transactionManager.getAllTransactionInfos().forEach(info -> activeCatalogs.addAll(info.getActiveCatalogs()));
        activeCatalogs.addAll(this.catalogManager.getActiveCatalogs());
        return ImmutableList.copyOf(activeCatalogs.build());
    }

    /**
     * Reads the service's announced URI from its {@code https} property when HTTPS is required
     * for internal communication, otherwise from its {@code http} property.
     *
     * @return the announced URI, or {@code null} when the service has no such property
     */
    private URI getHttpUri(ServiceDescriptor descriptor) {
        String url = descriptor.getProperties().get(this.httpsRequired ? "https" : "http");
        if (url != null) {
            return URI.create(url);
        }
        return null;
    }
}

