/*
 * Decompiled with CFR 0.152.
 */
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
import dev.openfeature.contrib.providers.flagd.resolver.common.StreamResponseModel;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc;
import dev.openfeature.flagd.grpc.sync.Sync;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Channel;
import io.grpc.Context;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, justification="Random is used to generate a variation & flag configurations require exposing")
public class SyncStreamQueueSource
implements QueueSource {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SyncStreamQueueSource.class);
    private static final int QUEUE_SIZE = 5;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final int streamDeadline;
    private final int deadline;
    private final String selector;
    private final String providerId;
    private final boolean syncMetadataDisabled;
    private final ChannelConnector<FlagSyncServiceGrpc.FlagSyncServiceStub, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub> channelConnector;
    private final LinkedBlockingQueue<StreamResponseModel<Sync.SyncFlagsResponse>> incomingQueue = new LinkedBlockingQueue(5);
    private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<QueuePayload>(5);
    private final FlagSyncServiceGrpc.FlagSyncServiceStub stub;
    private final FlagSyncServiceGrpc.FlagSyncServiceBlockingStub blockingStub;

    public SyncStreamQueueSource(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
        this.streamDeadline = options.getStreamDeadlineMs();
        this.deadline = options.getDeadline();
        this.selector = options.getSelector();
        this.providerId = options.getProviderId();
        this.syncMetadataDisabled = options.isSyncMetadataDisabled();
        this.channelConnector = new ChannelConnector(options, onConnectionEvent);
        this.stub = (FlagSyncServiceGrpc.FlagSyncServiceStub)FlagSyncServiceGrpc.newStub((Channel)this.channelConnector.getChannel()).withWaitForReady();
        this.blockingStub = (FlagSyncServiceGrpc.FlagSyncServiceBlockingStub)FlagSyncServiceGrpc.newBlockingStub((Channel)this.channelConnector.getChannel()).withWaitForReady();
    }

    protected SyncStreamQueueSource(FlagdOptions options, ChannelConnector<FlagSyncServiceGrpc.FlagSyncServiceStub, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub> connectorMock, FlagSyncServiceGrpc.FlagSyncServiceStub stubMock, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub blockingStubMock) {
        this.streamDeadline = options.getStreamDeadlineMs();
        this.deadline = options.getDeadline();
        this.selector = options.getSelector();
        this.providerId = options.getProviderId();
        this.channelConnector = connectorMock;
        this.stub = stubMock;
        this.syncMetadataDisabled = options.isSyncMetadataDisabled();
        this.blockingStub = blockingStubMock;
    }

    @Override
    public void init() throws Exception {
        this.channelConnector.initialize();
        Thread listener = new Thread(() -> {
            try {
                this.observeSyncStream();
            }
            catch (InterruptedException e) {
                log.warn("gRPC event stream interrupted, flag configurations are stale", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();
    }

    @Override
    public BlockingQueue<QueuePayload> getStreamQueue() {
        return this.outgoingQueue;
    }

    @Override
    public void shutdown() throws InterruptedException {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        this.channelConnector.shutdown();
    }

    private void observeSyncStream() throws InterruptedException {
        log.info("Initializing sync stream observer");
        block7: while (!this.shutdown.get()) {
            log.debug("Initializing sync stream request");
            Sync.GetMetadataRequest.Builder metadataRequest = Sync.GetMetadataRequest.newBuilder();
            Sync.GetMetadataResponse metadataResponse = Sync.GetMetadataResponse.getDefaultInstance();
            Context.CancellableContext context = Context.current().withCancellation();
            try {
                this.restart();
                if (!this.syncMetadataDisabled) {
                    try {
                        FlagSyncServiceGrpc.FlagSyncServiceBlockingStub localStub = this.blockingStub;
                        if (this.deadline > 0) {
                            localStub = (FlagSyncServiceGrpc.FlagSyncServiceBlockingStub)localStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS);
                        }
                        metadataResponse = localStub.getMetadata(metadataRequest.build());
                    }
                    catch (Exception metaEx) {
                        log.error("Metadata exception: {}, cancelling stream", (Object)metaEx.getMessage(), (Object)metaEx);
                        context.cancel((Throwable)metaEx);
                    }
                }
                while (!this.shutdown.get() && !Context.current().isCancelled()) {
                    StreamResponseModel<Sync.SyncFlagsResponse> taken = this.incomingQueue.take();
                    if (taken.isComplete()) {
                        log.debug("Sync stream completed, will restart");
                        continue block7;
                    }
                    Throwable streamException = taken.getError();
                    if (streamException != null) {
                        log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
                        if (this.outgoingQueue.offer(new QueuePayload(QueuePayloadType.ERROR, String.format("Error from stream: %s", streamException.getMessage()), metadataResponse))) continue block7;
                        log.error("Failed to convey ERROR status, queue is full");
                        continue block7;
                    }
                    Sync.SyncFlagsResponse flagsResponse = taken.getResponse();
                    String data = flagsResponse.getFlagConfiguration();
                    log.debug("Got stream response: {}", (Object)data);
                    if (this.outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) continue;
                    log.error("Stream writing failed");
                }
            }
            finally {
                if (context == null) continue;
                context.close();
            }
        }
        log.info("Shutdown invoked, exiting event stream listener");
    }

    private void restart() {
        FlagSyncServiceGrpc.FlagSyncServiceStub localStub = this.stub;
        if (this.streamDeadline > 0) {
            localStub = (FlagSyncServiceGrpc.FlagSyncServiceStub)localStub.withDeadlineAfter(this.streamDeadline, TimeUnit.MILLISECONDS);
        }
        Sync.SyncFlagsRequest.Builder syncRequest = Sync.SyncFlagsRequest.newBuilder();
        if (this.selector != null) {
            syncRequest.setSelector(this.selector);
        }
        if (this.providerId != null) {
            syncRequest.setProviderId(this.providerId);
        }
        localStub.syncFlags(syncRequest.build(), new QueueingStreamObserver<Sync.SyncFlagsResponse>(this.incomingQueue));
    }
}

