/*
 * Decompiled with CFR 0.152.
 */
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

import dev.openfeature.contrib.providers.flagd.resolver.common.Convert;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageQueryResult;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.flagd.grpc.sync.Sync;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Structure;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="Feature flag comes as a Json configuration, hence they must be exposed")
public class FlagStore
implements Storage {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlagStore.class);
    private final ReentrantReadWriteLock sync = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = this.sync.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = this.sync.writeLock();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<StorageStateChange> stateBlockingQueue = new LinkedBlockingQueue<StorageStateChange>(1);
    private final Map<String, FeatureFlag> flags = new HashMap<String, FeatureFlag>();
    private final Map<String, Object> flagSetMetadata = new HashMap<String, Object>();
    private final Connector connector;
    private final boolean throwIfInvalid;

    public FlagStore(Connector connector) {
        this(connector, false);
    }

    public FlagStore(Connector connector, boolean throwIfInvalid) {
        this.connector = connector;
        this.throwIfInvalid = throwIfInvalid;
    }

    @Override
    public void init() throws Exception {
        this.connector.init();
        Thread streamer = new Thread(() -> {
            try {
                this.streamerListener(this.connector);
            }
            catch (InterruptedException e) {
                log.warn("connection listener failed", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
        streamer.setDaemon(true);
        streamer.start();
    }

    @Override
    public void shutdown() throws InterruptedException {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        this.connector.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public StorageQueryResult getFlag(String key) {
        HashMap<String, Object> metadata;
        FeatureFlag flag;
        this.readLock.lock();
        try {
            flag = this.flags.get(key);
            metadata = new HashMap<String, Object>(this.flagSetMetadata);
        }
        finally {
            this.readLock.unlock();
        }
        return new StorageQueryResult(flag, metadata);
    }

    @Override
    public BlockingQueue<StorageStateChange> getStateQueue() {
        return this.stateBlockingQueue;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void streamerListener(Connector connector) throws InterruptedException {
        BlockingQueue<QueuePayload> streamPayloads = connector.getStream();
        block9: while (!this.shutdown.get()) {
            QueuePayload payload = streamPayloads.take();
            switch (payload.getType()) {
                case DATA: {
                    try {
                        List<String> changedFlagsKeys;
                        ParsingResult parsingResult = FlagParser.parseString(payload.getFlagData(), this.throwIfInvalid);
                        Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
                        Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();
                        Structure metadata = this.parseSyncMetadata(payload.getMetadataResponse());
                        this.writeLock.lock();
                        try {
                            changedFlagsKeys = this.getChangedFlagsKeys(flagMap);
                            this.flags.clear();
                            this.flags.putAll(flagMap);
                            this.flagSetMetadata.clear();
                            this.flagSetMetadata.putAll(flagSetMetadataMap);
                        }
                        finally {
                            this.writeLock.unlock();
                        }
                        if (this.stateBlockingQueue.offer(new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) continue block9;
                        log.warn("Failed to convey OK satus, queue is full");
                    }
                    catch (Throwable e) {
                        log.warn("Invalid flag sync payload from connector", e);
                        if (this.stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) continue block9;
                        log.warn("Failed to convey STALE satus, queue is full");
                    }
                    break;
                }
                case ERROR: {
                    if (this.stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) continue block9;
                    log.warn("Failed to convey ERROR satus, queue is full");
                    break;
                }
                default: {
                    log.info(String.format("Payload with unknown type: %s", new Object[]{payload.getType()}));
                }
            }
        }
        log.info("Shutting down store stream listener");
    }

    private Structure parseSyncMetadata(Sync.GetMetadataResponse metadataResponse) {
        try {
            return Convert.convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap());
        }
        catch (Exception exception) {
            log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
            return new ImmutableStructure();
        }
    }

    private List<String> getChangedFlagsKeys(Map<String, FeatureFlag> newFlags) {
        HashMap changedFlags = new HashMap();
        HashMap addedFeatureFlags = new HashMap();
        HashMap removedFeatureFlags = new HashMap();
        HashMap updatedFeatureFlags = new HashMap();
        newFlags.forEach((key, value) -> {
            if (!this.flags.containsKey(key)) {
                addedFeatureFlags.put(key, value);
            } else if (this.flags.containsKey(key) && !value.equals(this.flags.get(key))) {
                updatedFeatureFlags.put(key, value);
            }
        });
        this.flags.forEach((key, value) -> {
            if (!newFlags.containsKey(key)) {
                removedFeatureFlags.put(key, value);
            }
        });
        changedFlags.putAll(addedFeatureFlags);
        changedFlags.putAll(removedFeatureFlags);
        changedFlags.putAll(updatedFeatureFlags);
        return changedFlags.keySet().stream().collect(Collectors.toList());
    }
}

