/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PublishOptions;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StorageType;
import io.nats.client.impl.ErrorListenerConsoleImpl;
import io.nats.examples.jetstream.NatsJsUtils;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * A {@link Runnable} that publishes an endless series of numbered messages to a subject
 * and keeps going when a publish fails.
 *
 * <p>Each loop iteration optionally sleeps (jitter and/or fixed delay), increments an
 * internal publish id, and publishes the payload produced by the data provider. When the
 * publisher was built with a {@link JetStreamManagement} instance the message is published
 * through JetStream; otherwise it is published directly on the {@link Connection}.
 *
 * <p>Configuration methods return {@code this} so they can be chained. The configuration
 * fields are plain (non-volatile) fields, so configure the instance before handing it to
 * a thread.
 *
 * <p>Note on the default callbacks: the default {@code afterPublish} callback sets
 * {@code lastPublishOk} and the default {@code exceptionReporter} only reports while
 * {@code lastPublishOk} is {@code true}. {@code afterPublish} is only invoked on the
 * JetStream path.
 */
public class ResilientPublisher implements Runnable {
    /**
     * JetStream API error code that triggers a resync of the publish id from the stream's
     * last message for the subject.
     * NOTE(review): believed to be the server's "wrong last sequence" error raised by the
     * expected-last-sequence check -- confirm against the NATS server error code list.
     */
    private static final int WRONG_LAST_SEQUENCE_API_ERROR_CODE = 10071;

    /** Data provider used when no payload is configured; every message has a null body. */
    private static final Function<Long, byte[]> NO_DATA = id -> null;

    private final Connection nc;
    private final JetStreamManagement jsm;
    private final JetStream js;
    private final String stream;
    private final String subject;
    private final AtomicLong lastPub;
    private final AtomicBoolean keepGoing;
    private boolean expectationCheck;
    private long jitter;
    private long delay;
    private boolean reporting;
    private long reportFrequency;
    private Function<Long, byte[]> dataProvider;
    private BiConsumer<Connection, Long> beforePublish;
    private BiConsumer<Connection, PublishAck> afterPublish;
    private BiConsumer<Connection, Long> publishReporter;
    private BiConsumer<Connection, Exception> exceptionReporter;
    /** Tracks whether the most recent publish outcome seen by the default callbacks was a success. */
    boolean lastPublishOk = false;

    /**
     * Example entry point: connects, (re)creates a memory stream, and runs a publisher
     * on its own thread until that thread ends.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Options options = Options.builder()
            .socketWriteTimeout(20000L)
            .connectionListener((conn, type) -> System.out.println(type))
            .errorListener(new ErrorListenerConsoleImpl())
            .build();
        try (Connection nc = Nats.connect(options)) {
            JetStreamManagement jsm = nc.jetStreamManagement();
            NatsJsUtils.createOrReplaceStream(jsm, "js-stream", StorageType.Memory, "js-subject");
            ResilientPublisher rp = new ResilientPublisher(nc, jsm, "js-stream", "js-subject")
                .basicDataPrefix("data")
                .delay(1L)
                .reportFrequency(1000L);
            Thread t = new Thread(rp);
            t.start();
            t.join();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a publisher that publishes directly on the connection (no JetStream).
     *
     * @param nc the connection to publish on
     * @param subject the subject to publish to
     */
    public ResilientPublisher(Connection nc, String subject) {
        this(nc, null, null, subject);
    }

    /**
     * Creates a publisher.
     *
     * @param nc the connection, passed to every callback and used for non-JetStream publishing
     * @param jsm the management context; when {@code null} the publisher does not use
     *            JetStream and {@code stream} is ignored
     * @param stream the stream name, used when resyncing the publish id
     * @param subject the subject to publish to
     */
    public ResilientPublisher(Connection nc, JetStreamManagement jsm, String stream, String subject) {
        this.nc = nc;
        if (jsm == null) {
            this.jsm = null;
            this.js = null;
            this.stream = null;
        } else {
            this.jsm = jsm;
            this.js = jsm.jetStream();
            this.stream = stream;
        }
        this.subject = subject;
        this.lastPub = new AtomicLong();
        this.keepGoing = new AtomicBoolean(true);

        // Defaults are assigned directly instead of through the public (overridable)
        // configuration methods, so a subclass never observes a half-built instance.
        this.dataProvider = NO_DATA;
        this.beforePublish = ResilientPublisher::doNothing;
        this.afterPublish = this::reportPublishResumed;
        this.publishReporter = ResilientPublisher::reportPublished;
        this.exceptionReporter = this::reportPublishException;
    }

