/*
 * Decompiled with CFR 0.152.
 */
package net.solarnetwork.node.runtime;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import net.solarnetwork.domain.datum.Datum;
import net.solarnetwork.domain.datum.DatumSamplesOperations;
import net.solarnetwork.node.dao.DatumDao;
import net.solarnetwork.node.domain.datum.NodeDatum;
import net.solarnetwork.node.service.DatumEvents;
import net.solarnetwork.node.service.DatumQueue;
import net.solarnetwork.node.service.support.BaseIdentifiable;
import net.solarnetwork.service.DatumFilterService;
import net.solarnetwork.service.OptionalService;
import net.solarnetwork.service.StaticOptionalService;
import net.solarnetwork.settings.SettingSpecifier;
import net.solarnetwork.settings.SettingSpecifierProvider;
import net.solarnetwork.settings.support.BasicTextFieldSettingSpecifier;
import net.solarnetwork.settings.support.BasicTitleSettingSpecifier;
import net.solarnetwork.util.DateUtils;
import net.solarnetwork.util.ObjectUtils;
import net.solarnetwork.util.StatCounter;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;

public class DefaultDatumQueue
extends BaseIdentifiable
implements DatumQueue,
SettingSpecifierProvider,
Thread.UncaughtExceptionHandler {
    public static final long DEFAULT_QUEUE_DELAY_MS = 200L;
    public static final long DEFAULT_STARTUP_DELAY_MS = 20000L;
    public static final int DEFAULT_STAT_LOG_FREQUENCY = 250;
    private final BlockingQueue<DelayedDatum> datumQueue = new DelayQueue<DelayedDatum>();
    private final List<ConsumerThread> consumers = new CopyOnWriteArrayList<ConsumerThread>();
    private final StatCounter stats = new StatCounter("DatumQueue", "", this.log, 250, (StatCounter.Stat[])QueueStats.values());
    private final DatumDao nodeDatumDao;
    private final OptionalService<EventAdmin> eventAdmin;
    private final OptionalService<Consumer<NodeDatum>> directConsumer;
    private long startupDelayMs = 20000L;
    private long queueDelayMs = 200L;
    private OptionalService.OptionalFilterableService<DatumFilterService> datumFilterService;
    private Thread.UncaughtExceptionHandler datumProcessorExceptionHandler;
    private long processorStartupDelayMs;
    private ProcessorThread datumProcessor;

    public DefaultDatumQueue(DatumDao nodeDatumDao, OptionalService<EventAdmin> eventAdmin) {
        this(nodeDatumDao, eventAdmin, (OptionalService<Consumer<NodeDatum>>)new StaticOptionalService(null));
    }

    public DefaultDatumQueue(DatumDao nodeDatumDao, OptionalService<EventAdmin> eventAdmin, OptionalService<Consumer<NodeDatum>> directConsumer) {
        this.nodeDatumDao = (DatumDao)ObjectUtils.requireNonNullArgument((Object)nodeDatumDao, (String)"nodeDatumDao");
        this.eventAdmin = (OptionalService)ObjectUtils.requireNonNullArgument(eventAdmin, (String)"eventAdmin");
        this.directConsumer = (OptionalService)ObjectUtils.requireNonNullArgument(directConsumer, (String)"directConsumer");
        this.processorStartupDelayMs = -1L;
    }

    public synchronized void startup() {
        if (this.datumProcessor != null) {
            this.datumProcessor.processing = false;
            this.datumProcessor.interrupt();
        }
        if (this.processorStartupDelayMs < 0L) {
            this.processorStartupDelayMs = this.getStartupDelayMs();
        }
        this.datumProcessor = new ProcessorThread();
        this.datumProcessor.setUncaughtExceptionHandler(this);
        this.datumProcessor.start();
        ArrayList<ConsumerThread> newConsumers = new ArrayList<ConsumerThread>();
        for (ConsumerThread t : this.consumers) {
            t.shutdown();
            ConsumerThread newThread = new ConsumerThread(t.consumer);
            newThread.start();
            newConsumers.add(newThread);
        }
        this.consumers.clear();
        this.consumers.addAll(newConsumers);
    }

    public synchronized void shutdown() {
        if (this.datumProcessor != null) {
            this.datumProcessor.processing = false;
            this.datumProcessor.interrupt();
            this.datumProcessor = null;
        }
        for (ConsumerThread t : this.consumers) {
            t.shutdown();
        }
    }

    private void postEvent(String topic, NodeDatum datum) {
        Event event;
        EventAdmin service = (EventAdmin)OptionalService.service(this.eventAdmin);
        if (service != null && (event = DatumEvents.datumEvent(topic, datum)) != null) {
            service.postEvent(event);
        }
    }

    private NodeDatum applyTransform(DelayedDatum event) {
        DatumFilterService xform = (DatumFilterService)OptionalService.service(this.datumFilterService);
        if (xform == null) {
            return event.datum;
        }
        DatumSamplesOperations in = event.datum.asSampleOperations();
        DatumSamplesOperations out = xform.filter((Datum)event.datum, in, new HashMap(4));
        if (out == null) {
            this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Filtered);
            return null;
        }
        if (out == in) {
            return event.datum;
        }
        return event.datum.copyWithSamples(out);
    }

    private void persistDatum(NodeDatum result) {
        long start = System.currentTimeMillis();
        DatumDao dao = this.getNodeDatumDao();
        dao.storeDatum(result);
        this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Persisted);
        this.stats.addAndGet((StatCounter.Stat)QueueStats.PersistingTimeTotal, System.currentTimeMillis() - start, true);
    }

    @Override
    public boolean offer(NodeDatum datum) {
        return this.offer(datum, true);
    }

    @Override
    public boolean offer(NodeDatum datum, boolean persist) {
        if (datum == null || datum.getSourceId() == null) {
            return false;
        }
        if (persist) {
            this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Added);
        } else {
            this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Captured);
        }
        return this.datumQueue.offer(new DelayedDatum(datum, this.queueDelayMs, persist));
    }

    @Override
    public synchronized void addConsumer(Consumer<NodeDatum> consumer) {
        for (ConsumerThread t : this.consumers) {
            if (t.consumer != consumer) continue;
            return;
        }
        ConsumerThread t = new ConsumerThread(consumer);
        this.consumers.add(t);
        if (this.datumProcessor != null) {
            t.start();
        }
    }

    @Override
    public synchronized void removeConsumer(Consumer<NodeDatum> consumer) {
        ConsumerThread threadToRemove = null;
        for (ConsumerThread t : this.consumers) {
            if (t.consumer != consumer) continue;
            threadToRemove = t;
            break;
        }
        if (threadToRemove != null) {
            this.consumers.remove(threadToRemove);
            threadToRemove.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        DefaultDatumQueue defaultDatumQueue = this;
        synchronized (defaultDatumQueue) {
            this.datumProcessor = null;
            this.startup();
        }
        if (this.datumProcessorExceptionHandler != null) {
            this.datumProcessorExceptionHandler.uncaughtException(t, e);
        }
    }

    public String getSettingUid() {
        return "net.solarnetwork.node.runtime.dq";
    }

    public List<SettingSpecifier> getSettingSpecifiers() {
        ArrayList<SettingSpecifier> result = new ArrayList<SettingSpecifier>(4);
        result.add((SettingSpecifier)new BasicTitleSettingSpecifier("status", this.getStatusMessage(), true, true));
        result.add((SettingSpecifier)new BasicTextFieldSettingSpecifier("queueDelayMs", String.valueOf(200L)));
        result.add((SettingSpecifier)new BasicTextFieldSettingSpecifier("transformServiceUid", null));
        return result;
    }

    private String getStatusMessage() {
        int len = QueueStats.values().length;
        Object[] params = new Object[len + 2];
        for (int i = 0; i < len; ++i) {
            params[i] = this.stats.get((StatCounter.Stat)QueueStats.values()[i]);
        }
        long processCount = (Long)params[QueueStats.Processed.ordinal()];
        long totalTime = (Long)params[QueueStats.ProcessingTimeTotal.ordinal()];
        params[QueueStats.ProcessingTimeTotal.ordinal()] = DateUtils.formatHoursMinutesSeconds((long)totalTime);
        params[params.length - 2] = processCount > 0L ? String.format("%dms", totalTime / processCount) : "-";
        long persistCount = (Long)params[QueueStats.Persisted.ordinal()];
        long persistTime = (Long)params[QueueStats.PersistingTimeTotal.ordinal()];
        params[QueueStats.PersistingTimeTotal.ordinal()] = DateUtils.formatHoursMinutesSeconds((long)persistTime);
        params[params.length - 1] = persistCount > 0L ? String.format("%dms", persistTime / persistCount) : "-";
        return this.getMessageSource().getMessage("status.msg", params, Locale.getDefault());
    }

    public long getStartupDelayMs() {
        return this.startupDelayMs;
    }

    public void setStartupDelayMs(long startupDelayMs) {
        this.startupDelayMs = startupDelayMs;
    }

    public long getQueueDelayMs() {
        return this.queueDelayMs;
    }

    public void setQueueDelayMs(long queueDelayMs) {
        this.queueDelayMs = queueDelayMs;
    }

    public OptionalService.OptionalFilterableService<DatumFilterService> getDatumFilterService() {
        return this.datumFilterService;
    }

    public void setDatumFilterService(OptionalService.OptionalFilterableService<DatumFilterService> transformService) {
        this.datumFilterService = transformService;
    }

    public String getTransformServiceUid() {
        return (String)this.datumFilterService.getPropertyValue("uid");
    }

    public void setTransformServiceUid(String uid) {
        this.datumFilterService.setPropertyFilter("uid", (Object)uid);
    }

    public DatumDao getNodeDatumDao() {
        return this.nodeDatumDao;
    }

    public int getStatisticLogFrequency() {
        return this.stats.getLogFrequency();
    }

    public void setStatisticLogFrequency(int logFrequency) {
        this.stats.setLogFrequency(logFrequency);
    }

    public Thread.UncaughtExceptionHandler getDatumProcessorExceptionHandler() {
        return this.datumProcessorExceptionHandler;
    }

    public void setDatumProcessorExceptionHandler(Thread.UncaughtExceptionHandler datumProcessorExceptionHandler) {
        this.datumProcessorExceptionHandler = datumProcessorExceptionHandler;
    }

    public StatCounter getStats() {
        return this.stats;
    }

    private static final class DelayedDatum
    implements Delayed {
        private final NodeDatum datum;
        private final long ts;
        private final boolean persist;

        private DelayedDatum(NodeDatum datum, long delayMs, boolean persist) {
            this.datum = datum;
            Instant date = datum.getTimestamp();
            long now = System.currentTimeMillis();
            this.ts = (date != null && date.toEpochMilli() <= now ? date.toEpochMilli() : now) + delayMs;
            this.persist = persist;
        }

        @Override
        public int compareTo(Delayed o) {
            DelayedDatum other = (DelayedDatum)o;
            int result = Long.compare(this.ts, other.ts);
            if (result == 0 && (result = this.datum.getSourceId().compareTo(other.datum.getSourceId())) == 0) {
                result = Boolean.compare(other.persist, this.persist);
            }
            return result;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long ms = this.ts - System.currentTimeMillis();
            return unit.convert(ms, TimeUnit.MILLISECONDS);
        }

        public String toString() {
            return "DelayedDatum{" + this.ts + "," + this.datum.getSourceId() + "," + this.persist + "}";
        }
    }

    public static enum QueueStats implements StatCounter.Stat
    {
        Added("added datum"),
        Captured("captured datum"),
        Processed("processed"),
        Duplicates("duplicates"),
        Filtered("filtered"),
        Persisted("persisted"),
        Errors("errors"),
        ProcessingTimeTotal("processing ms"),
        PersistingTimeTotal("persisting ms");

        private String description;

        private QueueStats(String description) {
            this.description = description;
        }

        public int getIndex() {
            return this.ordinal();
        }

        public String getDescription() {
            return this.description;
        }
    }

    private final class ProcessorThread
    extends Thread {
        private boolean processing;

        private ProcessorThread() {
            super("DatumQueue Processor");
            this.setDaemon(true);
            this.processing = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                if (DefaultDatumQueue.this.processorStartupDelayMs > 0L) {
                    try {
                        DefaultDatumQueue.this.log.info("Waiting {}s before starting DatumQueue processor", (Object)(DefaultDatumQueue.this.processorStartupDelayMs / 1000L));
                        Thread.sleep(DefaultDatumQueue.this.processorStartupDelayMs);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    DefaultDatumQueue.this.processorStartupDelayMs = -1L;
                }
                DefaultDatumQueue.this.log.info("Starting DatumQueue processor {}", (Object)Integer.toHexString(this.hashCode()));
                DelayedDatum event = null;
                ArrayList<DelayedDatum> events = new ArrayList<DelayedDatum>(16);
                do {
                    try {
                        event = (DelayedDatum)DefaultDatumQueue.this.datumQueue.poll(60L, TimeUnit.SECONDS);
                        if (event == null) continue;
                        long ts = event.ts;
                        events.add(event);
                        while ((event = (DelayedDatum)DefaultDatumQueue.this.datumQueue.peek()) != null && event.ts == ts) {
                            events.add((DelayedDatum)DefaultDatumQueue.this.datumQueue.take());
                        }
                        if (events.size() > 1) {
                            Collections.sort(events);
                            if (DefaultDatumQueue.this.log.isTraceEnabled()) {
                                DefaultDatumQueue.this.log.trace("Datum taken: [\n  {}\n]", (Object)events.stream().map(Object::toString).collect(Collectors.joining(",\n  ")));
                            }
                        }
                    }
                    catch (InterruptedException ts) {
                        // empty catch block
                    }
                    Consumer dirConsumer = (Consumer)OptionalService.service((OptionalService)DefaultDatumQueue.this.directConsumer);
                    long start = System.currentTimeMillis();
                    int len = events.size();
                    block15: for (int i = 0; i < len; ++i) {
                        NodeDatum result;
                        event = (DelayedDatum)events.get(i);
                        if (!event.persist) {
                            for (int j = i - 1; j >= 0; --j) {
                                DelayedDatum p = (DelayedDatum)events.get(j);
                                if (p.datum == event.datum && p.persist) {
                                    DefaultDatumQueue.this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Duplicates);
                                    continue block15;
                                }
                                if (!p.datum.getSourceId().equals(event.datum.getSourceId())) break;
                            }
                        }
                        DefaultDatumQueue.this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Processed);
                        DefaultDatumQueue.this.postEvent("net/solarnetwork/node/service/DatumDataSource/DATUM_CAPTURED", event.datum);
                        try {
                            result = DefaultDatumQueue.this.applyTransform(event);
                        }
                        catch (Throwable t) {
                            DefaultDatumQueue.this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Errors);
                            DefaultDatumQueue.this.log.error("Error processing datum {}; discarding.", (Object)event.datum, (Object)t);
                            throw t;
                        }
                        if (result == null) continue;
                        if (dirConsumer != null) {
                            try {
                                dirConsumer.accept(result);
                            }
                            catch (Throwable t) {
                                DefaultDatumQueue.this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Errors);
                                DefaultDatumQueue.this.log.error("Direct consumer {} error on datum {}; ignoring.", new Object[]{dirConsumer, result, t});
                            }
                        }
                        DefaultDatumQueue.this.postEvent("net/solarnetwork/node/DatumQueue/DATUM_ACQUIRED", result);
                        if (event.persist) {
                            try {
                                DefaultDatumQueue.this.persistDatum(result);
                            }
                            catch (Throwable t) {
                                DefaultDatumQueue.this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Errors);
                                DefaultDatumQueue.this.log.error("Error persisting datum {}; discarding.", (Object)event.datum, (Object)t);
                                throw t;
                            }
                        }
                        for (Consumer consumer : DefaultDatumQueue.this.consumers) {
                            consumer.accept(result);
                        }
                    }
                    events.clear();
                    DefaultDatumQueue.this.stats.addAndGet((StatCounter.Stat)QueueStats.ProcessingTimeTotal, System.currentTimeMillis() - start, true);
                } while (this.processing);
            }
            finally {
                DefaultDatumQueue.this.log.info("Finished DatumQueue processor {}", (Object)Integer.toHexString(this.hashCode()));
            }
        }
    }

    private final class ConsumerThread
    extends Thread
    implements Consumer<NodeDatum> {
        private final Consumer<NodeDatum> consumer;
        private final BlockingQueue<NodeDatum> queue;
        private boolean processing;

        private ConsumerThread(Consumer<NodeDatum> consumer) {
            super("DatumQueue Consumer " + consumer);
            this.setDaemon(true);
            this.consumer = consumer;
            this.queue = new LinkedBlockingDeque<NodeDatum>();
            this.processing = true;
        }

        @Override
        public void accept(NodeDatum datum) {
            this.queue.add(datum);
        }

        private void shutdown() {
            this.processing = false;
            this.interrupt();
        }

        @Override
        public void run() {
            do {
                NodeDatum datum = null;
                try {
                    datum = this.queue.take();
                    this.consumer.accept(datum);
                }
                catch (InterruptedException interruptedException) {
                }
                catch (Throwable t) {
                    DefaultDatumQueue.this.stats.incrementAndGet((StatCounter.Stat)QueueStats.Errors);
                    DefaultDatumQueue.this.log.error("Consumer error on datum {}; discarding.", (Object)datum, (Object)t);
                }
            } while (this.processing);
        }
    }
}

