/*
 * Decompiled with CFR 0.152.
 */
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="Feature flag comes as a Json configuration, hence they must be exposed")
public class FlagStore
implements Storage {
    private static final Logger log = LoggerFactory.getLogger(FlagStore.class);
    private final ReentrantReadWriteLock sync = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = this.sync.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = this.sync.writeLock();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<StorageState> stateBlockingQueue = new LinkedBlockingQueue<StorageState>(1);
    private final Map<String, FeatureFlag> flags = new HashMap<String, FeatureFlag>();
    private final Connector connector;
    private final boolean throwIfInvalid;

    public FlagStore(Connector connector) {
        this(connector, false);
    }

    public FlagStore(Connector connector, boolean throwIfInvalid) {
        this.connector = connector;
        this.throwIfInvalid = throwIfInvalid;
    }

    @Override
    public void init() throws Exception {
        this.connector.init();
        Thread streamer = new Thread(() -> {
            try {
                this.streamerListener(this.connector);
            }
            catch (InterruptedException e) {
                log.warn("connection listener failed", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
        streamer.setDaemon(true);
        streamer.start();
    }

    @Override
    public void shutdown() throws InterruptedException {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        this.connector.shutdown();
    }

    @Override
    public FeatureFlag getFlag(String key) {
        this.readLock.lock();
        try {
            FeatureFlag featureFlag = this.flags.get(key);
            return featureFlag;
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    public BlockingQueue<StorageState> getStateQueue() {
        return this.stateBlockingQueue;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void streamerListener(Connector connector) throws InterruptedException {
        BlockingQueue<StreamPayload> streamPayloads = connector.getStream();
        block9: while (!this.shutdown.get()) {
            StreamPayload take = streamPayloads.take();
            switch (take.getType()) {
                case DATA: {
                    try {
                        Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getData(), this.throwIfInvalid);
                        this.writeLock.lock();
                        try {
                            this.flags.clear();
                            this.flags.putAll(flagMap);
                        }
                        finally {
                            this.writeLock.unlock();
                        }
                        if (this.stateBlockingQueue.offer(StorageState.OK)) continue block9;
                        log.warn("Failed to convey OK satus, queue is full");
                    }
                    catch (Throwable e) {
                        log.warn("Invalid flag sync payload from connector", e);
                        if (this.stateBlockingQueue.offer(StorageState.STALE)) continue block9;
                        log.warn("Failed to convey STALE satus, queue is full");
                    }
                    break;
                }
                case ERROR: {
                    if (this.stateBlockingQueue.offer(StorageState.ERROR)) continue block9;
                    log.warn("Failed to convey ERROR satus, queue is full");
                    break;
                }
                default: {
                    log.info(String.format("Payload with unknown type: %s", new Object[]{take.getType()}));
                }
            }
        }
        log.info("Shutting down store stream listener");
    }
}