    /**
     * Enables or disables the expected-last-sequence option on JetStream publishes.
     *
     * @param expectationCheck {@code true} to publish with the previous publish id as the
     *                         expected last sequence
     * @return this publisher
     */
    public ResilientPublisher expectationCheck(boolean expectationCheck) {
        this.expectationCheck = expectationCheck;
        return this;
    }

    /**
     * Sets the upper bound (exclusive) of a random sleep, in milliseconds, taken before each publish.
     *
     * @param jitter the bound; values of zero or less disable the random sleep
     * @return this publisher
     */
    public ResilientPublisher jitter(long jitter) {
        this.jitter = jitter;
        return this;
    }

    /**
     * Sets a fixed sleep, in milliseconds, taken before each publish.
     *
     * @param delay the sleep time; values of zero or less disable the fixed sleep
     * @return this publisher
     */
    public ResilientPublisher delay(long delay) {
        this.delay = delay;
        return this;
    }

    /**
     * Sets the minimum time, in milliseconds, between throttled reports.
     *
     * @param reportFrequency the interval; publish reporting is enabled only when this is positive
     * @return this publisher
     */
    public ResilientPublisher reportFrequency(long reportFrequency) {
        this.reportFrequency = reportFrequency;
        this.reporting = reportFrequency > 0L;
        return this;
    }

    /**
     * Installs a data provider that produces {@code prefix + "-" + id} encoded as UTF-8.
     *
     * @param prefix the payload prefix, or {@code null} to publish null payloads
     * @return this publisher
     */
    public ResilientPublisher basicDataPrefix(String prefix) {
        // Explicit charset: the no-arg getBytes() depends on the platform default.
        this.dataProvider = prefix == null
            ? NO_DATA
            : id -> (prefix + "-" + id).getBytes(StandardCharsets.UTF_8);
        return this;
    }

    /**
     * Installs the function that turns a publish id into a message payload.
     *
     * @param dataProvider the provider, or {@code null} to publish null payloads
     * @return this publisher
     */
    public ResilientPublisher dataProvider(Function<Long, byte[]> dataProvider) {
        this.dataProvider = dataProvider == null ? NO_DATA : dataProvider;
        return this;
    }

    /**
     * Installs the callback invoked with the publish id just before each publish.
     *
     * @param beforePublish the callback, or {@code null} for a no-op
     * @return this publisher
     */
    public ResilientPublisher beforePublish(BiConsumer<Connection, Long> beforePublish) {
        this.beforePublish = beforePublish == null ? ResilientPublisher::doNothing : beforePublish;
        return this;
    }

    /**
     * Installs the callback invoked with the {@link PublishAck} after each successful
     * JetStream publish.
     *
     * @param afterPublish the callback, or {@code null} for the default, which reports
     *                     "Publish Start/Resume" the first time a publish succeeds after
     *                     start or after a reported failure
     * @return this publisher
     */
    public ResilientPublisher afterPublish(BiConsumer<Connection, PublishAck> afterPublish) {
        this.afterPublish = afterPublish == null ? this::reportPublishResumed : afterPublish;
        return this;
    }

    /**
     * Installs the callback used for periodic progress reports.
     *
     * @param publishReporter the callback, or {@code null} for the default, which reports
     *                        the published id
     * @return this publisher
     */
    public ResilientPublisher publishReporter(BiConsumer<Connection, Long> publishReporter) {
        this.publishReporter = publishReporter == null ? ResilientPublisher::reportPublished : publishReporter;
        return this;
    }

    /**
     * Installs the callback used to report publish failures.
     *
     * @param exceptionReporter the callback, or {@code null} for the default, which reports
     *                          the exception only if the last recorded publish state was OK
     * @return this publisher
     */
    public ResilientPublisher exceptionReporter(BiConsumer<Connection, Exception> exceptionReporter) {
        this.exceptionReporter = exceptionReporter == null ? this::reportPublishException : exceptionReporter;
        return this;
    }

    /** Asks the publish loop to end; the flag is checked at the top of each iteration. */
    public void stop() {
        this.keepGoing.set(false);
    }

    /**
     * Returns the current publish id counter.
     *
     * @return the most recently assigned (or resynced) publish id
     */
    public long getLastPub() {
        return this.lastPub.get();
    }

