/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValue;
import io.nats.client.KeyValueOptions;
import io.nats.client.Message;
import io.nats.client.MessageTtl;
import io.nats.client.PublishOptions;
import io.nats.client.PurgeOptions;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyResult;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueOperation;
import io.nats.client.api.KeyValuePurgeOptions;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.Mirror;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsFeatureBase;
import io.nats.client.impl.NatsKeyValueWatchSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.DateTimeUtils;
import io.nats.client.support.NatsKeyValueUtil;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Key/Value bucket implementation backed by a JetStream stream.
 *
 * <p>Every key maps to a subject built from a prefix plus the key. Two prefixes are kept:
 * one used for reading (lookups, watches, key listing, purging) and one used for publishing.
 * They differ when the backing stream is a mirror or when a non-default JetStream prefix is
 * configured; see the constructor for how each is derived.
 */
public class NatsKeyValue extends NatsFeatureBase implements KeyValue {
    /**
     * JetStream API error code that {@link #create(String, byte[], MessageTtl)} treats as
     * "the expected last sequence did not match" (documented by the NATS server as
     * "wrong last sequence").
     */
    private static final int WRONG_LAST_SEQUENCE_ERROR_CODE = 10071;

    /**
     * Offset handed to {@code DateTimeUtils.fromNow} when no delete marker should be kept.
     * It only has to produce a cutoff that no marker's creation time is after.
     */
    private static final long PURGE_ALL_HORIZON_MILLIS = 600000L;

    private final String bucketName;
    private final String streamSubject;
    private final String readPrefix;
    private final String writePrefix;

    /**
     * Binds to an existing bucket and works out the read and write subject prefixes.
     *
     * <ul>
     *   <li>Mirror without an external API: reads use this bucket's prefix, writes use the
     *       mirrored bucket's prefix.</li>
     *   <li>Mirror with an external API: reads use the mirrored bucket's prefix, writes use
     *       {@code <external api>.<mirrored bucket prefix>}.</li>
     *   <li>No mirror: reads use this bucket's prefix; writes use the same prefix, preceded by
     *       the configured JetStream prefix when {@code kvo} specifies a non-default one.</li>
     * </ul>
     *
     * @param connection the connection the bucket operates on
     * @param bucketName the bucket name; validated as required
     * @param kvo        key-value options, may be {@code null}
     * @throws IOException if the stream info cannot be read; a {@link JetStreamApiException}
     *                     raised by the lookup is wrapped as the cause
     */
    NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException {
        super(connection, kvo);
        this.bucketName = Validator.validateBucketName(bucketName, true);
        this.streamName = NatsKeyValueUtil.toStreamName(bucketName);

        StreamInfo si;
        try {
            si = this.jsm.getStreamInfo(this.streamName);
        }
        catch (JetStreamApiException e) {
            throw new IOException(e);
        }

        this.streamSubject = NatsKeyValueUtil.toStreamSubject(bucketName);

        String readTemp = NatsKeyValueUtil.toKeyPrefix(bucketName);
        String writeTemp;

        Mirror m = si.getConfiguration().getMirror();
        if (m != null) {
            String bName = NatsKeyValueUtil.trimPrefix(m.getName());
            String mExtApi = m.getExternal() == null ? null : m.getExternal().getApi();
            if (mExtApi == null) {
                writeTemp = NatsKeyValueUtil.toKeyPrefix(bName);
            }
            else {
                readTemp = NatsKeyValueUtil.toKeyPrefix(bName);
                writeTemp = mExtApi + "." + NatsKeyValueUtil.toKeyPrefix(bName);
            }
        }
        else if (kvo == null || kvo.getJetStreamOptions().isDefaultPrefix()) {
            writeTemp = readTemp;
        }
        else {
            writeTemp = kvo.getJetStreamOptions().getPrefix() + readTemp;
        }

        this.readPrefix = readTemp;
        this.writePrefix = writeTemp;
    }

    /**
     * Builds the subject used to read the given key (or key filter).
     *
     * @param key the key or wildcard filter
     * @return the read prefix followed by {@code key}
     */
    String readSubject(String key) {
        return this.readPrefix + key;
    }

