/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.KeyValue;
import io.nats.client.KeyValueOptions;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PurgeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueOperation;
import io.nats.client.api.KeyValuePurgeOptions;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsJetStreamManagement;
import io.nats.client.impl.NatsKeyValueWatchSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.DateTimeUtils;
import io.nats.client.support.NatsKeyValueUtil;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

public class NatsKeyValue
implements KeyValue {
    final NatsJetStream js;
    final JetStreamManagement jsm;
    private final String bucketName;
    private final String streamName;
    private final String streamSubject;
    private final String rawKeyPrefix;
    private final String pubSubKeyPrefix;

    NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException {
        this.bucketName = Validator.validateKvBucketNameRequired(bucketName);
        this.streamName = NatsKeyValueUtil.toStreamName(bucketName);
        this.streamSubject = NatsKeyValueUtil.toStreamSubject(bucketName);
        this.rawKeyPrefix = NatsKeyValueUtil.toKeyPrefix(bucketName);
        if (kvo == null) {
            this.js = new NatsJetStream(connection, null);
            this.jsm = new NatsJetStreamManagement(connection, null);
            this.pubSubKeyPrefix = this.rawKeyPrefix;
        } else {
            this.js = new NatsJetStream(connection, kvo.getJetStreamOptions());
            this.jsm = new NatsJetStreamManagement(connection, kvo.getJetStreamOptions());
            this.pubSubKeyPrefix = kvo.getJetStreamOptions().isDefaultPrefix() ? this.rawKeyPrefix : kvo.getJetStreamOptions().getPrefix() + this.rawKeyPrefix;
        }
    }

    String rawKeySubject(String key) {
        return this.rawKeyPrefix + key;
    }

    String pubSubKeySubject(String key) {
        return this.pubSubKeyPrefix + key;
    }

    @Override
    public String getBucketName() {
        return this.bucketName;
    }

    String getStreamName() {
        return this.streamName;
    }

    @Override
    public KeyValueEntry get(String key) throws IOException, JetStreamApiException {
        return this._kvGetLastMessage(Validator.validateNonWildcardKvKeyRequired(key));
    }

    @Override
    public KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException {
        return this._kvGetMessage(Validator.validateNonWildcardKvKeyRequired(key), revision);
    }

    KeyValueEntry _kvGetLastMessage(String key) throws IOException, JetStreamApiException {
        try {
            return new KeyValueEntry(this.jsm.getLastMessage(this.streamName, this.rawKeySubject(key)));
        }
        catch (JetStreamApiException jsae) {
            if (jsae.getApiErrorCode() == 10037) {
                return null;
            }
            throw jsae;
        }
    }

    KeyValueEntry _kvGetMessage(String key, long revision) throws IOException, JetStreamApiException {
        try {
            KeyValueEntry kve = new KeyValueEntry(this.jsm.getMessage(this.streamName, revision));
            return key.equals(kve.getKey()) ? kve : null;
        }
        catch (JetStreamApiException jsae) {
            if (jsae.getApiErrorCode() == 10037) {
                return null;
            }
            throw jsae;
        }
    }

    @Override
    public long put(String key, byte[] value) throws IOException, JetStreamApiException {
        return this._publishWithNonWildcardKey(key, value, null).getSeqno();
    }

    @Override
    public long put(String key, String value) throws IOException, JetStreamApiException {
        return this.put(key, value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public long put(String key, Number value) throws IOException, JetStreamApiException {
        return this.put(key, value.toString().getBytes(StandardCharsets.US_ASCII));
    }

    @Override
    public long create(String key, byte[] value) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        try {
            return this.update(key, value, 0L);
        }
        catch (JetStreamApiException e) {
            KeyValueEntry kve;
            if (e.getApiErrorCode() == 10071 && (kve = this._kvGetLastMessage(key)) != null && kve.getOperation() != KeyValueOperation.PUT) {
                return this.update(key, value, kve.getRevision());
            }
            throw e;
        }
    }

    @Override
    public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        Headers h = new Headers().add("Nats-Expected-Last-Subject-Sequence", Long.toString(expectedRevision));
        return this._publishWithNonWildcardKey(key, value, h).getSeqno();
    }

    @Override
    public void delete(String key) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        this._publishWithNonWildcardKey(key, null, NatsKeyValueUtil.DELETE_HEADERS);
    }

    @Override
    public void purge(String key) throws IOException, JetStreamApiException {
        this._publishWithNonWildcardKey(key, null, NatsKeyValueUtil.PURGE_HEADERS);
    }

    private PublishAck _publishWithNonWildcardKey(String key, byte[] data, Headers h) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        return this.js.publish(NatsMessage.builder().subject(this.pubSubKeySubject(key)).data(data).headers(h).build());
    }

    @Override
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateKvKeyWildcardAllowedRequired(key);
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, key, watcher, watchOptions);
    }

    @Override
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, ">", watcher, watchOptions);
    }

    @Override
    public List<String> keys() throws IOException, JetStreamApiException, InterruptedException {
        ArrayList<String> list = new ArrayList<String>();
        this.visitSubject(this.rawKeySubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> {
            KeyValueOperation op = NatsKeyValueUtil.getOperation(m.getHeaders());
            if (op == KeyValueOperation.PUT) {
                list.add(new NatsKeyValueUtil.BucketAndKey((Message)m).key);
            }
        });
        return list;
    }

    @Override
    public List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateNonWildcardKvKeyRequired(key);
        ArrayList<KeyValueEntry> list = new ArrayList<KeyValueEntry>();
        this.visitSubject(this.rawKeySubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m)));
        return list;
    }

    @Override
    public void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException {
        this.purgeDeletes(null);
    }

    @Override
    public void purgeDeletes(KeyValuePurgeOptions options) throws IOException, JetStreamApiException, InterruptedException {
        long dmThresh;
        long l = dmThresh = options == null ? KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS : options.getDeleteMarkersThresholdMillis();
        ZonedDateTime limit = dmThresh < 0L ? DateTimeUtils.fromNow(600000L) : (dmThresh == 0L ? DateTimeUtils.fromNow(KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS) : DateTimeUtils.fromNow(-dmThresh));
        ArrayList keep0List = new ArrayList();
        ArrayList keep1List = new ArrayList();
        this.visitSubject(this.streamSubject, DeliverPolicy.LastPerSubject, true, false, m -> {
            KeyValueEntry kve = new KeyValueEntry(m);
            if (kve.getOperation() != KeyValueOperation.PUT) {
                if (kve.getCreated().isAfter(limit)) {
                    keep1List.add(new NatsKeyValueUtil.BucketAndKey((Message)m).key);
                } else {
                    keep0List.add(new NatsKeyValueUtil.BucketAndKey((Message)m).key);
                }
            }
        });
        for (String key : keep0List) {
            this.jsm.purgeStream(this.streamName, PurgeOptions.subject(this.rawKeySubject(key)));
        }
        for (String key : keep1List) {
            PurgeOptions po = PurgeOptions.builder().subject(this.rawKeySubject(key)).keep(1L).build();
            this.jsm.purgeStream(this.streamName, po);
        }
    }

    @Override
    public KeyValueStatus getStatus() throws IOException, JetStreamApiException, InterruptedException {
        return new KeyValueStatus(this.jsm.getStreamInfo(this.streamName));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void visitSubject(String subject, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException, InterruptedException {
        PushSubscribeOptions pso = ((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().ordered(ordered).configuration(ConsumerConfiguration.builder().ackPolicy(AckPolicy.None).deliverPolicy(deliverPolicy).headersOnly(headersOnly).build())).build();
        Duration timeout = this.js.jso.getRequestTimeout();
        JetStreamSubscription sub = this.js.subscribe(subject, pso);
        try {
            boolean lastWasNull = false;
            long pending = sub.getConsumerInfo().getCalculatedPending();
            while (pending > 0L) {
                Message m = sub.nextMessage(timeout);
                if (m == null) {
                    if (lastWasNull) {
                        return;
                    }
                    lastWasNull = true;
                    continue;
                }
                handler.onMessage(m);
                if (--pending == 0L) {
                    return;
                }
                lastWasNull = false;
            }
        }
        finally {
            sub.unsubscribe();
        }
    }
}