    /**
     * Runs the publish loop until {@link #stop()} is called or the thread is interrupted.
     * Exceptions thrown while publishing are passed to the exception reporter and the
     * loop continues.
     */
    @Override
    public void run() {
        Exception lastEx = null;
        long reportAt = 0L;
        while (this.keepGoing.get()) {
            try {
                pauseBeforePublish();

                long lastPubId = this.lastPub.get();
                long pubId = this.lastPub.incrementAndGet();
                this.beforePublish.accept(this.nc, pubId);
                if (this.js == null) {
                    this.nc.publish(this.subject, this.dataProvider.apply(pubId));
                } else {
                    PublishOptions po = this.expectationCheck
                        ? PublishOptions.builder().expectedLastSequence(lastPubId).build()
                        : null;
                    PublishAck pa = this.js.publish(this.subject, this.dataProvider.apply(pubId), po);
                    this.afterPublish.accept(this.nc, pa);
                }

                // Report right away on recovery from a failure, otherwise at most once per interval.
                if (this.reporting && (lastEx != null || System.currentTimeMillis() > reportAt)) {
                    this.publishReporter.accept(this.nc, pubId);
                    reportAt = System.currentTimeMillis() + this.reportFrequency;
                }
                lastEx = null;
            }
            catch (InterruptedException e) {
                // Interruption is a request to stop, not a publish failure: restore the
                // flag for the caller and leave the loop instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                if (e instanceof JetStreamApiException
                    && ((JetStreamApiException) e).getApiErrorCode() == WRONG_LAST_SEQUENCE_API_ERROR_CODE) {
                    resyncLastPub();
                }
                // Report a new kind of failure right away, a repeated one at most once per interval.
                if (isDifferentFailure(lastEx, e) || System.currentTimeMillis() > reportAt) {
                    this.exceptionReporter.accept(this.nc, e);
                    reportAt = System.currentTimeMillis() + this.reportFrequency;
                }
                lastEx = e;
            }
        }
    }

    /** Sleeps for the configured random jitter and fixed delay, when either is positive. */
    private void pauseBeforePublish() throws InterruptedException {
        if (this.jitter > 0L) {
            Thread.sleep(ThreadLocalRandom.current().nextLong(this.jitter));
        }
        if (this.delay > 0L) {
            Thread.sleep(this.delay);
        }
    }

    /** Sets the publish id counter to the sequence of the stream's last message for the subject. */
    private void resyncLastPub() {
        try {
            MessageInfo mi = this.jsm.getLastMessage(this.stream, this.subject);
            this.lastPub.set(mi.getSeq());
        }
        catch (Exception ignored) {
            // Best effort: if the lookup fails the counter is left as is and the
            // next publish attempt will surface the problem again.
        }
    }

    /**
     * Decides whether {@code current} is a different kind of failure than {@code previous}:
     * there was no previous failure, the exception classes differ, or both are
     * {@link JetStreamApiException}s with different API error codes.
     */
    private static boolean isDifferentFailure(Exception previous, Exception current) {
        if (previous == null) {
            return true;
        }
        if (previous.getClass() != current.getClass()) {
            return true;
        }
        if (current instanceof JetStreamApiException) {
            int currentCode = ((JetStreamApiException) current).getApiErrorCode();
            int previousCode = ((JetStreamApiException) previous).getApiErrorCode();
            return currentCode != previousCode;
        }
        return false;
    }

    /** Default {@code beforePublish} callback: does nothing. */
    private static void doNothing(Connection conn, Long pubId) {
    }

    /** Default {@code afterPublish} callback: reports once when publishing starts or resumes. */
    private void reportPublishResumed(Connection conn, PublishAck pa) {
        if (!this.lastPublishOk) {
            NatsJsUtils.report("Publish Start/Resume: " + pa);
            this.lastPublishOk = true;
        }
    }

    /** Default {@code publishReporter} callback: reports the published id. */
    private static void reportPublished(Connection conn, Long pubId) {
        NatsJsUtils.report("Published Id: " + pubId);
    }

    /** Default {@code exceptionReporter} callback: reports once when publishing stops working. */
    private void reportPublishException(Connection conn, Exception e) {
        if (this.lastPublishOk) {
            NatsJsUtils.report("Publish Exception: " + e);
            this.lastPublishOk = false;
        }
    }
}