    /**
     * Builds the subject used to publish to the given key.
     *
     * @param key the key
     * @return the write prefix followed by {@code key}
     */
    String writeSubject(String key) {
        return this.writePrefix + key;
    }

    /**
     * @return the validated bucket name this instance was created with
     */
    @Override
    public String getBucketName() {
        return this.bucketName;
    }

    /**
     * Gets the latest entry for a key, but only if that entry is a PUT.
     *
     * @param key the key; must be a valid non-wildcard key
     * @return the entry, or {@code null} if there is none or its operation is not PUT
     */
    @Override
    public KeyValueEntry get(String key) throws IOException, JetStreamApiException {
        return this.existingOnly(this._get(Validator.validateNonWildcardKvKeyRequired(key)));
    }

    /**
     * Gets the entry stored at a specific revision, but only if it belongs to the key and is a PUT.
     *
     * @param key      the key; must be a valid non-wildcard key
     * @param revision the revision (stream sequence) to load
     * @return the entry, or {@code null} if missing, for a different key, or not a PUT
     */
    @Override
    public KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException {
        return this.existingOnly(this._get(Validator.validateNonWildcardKvKeyRequired(key), revision));
    }

    /**
     * Filters out entries that do not represent a stored value.
     *
     * @param kve the entry to check, may be {@code null}
     * @return {@code kve} when it is non-null and its operation is PUT, otherwise {@code null}
     */
    KeyValueEntry existingOnly(KeyValueEntry kve) {
        if (kve == null || kve.getOperation() != KeyValueOperation.PUT) {
            return null;
        }
        return kve;
    }

    /**
     * Loads the last message on the key's read subject, whatever its operation.
     *
     * @param key the already validated key
     * @return the entry, or {@code null} when no message was found
     */
    KeyValueEntry _get(String key) throws IOException, JetStreamApiException {
        MessageInfo mi = this._getLast(this.readSubject(key));
        return mi == null ? null : new KeyValueEntry(mi);
    }

    /**
     * Loads the message at the given sequence and returns it only when it belongs to {@code key}.
     *
     * @param key      the already validated key
     * @param revision the sequence to load
     * @return the entry, or {@code null} when no message was found or it is for another key
     */
    KeyValueEntry _get(String key, long revision) throws IOException, JetStreamApiException {
        MessageInfo mi = this._getBySeq(revision);
        if (mi == null) {
            return null;
        }
        KeyValueEntry kve = new KeyValueEntry(mi);
        return key.equals(kve.getKey()) ? kve : null;
    }

    /**
     * Stores a byte array value under the key.
     *
     * @return the sequence number reported by the publish ack
     */
    @Override
    public long put(String key, byte[] value) throws IOException, JetStreamApiException {
        return this._write(key, value, null, null).getSeqno();
    }

    /**
     * Stores a string value under the key, encoded as UTF-8.
     *
     * @return the sequence number reported by the publish ack
     */
    @Override
    public long put(String key, String value) throws IOException, JetStreamApiException {
        return this._write(key, value.getBytes(StandardCharsets.UTF_8), null, null).getSeqno();
    }

    /**
     * Stores a number under the key as the US-ASCII bytes of its {@code toString()} form.
     *
     * @return the sequence number reported by the publish ack
     */
    @Override
    public long put(String key, Number value) throws IOException, JetStreamApiException {
        return this._write(key, value.toString().getBytes(StandardCharsets.US_ASCII), null, null).getSeqno();
    }

    /**
     * Creates the key without a message TTL; delegates to
     * {@link #create(String, byte[], MessageTtl)}.
     */
    @Override
    public long create(String key, byte[] value) throws IOException, JetStreamApiException {
        return this.create(key, value, null);
    }

