/*
 * Decompiled with CFR 0.152.
 */
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.backoff.GrpcStreamConnectorBackoffService;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcResponseModel;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamHandler;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc;
import dev.openfeature.flagd.grpc.sync.Sync;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, justification="Random is used to generate a variation & flag configurations require exposing")
public class GrpcStreamConnector
implements Connector {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcStreamConnector.class);
    private static final int QUEUE_SIZE = 5;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<QueuePayload> blockingQueue = new LinkedBlockingQueue<QueuePayload>(5);
    private final ManagedChannel channel;
    private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub;
    private final FlagSyncServiceGrpc.FlagSyncServiceBlockingStub serviceBlockingStub;
    private final int deadline;
    private final int streamDeadlineMs;
    private final String selector;
    private final int retryBackoffMillis;

    public GrpcStreamConnector(FlagdOptions options) {
        this.channel = ChannelBuilder.nettyChannel(options);
        this.serviceStub = FlagSyncServiceGrpc.newStub((Channel)this.channel);
        this.serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub((Channel)this.channel);
        this.deadline = options.getDeadline();
        this.streamDeadlineMs = options.getStreamDeadlineMs();
        this.selector = options.getSelector();
        this.retryBackoffMillis = options.getRetryBackoffMs();
    }

    @Override
    public void init() {
        Thread listener = new Thread(() -> {
            try {
                GrpcStreamConnector.observeEventStream(this.blockingQueue, this.shutdown, this.serviceStub, this.serviceBlockingStub, this.selector, this.deadline, this.streamDeadlineMs, this.retryBackoffMillis);
            }
            catch (InterruptedException e) {
                log.warn("gRPC event stream interrupted, flag configurations are stale", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();
    }

    @Override
    public BlockingQueue<QueuePayload> getStream() {
        return this.blockingQueue;
    }

    @Override
    public void shutdown() throws InterruptedException {
        block5: {
            if (this.shutdown.getAndSet(true)) {
                return;
            }
            try {
                if (this.channel == null || this.channel.isShutdown()) break block5;
                this.channel.shutdown();
                this.channel.awaitTermination((long)this.deadline, TimeUnit.MILLISECONDS);
            }
            catch (Throwable throwable) {
                if (this.channel != null && !this.channel.isShutdown()) {
                    this.channel.shutdownNow();
                    this.channel.awaitTermination((long)this.deadline, TimeUnit.MILLISECONDS);
                    log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
                }
                throw throwable;
            }
        }
        if (this.channel != null && !this.channel.isShutdown()) {
            this.channel.shutdownNow();
            this.channel.awaitTermination((long)this.deadline, TimeUnit.MILLISECONDS);
            log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
        }
    }

    static void observeEventStream(BlockingQueue<QueuePayload> writeTo, AtomicBoolean shutdown, FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub serviceBlockingStub, String selector, int deadline, int streamDeadlineMs, int retryBackoffMillis) throws InterruptedException {
        LinkedBlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<GrpcResponseModel>(5);
        GrpcStreamConnectorBackoffService backoffService = new GrpcStreamConnectorBackoffService(retryBackoffMillis);
        log.info("Initializing sync stream observer");
        while (!shutdown.get()) {
            writeTo.clear();
            Exception metadataException = null;
            log.debug("Initializing sync stream request");
            Sync.SyncFlagsRequest.Builder syncRequest = Sync.SyncFlagsRequest.newBuilder();
            Sync.GetMetadataRequest.Builder metadataRequest = Sync.GetMetadataRequest.newBuilder();
            Sync.GetMetadataResponse metadataResponse = Sync.GetMetadataResponse.getDefaultInstance();
            if (selector != null) {
                syncRequest.setSelector(selector);
            }
            try (Context.CancellableContext context = Context.current().withCancellation();){
                FlagSyncServiceGrpc.FlagSyncServiceStub localServiceStub = serviceStub;
                if (streamDeadlineMs > 0) {
                    localServiceStub = (FlagSyncServiceGrpc.FlagSyncServiceStub)localServiceStub.withDeadlineAfter(streamDeadlineMs, TimeUnit.MILLISECONDS);
                }
                localServiceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
                try {
                    metadataResponse = ((FlagSyncServiceGrpc.FlagSyncServiceBlockingStub)serviceBlockingStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS)).getMetadata(metadataRequest.build());
                }
                catch (Exception e) {
                    metadataException = e;
                }
                while (!shutdown.get()) {
                    GrpcResponseModel response = (GrpcResponseModel)streamReceiver.take();
                    if (response.isComplete()) {
                        log.info("Sync stream completed");
                        break;
                    }
                    Throwable streamException = response.getError();
                    if (streamException != null || metadataException != null) {
                        long retryDelay = backoffService.getCurrentBackoffMillis();
                        if (backoffService.shouldRetrySilently()) {
                            GrpcStreamConnector.logExceptions(Level.INFO, streamException, metadataException, retryDelay);
                        } else {
                            GrpcStreamConnector.logExceptions(Level.ERROR, streamException, metadataException, retryDelay);
                            if (!writeTo.offer(new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata", metadataResponse))) {
                                log.error("Failed to convey ERROR status, queue is full");
                            }
                        }
                        context.cancel((Throwable)metadataException);
                        break;
                    }
                    Sync.SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
                    String data = flagsResponse.getFlagConfiguration();
                    log.debug("Got stream response: {}", (Object)data);
                    if (!writeTo.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) {
                        log.error("Stream writing failed");
                    }
                    backoffService.reset();
                }
            }
            if (shutdown.get()) continue;
            log.debug("Stream failed, retrying in {}ms", (Object)backoffService.getCurrentBackoffMillis());
            backoffService.waitUntilNextAttempt();
        }
        log.info("Shutdown invoked, exiting event stream listener");
    }

    private static void logExceptions(Level logLevel, Throwable streamException, Exception metadataException, long retryDelay) {
        if (streamException != null) {
            log.atLevel(logLevel).setCause(streamException).log("Error initializing stream, retrying in {}ms", (Object)retryDelay);
        }
        if (metadataException != null) {
            log.atLevel(logLevel).setCause((Throwable)metadataException).log("Error initializing metadata, retrying in {}ms", (Object)retryDelay);
        }
    }
}

