/*
 * Decompiled with CFR 0.152.
 */
package ai.pipestream.dynamic.grpc.client;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
import io.quarkus.grpc.runtime.stork.StorkGrpcChannel;
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceInstance;
import io.vertx.core.Vertx;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

@ApplicationScoped
public class ChannelManager {
    private static final Logger LOG = Logger.getLogger(ChannelManager.class);
    @Inject
    Vertx vertx;
    @Inject
    Executor executor;
    @ConfigProperty(name="pipeline.grpc.channel.idle-ttl-minutes", defaultValue="15")
    long channelIdleTtlMinutes;
    @ConfigProperty(name="pipeline.grpc.channel.max-size", defaultValue="1000")
    long channelMaxSize;
    @ConfigProperty(name="pipeline.grpc.channel.shutdown-timeout-seconds", defaultValue="2")
    long shutdownTimeoutSeconds;
    private Cache<String, Channel> channelCache;
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    @PostConstruct
    void init() {
        this.channelCache = Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(this.channelIdleTtlMinutes)).maximumSize(this.channelMaxSize).removalListener(this::onChannelRemoved).recordStats().build();
        LOG.infof("Initialized ChannelManager with TTL=%d minutes, max size=%d", (Object)this.channelIdleTtlMinutes, (Object)this.channelMaxSize);
    }

    private void onChannelRemoved(String serviceName, Channel channel, RemovalCause cause) {
        if (channel == null) {
            return;
        }
        if (this.shuttingDown.get()) {
            LOG.debugf("Application shutting down, initiating non-blocking channel shutdown for service '%s'", (Object)serviceName);
            try {
                if (channel instanceof ManagedChannel) {
                    ((ManagedChannel)channel).shutdownNow();
                } else if (channel instanceof StorkGrpcChannel) {
                    ((StorkGrpcChannel)channel).close();
                }
            }
            catch (Exception e) {
                LOG.debugf("Error during shutdown of channel for service %s: %s", (Object)serviceName, (Object)e.getMessage());
            }
            return;
        }
        LOG.infof("Evicting gRPC channel for service '%s' due to: %s", (Object)serviceName, (Object)cause);
        try {
            if (channel instanceof ManagedChannel) {
                ManagedChannel mc = (ManagedChannel)channel;
                mc.shutdown();
                if (!mc.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                    LOG.warnf("Channel for service %s did not terminate gracefully, forcing shutdown", (Object)serviceName);
                    mc.shutdownNow();
                }
            } else if (channel instanceof StorkGrpcChannel) {
                ((StorkGrpcChannel)channel).close();
            }
            LOG.debugf("Successfully shut down channel for service: %s", (Object)serviceName);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.errorf("Interrupted while shutting down channel for service %s", (Object)serviceName);
            try {
                if (channel instanceof ManagedChannel) {
                    ((ManagedChannel)channel).shutdownNow();
                }
            }
            catch (Exception ex) {
                LOG.errorf((Throwable)ex, "Error forcing shutdown of channel for service %s", (Object)serviceName);
            }
        }
        catch (Exception e) {
            LOG.errorf((Throwable)e, "Error shutting down channel for service %s", (Object)serviceName);
            try {
                if (channel instanceof ManagedChannel) {
                    ((ManagedChannel)channel).shutdownNow();
                }
            }
            catch (Exception ex) {
                LOG.errorf((Throwable)ex, "Error forcing shutdown of channel for service %s", (Object)serviceName);
            }
        }
    }

    public Uni<Channel> getOrCreateChannel(String serviceName, List<ServiceInstance> instances) {
        Channel existing;
        if (instances == null || instances.isEmpty()) {
            return Uni.createFrom().failure((Throwable)new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No instances found for service " + serviceName)));
        }
        if (this.shuttingDown.get()) {
            return Uni.createFrom().failure((Throwable)new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Channel manager is shutting down")));
        }
        if (this.channelCache == null) {
            this.init();
        }
        if ((existing = (Channel)this.channelCache.getIfPresent((Object)serviceName)) != null) {
            LOG.debugf("Reusing existing gRPC channel for service: %s (type=%s)", (Object)serviceName, (Object)existing.getClass().getName());
            return Uni.createFrom().item((Object)existing);
        }
        LOG.infof("Creating new Stork gRPC channel for service: %s", (Object)serviceName);
        GrpcClientOptions clientOptions = new GrpcClientOptions();
        GrpcClient grpcClient = GrpcClient.client((Vertx)this.vertx, (GrpcClientOptions)clientOptions);
        GrpcClientConfiguration.StorkConfig storkConfig = new GrpcClientConfiguration.StorkConfig(this){

            public int threads() {
                return 10;
            }

            public long deadline() {
                return 5000L;
            }

            public int retries() {
                return 3;
            }

            public long delay() {
                return 60L;
            }

            public long period() {
                return 120L;
            }
        };
        StorkGrpcChannel created = new StorkGrpcChannel(grpcClient, serviceName, storkConfig, this.executor);
        LOG.debugf("Created StorkGrpcChannel for %s: type=%s", (Object)serviceName, (Object)created.getClass().getName());
        this.channelCache.put((Object)serviceName, (Object)created);
        return Uni.createFrom().item((Object)created);
    }

    public void evictChannel(String serviceName) {
        LOG.infof("Manually evicting channel for service: %s", (Object)serviceName);
        this.channelCache.invalidate((Object)serviceName);
    }

    public String getCacheStats() {
        CacheStats stats = this.channelCache.stats();
        return String.format("Cache stats - Size: %d, Hits: %d, Misses: %d, Hit rate: %.2f%%, Evictions: %d", this.channelCache.estimatedSize(), stats.hitCount(), stats.missCount(), stats.hitRate() * 100.0, stats.evictionCount());
    }

    public int getActiveServiceCount() {
        return Math.toIntExact(this.channelCache.estimatedSize());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @PreDestroy
    void cleanup() {
        this.shuttingDown.set(true);
        if (this.channelCache == null) {
            LOG.debug((Object)"No channel cache to clean up");
            return;
        }
        LOG.infof("Shutting down %d cached gRPC channels on application exit...", (Object)this.channelCache.estimatedSize());
        ArrayList channels = new ArrayList(this.channelCache.asMap().values());
        this.channelCache.invalidateAll();
        this.channelCache.cleanUp();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> {
                for (Channel channel : channels) {
                    try {
                        if (channel instanceof ManagedChannel) {
                            ManagedChannel mc = (ManagedChannel)channel;
                            if (mc.isShutdown()) continue;
                            mc.shutdown();
                            if (mc.awaitTermination(100L, TimeUnit.MILLISECONDS)) continue;
                            mc.shutdownNow();
                            continue;
                        }
                        if (!(channel instanceof StorkGrpcChannel)) continue;
                        ((StorkGrpcChannel)channel).close();
                    }
                    catch (Exception e) {
                        LOG.debugf("Error during channel shutdown: %s", (Object)e.getMessage());
                        try {
                            if (!(channel instanceof ManagedChannel)) continue;
                            ((ManagedChannel)channel).shutdownNow();
                        }
                        catch (Exception exception) {}
                    }
                }
            }).get(this.shutdownTimeoutSeconds, TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
            LOG.warn((Object)"Channel shutdown timed out, forcing immediate termination");
            channels.forEach(ch -> {
                try {
                    if (ch instanceof ManagedChannel) {
                        ManagedChannel mc = (ManagedChannel)ch;
                        if (!mc.isShutdown()) {
                            mc.shutdownNow();
                        }
                    } else if (ch instanceof StorkGrpcChannel) {
                        ((StorkGrpcChannel)ch).close();
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
        }
        catch (Exception e) {
            LOG.error((Object)"Error during channel cleanup", (Throwable)e);
        }
        finally {
            executor.shutdownNow();
        }
        LOG.info((Object)"ChannelManager cleanup complete.");
    }
}

