/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamOptions;
import io.nats.client.JetStreamSubscription;
import io.nats.client.KeyValueManagement;
import io.nats.client.Message;
import io.nats.client.PurgeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.BucketConfiguration;
import io.nats.client.api.BucketInfo;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.KvEntry;
import io.nats.client.api.KvEntryHandler;
import io.nats.client.api.PurgeResponse;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsJetStreamImplBase;
import io.nats.client.impl.NatsJetStreamManagement;
import io.nats.client.support.NatsKeyValueUtil;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * {@link KeyValueManagement} implementation that maps key-value bucket
 * operations onto JetStream stream operations.
 *
 * <p>Bucket names are translated to stream names and subjects through
 * {@link NatsKeyValueUtil}; stream-level work is delegated to a
 * {@link JetStreamManagement} instance and entry reads go through a
 * {@link JetStream} push subscription.
 */
public class NatsKeyValueManagement
extends NatsJetStreamImplBase
implements KeyValueManagement {
    private final JetStreamManagement jsm;
    private final JetStream js;

    /**
     * Creates the management object together with the JetStream and
     * JetStream-management contexts it delegates to.
     *
     * @param connection the connection handed to the base class and to both contexts
     * @param jsOptions the JetStream options handed to the base class and to both contexts
     * @throws IOException if the base class or one of the contexts cannot be created
     */
    public NatsKeyValueManagement(NatsConnection connection, JetStreamOptions jsOptions) throws IOException {
        super(connection, jsOptions);
        this.jsm = new NatsJetStreamManagement(connection, jsOptions);
        this.js = new NatsJetStream(connection, jsOptions);
    }

    /**
     * Creates a bucket by adding the stream described by the configuration's
     * backing stream configuration.
     *
     * @param config the bucket configuration
     * @return bucket information built from the stream info returned by the add-stream call
     * @throws IOException propagated from the stream management call
     * @throws JetStreamApiException propagated from the stream management call
     */
    @Override
    public BucketInfo createBucket(BucketConfiguration config) throws IOException, JetStreamApiException {
        return new BucketInfo(this.jsm.addStream(config.getBackingConfig()));
    }

    /**
     * Deletes the stream that backs the named bucket.
     *
     * @param bucketName the bucket name; validated as required
     * @return the result of the underlying delete-stream call
     * @throws IOException propagated from the stream management call
     * @throws JetStreamApiException propagated from the stream management call
     */
    @Override
    public boolean deleteBucket(String bucketName) throws IOException, JetStreamApiException {
        Validator.validateBucketNameRequired(bucketName);
        return this.jsm.deleteStream(NatsKeyValueUtil.streamName(bucketName));
    }

    /**
     * Fetches information about the stream that backs the named bucket.
     *
     * @param bucketName the bucket name; validated as required
     * @return bucket information built from the backing stream's info
     * @throws IOException propagated from the stream management call
     * @throws JetStreamApiException propagated from the stream management call
     */
    @Override
    public BucketInfo getBucketInfo(String bucketName) throws IOException, JetStreamApiException {
        Validator.validateBucketNameRequired(bucketName);
        return new BucketInfo(this.jsm.getStreamInfo(NatsKeyValueUtil.streamName(bucketName)));
    }

    /**
     * Purges the stream that backs the named bucket.
     *
     * @param bucketName the bucket name; validated as required
     * @return the response of the underlying purge-stream call
     * @throws IOException propagated from the stream management call
     * @throws JetStreamApiException propagated from the stream management call
     */
    @Override
    public PurgeResponse purgeBucket(String bucketName) throws IOException, JetStreamApiException {
        Validator.validateBucketNameRequired(bucketName);
        return this.jsm.purgeStream(NatsKeyValueUtil.streamName(bucketName));
    }

    /**
     * Purges the backing stream of the named bucket, restricted to the subject
     * of a single key.
     *
     * @param bucketName the bucket name; validated as required
     * @param key the key; validated as required
     * @return the response of the underlying purge-stream call
     * @throws IOException propagated from the stream management call
     * @throws JetStreamApiException propagated from the stream management call
     */
    @Override
    public PurgeResponse purgeKey(String bucketName, String key) throws IOException, JetStreamApiException {
        Validator.validateBucketNameRequired(bucketName);
        Validator.validateKeyRequired(key);
        PurgeOptions options = PurgeOptions.builder()
            .subject(NatsKeyValueUtil.keySubject(bucketName, key))
            .build();
        return this.jsm.purgeStream(NatsKeyValueUtil.streamName(bucketName), options);
    }

    /**
     * Collects the entries delivered on the subject of one key, in delivery order.
     *
     * @param bucketName the bucket name; validated as required
     * @param key the key; validated as required
     * @return a new mutable list of the entries received; empty if none arrived
     * @throws IOException propagated from the subscription
     * @throws JetStreamApiException propagated from the subscription
     * @throws InterruptedException if interrupted while waiting for a message
     */
    @Override
    public List<KvEntry> getHistory(String bucketName, String key) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateBucketNameRequired(bucketName);
        Validator.validateKeyRequired(key);
        List<KvEntry> history = new ArrayList<>();
        this.visit(NatsKeyValueUtil.keySubject(bucketName, key), history::add);
        return history;
    }

    /**
     * Collects the distinct keys of the entries delivered on the bucket's
     * stream subject.
     *
     * @param bucketName the bucket name; validated as required
     * @return a new mutable set of keys; empty if no entries arrived
     * @throws IOException propagated from the subscription
     * @throws JetStreamApiException propagated from the subscription
     * @throws InterruptedException if interrupted while waiting for a message
     */
    @Override
    public Set<String> keys(String bucketName) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateBucketNameRequired(bucketName);
        Set<String> keys = new HashSet<>();
        this.visit(NatsKeyValueUtil.streamSubject(bucketName), kve -> keys.add(kve.getKey()));
        return keys;
    }

    /**
     * Subscribes to {@code subject} with a no-ack push consumer and passes every
     * received message to {@code handler} as a {@link KvEntry}.
     *
     * <p>The first message is awaited for up to 1000 ms and each following one
     * for up to 100 ms; the loop ends as soon as a wait returns no message.
     * The subscription is unsubscribed on every exit path, including when the
     * handler or a wait throws.
     *
     * @param subject the subject to subscribe to
     * @param handler the callback invoked once per received entry
     * @throws IOException propagated from the subscribe call
     * @throws JetStreamApiException propagated from the subscribe call
     * @throws InterruptedException if interrupted while waiting for a message
     */
    private void visit(String subject, KvEntryHandler handler) throws IOException, JetStreamApiException, InterruptedException {
        ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
            .ackPolicy(AckPolicy.None)
            .build();
        PushSubscribeOptions pso = ((PushSubscribeOptions.Builder) PushSubscribeOptions.builder()
            .configuration(consumerConfig))
            .build();
        JetStreamSubscription sub = this.js.subscribe(subject, pso);
        try {
            Message m = sub.nextMessage(Duration.ofMillis(1000L));
            while (m != null) {
                handler.handle(new KvEntry(m));
                m = sub.nextMessage(Duration.ofMillis(100L));
            }
        } finally {
            // Release the subscription even if the handler or a wait throws,
            // so a failed visit does not leave it behind.
            sub.unsubscribe();
        }
    }

    /**
     * Lists bucket names by taking every stream name that starts with
     * {@code "KV_"} and stripping the key-value stream prefix from it.
     *
     * @return a new mutable list of bucket names, in the order the stream names were returned
     * @throws IOException propagated from the stream management call
     * @throws JetStreamApiException propagated from the stream management call
     * @throws InterruptedException declared by the interface contract; not thrown by the code visible here
     */
    @Override
    public List<String> bucketsNames() throws IOException, JetStreamApiException, InterruptedException {
        List<String> buckets = new ArrayList<>();
        for (String streamName : this.jsm.getStreamNames()) {
            if (streamName.startsWith("KV_")) {
                buckets.add(streamName.substring(NatsKeyValueUtil.KV_STREAM_PREFIX_LEN));
            }
        }
        return buckets;
    }
}

