/*
 * Decompiled with CFR 0.152.
 */
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.GrpcConnector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcResponseModel;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamHandler;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc;
import dev.openfeature.flagd.grpc.sync.Sync;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Context;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, justification="Random is used to generate a variation & flag configurations require exposing")
public class GrpcStreamConnector
implements Connector {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcStreamConnector.class);
    private static final int QUEUE_SIZE = 5;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<QueuePayload> blockingQueue = new LinkedBlockingQueue<QueuePayload>(5);
    private final int deadline;
    private final String selector;
    private final GrpcConnector<FlagSyncServiceGrpc.FlagSyncServiceStub, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub> grpcConnector;
    private final LinkedBlockingQueue<GrpcResponseModel> streamReceiver;

    public GrpcStreamConnector(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
        this.deadline = options.getDeadline();
        this.selector = options.getSelector();
        this.streamReceiver = new LinkedBlockingQueue(5);
        this.grpcConnector = new GrpcConnector<FlagSyncServiceGrpc.FlagSyncServiceStub, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub>(options, FlagSyncServiceGrpc::newStub, FlagSyncServiceGrpc::newBlockingStub, onConnectionEvent, stub -> {
            String localSelector = this.selector;
            Sync.SyncFlagsRequest.Builder syncRequest = Sync.SyncFlagsRequest.newBuilder();
            if (localSelector != null) {
                syncRequest.setSelector(localSelector);
            }
            stub.syncFlags(syncRequest.build(), new GrpcStreamHandler(this.streamReceiver));
        });
    }

    @Override
    public void init() throws Exception {
        this.grpcConnector.initialize();
        Thread listener = new Thread(() -> {
            try {
                this.observeEventStream(this.blockingQueue, this.shutdown, this.deadline);
            }
            catch (InterruptedException e) {
                log.warn("gRPC event stream interrupted, flag configurations are stale", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();
    }

    @Override
    public BlockingQueue<QueuePayload> getStream() {
        return this.blockingQueue;
    }

    @Override
    public void shutdown() throws InterruptedException {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        this.grpcConnector.shutdown();
    }

    void observeEventStream(BlockingQueue<QueuePayload> writeTo, AtomicBoolean shutdown, int deadline) throws InterruptedException {
        log.info("Initializing sync stream observer");
        block11: while (!shutdown.get()) {
            writeTo.clear();
            Exception metadataException = null;
            log.debug("Initializing sync stream request");
            Sync.GetMetadataRequest.Builder metadataRequest = Sync.GetMetadataRequest.newBuilder();
            Sync.GetMetadataResponse metadataResponse = Sync.GetMetadataResponse.getDefaultInstance();
            Context.CancellableContext context = Context.current().withCancellation();
            Throwable throwable = null;
            try {
                try {
                    metadataResponse = this.grpcConnector.getResolver().getMetadata(metadataRequest.build());
                }
                catch (Exception e) {
                    metadataException = e;
                    log.debug("Metadata exception: {}", (Object)e.getMessage(), (Object)e);
                }
                while (!shutdown.get()) {
                    GrpcResponseModel response = this.streamReceiver.take();
                    if (response.isComplete()) {
                        log.info("Sync stream completed");
                        continue block11;
                    }
                    Throwable streamException = response.getError();
                    if (streamException != null || metadataException != null) {
                        log.debug("Exception in GRPC connection, streamException {}, metadataException {}", (Object)streamException, (Object)metadataException);
                        if (!writeTo.offer(new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata", metadataResponse))) {
                            log.error("Failed to convey ERROR status, queue is full");
                        }
                        context.cancel((Throwable)metadataException);
                        continue block11;
                    }
                    Sync.SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
                    String data = flagsResponse.getFlagConfiguration();
                    log.debug("Got stream response: {}", (Object)data);
                    if (writeTo.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) continue;
                    log.error("Stream writing failed");
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (context == null) continue;
                if (throwable != null) {
                    try {
                        context.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                context.close();
            }
        }
        log.info("Shutdown invoked, exiting event stream listener");
    }
}