    /**
     * Stores the value only if the key has no current value.
     *
     * <p>The write is first attempted with an expected revision of 0. If the server rejects it
     * with the wrong-last-sequence error, the latest entry is looked up; when that entry exists
     * and is not a PUT (a delete or purge marker), the write is retried expecting that entry's
     * revision. In every other case the original exception is rethrown.
     *
     * @param key        the key; must be a valid non-wildcard key
     * @param value      the value bytes
     * @param messageTtl optional message TTL, may be {@code null}
     * @return the sequence number reported by the publish ack
     */
    @Override
    public long create(String key, byte[] value, MessageTtl messageTtl) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        try {
            return this._update(key, value, 0L, messageTtl);
        }
        catch (JetStreamApiException e) {
            if (e.getApiErrorCode() == WRONG_LAST_SEQUENCE_ERROR_CODE) {
                KeyValueEntry kve = this._get(key);
                if (kve != null && kve.getOperation() != KeyValueOperation.PUT) {
                    return this._update(key, value, kve.getRevision(), messageTtl);
                }
            }
            throw e;
        }
    }

    /**
     * Stores the value, passing {@code expectedRevision} to the publish options.
     *
     * @param key              the key; must be a valid non-wildcard key
     * @param value            the value bytes
     * @param expectedRevision the revision the key is expected to be at
     * @return the sequence number reported by the publish ack
     */
    @Override
    public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        return this._update(key, value, expectedRevision, null);
    }

    /**
     * Publishes the value with publish options built from the expected revision and TTL.
     */
    private long _update(String key, byte[] value, long expectedRevision, MessageTtl messageTtl) throws IOException, JetStreamApiException {
        PublishOptions popts = NatsKeyValueUtil.getPublishOptions(expectedRevision, messageTtl);
        return this._write(key, value, null, popts).getSeqno();
    }

    /**
     * String variant of {@link #update(String, byte[], long)}; the value is encoded as UTF-8.
     */
    @Override
    public long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException {
        return this.update(key, value.getBytes(StandardCharsets.UTF_8), expectedRevision);
    }

    /**
     * Publishes a message with no data and the delete headers for the key.
     */
    @Override
    public void delete(String key) throws IOException, JetStreamApiException {
        this._write(key, null, NatsKeyValueUtil.getDeleteHeaders(), null);
    }

    /**
     * Publishes a delete for the key with publish options built from {@code expectedRevision}.
     */
    @Override
    public void delete(String key, long expectedRevision) throws IOException, JetStreamApiException {
        PublishOptions popts = NatsKeyValueUtil.getPublishOptions(expectedRevision, null);
        this._write(key, null, NatsKeyValueUtil.getDeleteHeaders(), popts);
    }

    /**
     * Publishes a message with no data and the purge headers for the key.
     */
    @Override
    public void purge(String key) throws IOException, JetStreamApiException {
        this._write(key, null, NatsKeyValueUtil.getPurgeHeaders(), null);
    }

    /**
     * Publishes a purge for the key with publish options built from {@code expectedRevision}.
     */
    @Override
    public void purge(String key, long expectedRevision) throws IOException, JetStreamApiException {
        PublishOptions popts = NatsKeyValueUtil.getPublishOptions(expectedRevision, null);
        this._write(key, null, NatsKeyValueUtil.getPurgeHeaders(), popts);
    }

    /**
     * Validates the key and publishes a message to its write subject.
     *
     * @param key   the key; must be a valid non-wildcard key
     * @param data  the payload, may be {@code null}
     * @param h     headers to attach, may be {@code null}
     * @param popts publish options, may be {@code null}
     * @return the publish ack
     */
    private PublishAck _write(String key, byte[] data, Headers h, PublishOptions popts) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        NatsMessage message = NatsMessage.builder()
            .subject(this.writeSubject(key))
            .data(data)
            .headers(h)
            .build();
        return this.js.publish(message, popts);
    }

    /**
     * Watches a single key (wildcards allowed). The revision argument passed to the
     * subscription is -1.
     */
    @Override
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateKvKeyWildcardAllowedRequired(key);
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, -1L, watchOptions);
    }

    /**
     * Watches a single key (wildcards allowed), passing {@code fromRevision} to the subscription.
     */
    @Override
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateKvKeyWildcardAllowedRequired(key);
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, fromRevision, watchOptions);
    }

    /**
     * Watches several keys (wildcards allowed). The revision argument passed to the
     * subscription is -1.
     */
    @Override
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateKvKeysWildcardAllowedRequired(keys);
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, keys, watcher, -1L, watchOptions);
    }

    /**
     * Watches several keys (wildcards allowed), passing {@code fromRevision} to the subscription.
     */
    @Override
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateKvKeysWildcardAllowedRequired(keys);
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
    }

    /**
     * Watches every key in the bucket using the {@code >} wildcard. The revision argument
     * passed to the subscription is -1.
     */
    @Override
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        // Same up-front check as the watch(...) overloads, so a null watcher is rejected here.
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(">"), watcher, -1L, watchOptions);
    }

    /**
     * Watches every key in the bucket using the {@code >} wildcard, passing
     * {@code fromRevision} to the subscription.
     */
    @Override
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        // Same up-front check as the watch(...) overloads, so a null watcher is rejected here.
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(">"), watcher, fromRevision, watchOptions);
    }

    /**
     * Lists every key whose latest entry is a PUT.
     */
    @Override
    public List<String> keys() throws IOException, JetStreamApiException, InterruptedException {
        return this._keys(Collections.singletonList(this.readSubject(">")));
    }

    /**
     * Lists the keys matching the filter whose latest entry is a PUT.
     *
     * @param filter a key or wildcard filter, appended to the read prefix as-is
     */
    @Override
    public List<String> keys(String filter) throws IOException, JetStreamApiException, InterruptedException {
        return this._keys(Collections.singletonList(this.readSubject(filter)));
    }

    /**
     * Lists the keys matching any of the filters whose latest entry is a PUT.
     *
     * @param filters keys or wildcard filters, each appended to the read prefix as-is
     */
    @Override
    public List<String> keys(List<String> filters) throws IOException, JetStreamApiException, InterruptedException {
        List<String> readSubjectFilters = new ArrayList<String>(filters.size());
        for (String f : filters) {
            readSubjectFilters.add(this.readSubject(f));
        }
        return this._keys(readSubjectFilters);
    }

    /**
     * Visits the last message per subject for the given read subjects and collects the key of
     * every message whose operation is PUT.
     */
    private List<String> _keys(List<String> readSubjectFilters) throws IOException, JetStreamApiException, InterruptedException {
        List<String> list = new ArrayList<String>();
        this.visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, (Message m) -> {
            if (NatsKeyValueUtil.getOperation(m.getHeaders()) == KeyValueOperation.PUT) {
                list.add(new NatsKeyValueUtil.BucketAndKey(m).key);
            }
        });
        return list;
    }

    /**
     * Asynchronous form of {@link #keys()}; results arrive on the returned queue.
     */
    @Override
    public LinkedBlockingQueue<KeyResult> consumeKeys() {
        return this._consumeKeys(Collections.singletonList(this.readSubject(">")));
    }

    /**
     * Asynchronous form of {@link #keys(String)}; results arrive on the returned queue.
     */
    @Override
    public LinkedBlockingQueue<KeyResult> consumeKeys(String filter) {
        return this._consumeKeys(Collections.singletonList(this.readSubject(filter)));
    }

    /**
     * Asynchronous form of {@link #keys(List)}; results arrive on the returned queue.
     */
    @Override
    public LinkedBlockingQueue<KeyResult> consumeKeys(List<String> filters) {
        List<String> readSubjectFilters = new ArrayList<String>(filters.size());
        for (String f : filters) {
            readSubjectFilters.add(this.readSubject(f));
        }
        return this._consumeKeys(readSubjectFilters);
    }

    /**
     * Submits the key scan to the connection's executor and returns the queue it fills.
     *
     * <p>One {@link KeyResult} is offered per key whose latest operation is PUT. When the scan
     * finishes, a no-argument {@code KeyResult} is offered last; if the scan fails, a
     * {@code KeyResult} wrapping the exception is offered instead.
     */
    private LinkedBlockingQueue<KeyResult> _consumeKeys(List<String> readSubjectFilters) {
        LinkedBlockingQueue<KeyResult> q = new LinkedBlockingQueue<KeyResult>();
        this.js.conn.getOptions().getExecutor().submit(() -> {
            try {
                this.visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, (Message m) -> {
                    if (NatsKeyValueUtil.getOperation(m.getHeaders()) == KeyValueOperation.PUT) {
                        q.offer(new KeyResult(new NatsKeyValueUtil.BucketAndKey(m).key));
                    }
                });
                q.offer(new KeyResult());
            }
            catch (JetStreamApiException | IOException e) {
                q.offer(new KeyResult(e));
            }
            catch (InterruptedException e) {
                q.offer(new KeyResult(e));
                // Restore the interrupt flag for whoever owns this executor thread.
                Thread.currentThread().interrupt();
            }
        });
        return q;
    }

    /**
     * Collects every entry visited on the key's read subject using {@code DeliverPolicy.All}.
     *
     * @param key the key; must be a valid non-wildcard key
     * @return the entries in the order they were visited
     */
    @Override
    public List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateNonWildcardKvKeyRequired(key);
        List<KeyValueEntry> list = new ArrayList<KeyValueEntry>();
        this.visitSubject(this.readSubject(key), DeliverPolicy.All, false, true, (Message m) -> list.add(new KeyValueEntry(m)));
        return list;
    }

    /**
     * Purges delete/purge markers using the default threshold; same as {@code purgeDeletes(null)}.
     */
    @Override
    public void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException {
        this.purgeDeletes(null);
    }

    /**
     * Cleans up keys whose latest entry is not a PUT (delete or purge markers).
     *
     * <p>A cutoff time is derived from the marker threshold. Keys whose marker was created
     * after the cutoff are purged keeping 1 message (the marker itself); all other such keys
     * are purged completely.
     *
     * <ul>
     *   <li>{@code options == null} or a threshold of 0: the default threshold is used.</li>
     *   <li>negative threshold: the cutoff is pushed ahead of now so no marker is kept.</li>
     *   <li>positive threshold: the cutoff is {@code fromNow(-threshold)}.</li>
     * </ul>
     *
     * @param options purge options, may be {@code null}
     */
    @Override
    public void purgeDeletes(KeyValuePurgeOptions options) throws IOException, JetStreamApiException, InterruptedException {
        long dmThresh = options == null
            ? KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS
            : options.getDeleteMarkersThresholdMillis();

        final ZonedDateTime limit;
        if (dmThresh < 0L) {
            limit = DateTimeUtils.fromNow(PURGE_ALL_HORIZON_MILLIS);
        }
        else if (dmThresh == 0L) {
            // The default threshold must be subtracted, exactly as on the null-options path
            // (which reaches the positive branch below). A positive offset here would place
            // the cutoff ahead of now and silently discard every marker.
            limit = DateTimeUtils.fromNow(-KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS);
        }
        else {
            limit = DateTimeUtils.fromNow(-dmThresh);
        }

        List<String> keep0List = new ArrayList<String>();
        List<String> keep1List = new ArrayList<String>();
        this.visitSubject(this.streamSubject, DeliverPolicy.LastPerSubject, true, false, (Message m) -> {
            KeyValueEntry kve = new KeyValueEntry(m);
            if (kve.getOperation() != KeyValueOperation.PUT) {
                String key = new NatsKeyValueUtil.BucketAndKey(m).key;
                if (kve.getCreated().isAfter(limit)) {
                    keep1List.add(key);
                }
                else {
                    keep0List.add(key);
                }
            }
        });

        // Markers at or before the cutoff: remove everything on the key's subject.
        for (String key : keep0List) {
            this.jsm.purgeStream(this.streamName, PurgeOptions.subject(this.readSubject(key)));
        }

        // Markers after the cutoff: drop the older messages but keep the marker.
        for (String key : keep1List) {
            PurgeOptions po = PurgeOptions.builder().subject(this.readSubject(key)).keep(1L).build();
            this.jsm.purgeStream(this.streamName, po);
        }
    }

    /**
     * @return a status object built from the current info of the backing stream
     */
    @Override
    public KeyValueStatus getStatus() throws IOException, JetStreamApiException, InterruptedException {
        return new KeyValueStatus(this.jsm.getStreamInfo(this.streamName));
    }
}

