/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.extensions.barrage.table;

import com.google.common.annotations.VisibleForTesting;
import gnu.trove.list.TLongList;
import gnu.trove.list.linked.TLongLinkedList;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ChunkType;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateSource;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.LongSparseArraySource;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.table.impl.util.LongColumnSourceWritableRowRedirection;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.WritableRowRedirection;
import io.deephaven.engine.updategraph.LogicalClock;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSubscriptionPerformanceLogger;
import io.deephaven.extensions.barrage.table.BarrageBlinkTable;
import io.deephaven.extensions.barrage.table.BarrageRedirectedTable;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.log.LogEntry;
import io.deephaven.io.log.LogLevel;
import io.deephaven.io.logger.Logger;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.annotations.InternalUseOnly;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import org.HdrHistogram.Histogram;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public abstract class BarrageTable
extends QueryTable
implements BarrageMessage.Listener {
    /** Debug switch read once from the {@code BarrageTable.debug} configuration property (defaults to {@code false}). */
    public static final boolean DEBUG_ENABLED = Configuration.getInstance().getBooleanWithDefault("BarrageTable.debug", false);
    protected static final Logger log = LoggerFactory.getLogger(BarrageTable.class);
    // NOTE(review): not referenced in this file; presumably a batch size used by subclasses -- confirm.
    protected static final int BATCH_SIZE = 65536;
    /** Registrar the {@link #refresher} is added to / removed from, and which is asked to refresh on wake-up. */
    private final UpdateSourceRegistrar registrar;
    private final NotificationQueue notificationQueue;
    /** Executor used to schedule the periodic {@link Stats} task; may be {@code null}. */
    private final ScheduledExecutorService executorService;
    /** {@code null} when no executor was supplied or no performance-log key exists for this table. */
    protected final Stats stats;
    // NOTE(review): not referenced in this file; presumably maintained by subclasses -- confirm.
    protected long capacity = 0L;
    /** Writable sources after {@code ReinterpretUtils.maybeConvertToWritablePrimitive} has been applied to each. */
    protected final WritableColumnSource<?>[] destSources;
    // The three server* fields are only written by updateServerViewport (which is synchronized on this).
    /** Copy of the last viewport passed to {@code updateServerViewport}; {@code null} if none was given. */
    private RowSet serverViewport;
    /** Copy of the subscribed-column set; {@code null} when no set was given or it covers every column. */
    private BitSet serverColumns;
    private boolean serverReverseViewport;
    /** Viewport-change callbacks queued by {@code updateServerViewport} and run at the end of {@code realRefresh}. */
    private final ArrayDeque<Runnable> pendingVpChangeNotifications = new ArrayDeque();
    /** Monitor held while touching {@link #pendingUpdates} and while writing {@link #pendingError}. */
    private final Object pendingUpdatesLock = new Object();
    /** Messages received but not yet applied; swapped with {@link #shadowPendingUpdates} on each refresh. */
    private ArrayDeque<BarrageMessage> pendingUpdates = new ArrayDeque();
    /** The (empty) spare queue that becomes {@link #pendingUpdates} on the next swap. */
    private ArrayDeque<BarrageMessage> shadowPendingUpdates = new ArrayDeque();
    /** First error reported via {@code enqueueError}; checked before accepting or applying updates. */
    private Throwable pendingError = null;
    /** 0 until previous-value tracking has been started, then 1; only changed through {@link #PREV_TRACKING_UPDATER}. */
    private volatile int prevTrackingEnabled = 0;
    private static final AtomicIntegerFieldUpdater<BarrageTable> PREV_TRACKING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BarrageTable.class, "prevTrackingEnabled");
    /** Clones of recently processed messages; non-null only when {@link #DEBUG_ENABLED}. */
    private final List<Object> processedData;
    /** Clock step recorded for each entry of {@link #processedData}; non-null only when {@link #DEBUG_ENABLED}. */
    private final TLongList processedStep;
    /** Update source that drives {@code realRefresh} once registered with {@link #registrar}. */
    private final SourceRefresher refresher;
    /** Cleared after an error has been delivered or after the callback returns {@code false}. */
    @Nullable
    private ViewportChangedCallback viewportChangedCallback;

    /**
     * Initializes the state shared by all Barrage table implementations.
     *
     * @param registrar registrar the refresher is later added to
     * @param notificationQueue queue exposed through {@link #getNotificationQueue()}
     * @param executorService executor for periodic statistics; statistics are disabled when {@code null}
     * @param columns the column sources exposed by this table, keyed by name
     * @param writableSources the writable sources backing {@code columns}
     * @param attributes attributes to copy onto this table; the {@code "SystemicTable"} entry is skipped
     * @param viewportChangedCallback optional callback for viewport changes and errors
     */
    protected BarrageTable(UpdateSourceRegistrar registrar, NotificationQueue notificationQueue, @Nullable ScheduledExecutorService executorService, LinkedHashMap<String, ColumnSource<?>> columns, WritableColumnSource<?>[] writableSources, Map<String, Object> attributes, @Nullable ViewportChangedCallback viewportChangedCallback) {
        super(RowSetFactory.empty().toTracking(), columns);

        for (final Map.Entry<String, Object> attribute : attributes.entrySet()) {
            final String attributeName = attribute.getKey();
            if (!attributeName.equals("SystemicTable")) {
                setAttribute(attributeName, attribute.getValue());
            }
        }

        this.registrar = registrar;
        this.notificationQueue = notificationQueue;
        this.executorService = executorService;

        // Statistics need both an executor to run on and a key to log under.
        final String tableKey = BarragePerformanceLog.getKeyFor(this);
        if (executorService != null && tableKey != null) {
            this.stats = new Stats(tableKey);
        } else {
            this.stats = null;
        }

        final int sourceCount = writableSources.length;
        this.destSources = new WritableColumnSource[sourceCount];
        for (int si = 0; si < sourceCount; ++si) {
            this.destSources[si] = ReinterpretUtils.maybeConvertToWritablePrimitive(writableSources[si]);
        }

        // While the clock is mid-update, report the previous step as the last notification step.
        final long clockValue = getUpdateGraph().clock().currentValue();
        final long clockStep = LogicalClock.getStep(clockValue);
        final boolean updating = LogicalClock.getState(clockValue) == LogicalClock.State.Updating;
        setLastNotificationStep(updating ? clockStep - 1L : clockStep);

        this.processedData = DEBUG_ENABLED ? new LinkedList<Object>() : null;
        this.processedStep = DEBUG_ENABLED ? new TLongLinkedList() : null;

        this.refresher = new SourceRefresher();
        this.viewportChangedCallback = viewportChangedCallback;
    }

    /** Marks this table as refreshing and registers its refresher with the update source registrar. */
    public void addSourceToRegistrar() {
        setRefreshing(true);
        registrar.addSource(refresher);
    }

    /**
     * Applies the queued messages to this table's state. Invoked from {@code realRefresh} with the drained queue;
     * the queue is cleared by the caller afterwards.
     *
     * @param var1 the messages received since the previous refresh
     * @return the update to forward to listeners, or {@code null} when there is nothing to forward
     */
    protected abstract TableUpdate applyUpdates(ArrayDeque<BarrageMessage> var1);

    /**
     * @return the chunk type of each destination source, in column order, derived from that source's element type
     */
    public ChunkType[] getWireChunkTypes() {
        final ChunkType[] wireChunkTypes = new ChunkType[destSources.length];
        for (int ci = 0; ci < destSources.length; ++ci) {
            wireChunkTypes[ci] = ChunkType.fromElementType(destSources[ci].getType());
        }
        return wireChunkTypes;
    }

    /**
     * @return the data type of each of this table's column sources, in the order of {@code getColumnSources()}
     */
    public Class<?>[] getWireTypes() {
        return getColumnSources().stream()
                .map(source -> source.getType())
                .toArray(Class[]::new);
    }

    /**
     * @return the component type of each of this table's column sources, in the order of
     *         {@code getColumnSources()}
     */
    public Class<?>[] getWireComponentTypes() {
        return getColumnSources().stream()
                .map(source -> source.getComponentType())
                .toArray(Class[]::new);
    }

    /**
     * @return the viewport most recently recorded by {@code updateServerViewport}; may be {@code null}
     */
    @VisibleForTesting
    public RowSet getServerViewport() {
        return serverViewport;
    }

    /**
     * @return the reverse-viewport flag most recently recorded by {@code updateServerViewport}
     */
    @VisibleForTesting
    public boolean getServerReverseViewport() {
        return serverReverseViewport;
    }

    /**
     * @return the subscribed-column set most recently recorded by {@code updateServerViewport}; {@code null}
     *         when no set was supplied or it covered every column
     */
    @VisibleForTesting
    public BitSet getServerColumns() {
        return serverColumns;
    }

    /**
     * Queues a clone of {@code update} for the next refresh. The message is dropped (with an INFO log line) when
     * an error is already pending or the table has failed. A non-refreshing table is refreshed immediately on the
     * calling thread; otherwise a refresh is requested from the registrar.
     *
     * @param update the incoming message; it is cloned before being queued
     */
    @Override
    public void handleBarrageMessage(BarrageMessage update) {
        if (pendingError != null || isFailed()) {
            beginLog(LogLevel.INFO).append(": Discarding update for errored table!").endl();
            return;
        }

        synchronized (pendingUpdatesLock) {
            pendingUpdates.add(update.clone());
        }

        if (isRefreshing()) {
            doWakeup();
            return;
        }

        try {
            realRefresh();
        } catch (final Throwable err) {
            // Give the callback a chance to observe the failure before it propagates.
            tryToDeliverErrorToCallback(err);
            throw err;
        }
    }

    /**
     * Records {@code t} as this table's pending error by delegating to {@code enqueueError}.
     *
     * @param t the error reported for this table
     */
    @Override
    public void handleBarrageError(Throwable t) {
        enqueueError(t);
    }

    /**
     * Passes {@code err} to the viewport callback, if one is still registered, and then unregisters it so that it
     * receives at most one error.
     *
     * @param err the error to deliver
     */
    private synchronized void tryToDeliverErrorToCallback(Throwable err) {
        if (viewportChangedCallback == null) {
            return;
        }
        viewportChangedCallback.onError(err);
        viewportChangedCallback = null;
    }

    /**
     * Records the server's current viewport, subscribed columns and direction, and queues a callback notification
     * that runs at the end of the refresh.
     *
     * @param viewport the new viewport, copied before being stored; {@code null} is stored as {@code null}
     * @param columns the subscribed columns, cloned before being stored; stored as {@code null} when
     *        {@code null} or when every column is set
     * @param reverseViewport the reverse-viewport flag to record
     */
    protected synchronized void updateServerViewport(RowSet viewport, BitSet columns, boolean reverseViewport) {
        Assert.holdsLock(this, "BarrageTable.this");

        final WritableRowSet viewportSnapshot = viewport != null ? viewport.copy() : null;
        final boolean coversAllColumns = columns == null || columns.cardinality() == numColumns();
        final BitSet columnsSnapshot = coversAllColumns ? null : (BitSet) columns.clone();

        serverViewport = viewportSnapshot;
        serverColumns = columnsSnapshot;
        serverReverseViewport = reverseViewport;

        if (viewportChangedCallback != null) {
            // Deferred: the notification is run by realRefresh after the update has been applied.
            pendingVpChangeNotifications.add(
                    () -> lambda$updateServerViewport$6(viewportSnapshot, columnsSnapshot, reverseViewport));
        }
    }

    /**
     * @param i the column index to test
     * @return {@code true} when no column subset is recorded (all columns subscribed) or when bit {@code i} is
     *         set in the recorded subset
     */
    protected boolean isSubscribedColumn(int i) {
        if (serverColumns == null) {
            return true;
        }
        return serverColumns.get(i);
    }

    /**
     * Performs one refresh pass while holding this table's monitor.
     * <ul>
     * <li>If the table has already failed, queued messages are closed and dropped.</li>
     * <li>If an error is pending, it is delivered to the viewport callback (when still registered) and to
     * listeners (when refreshing), and the table is cleaned up.</li>
     * <li>Otherwise the pending and shadow queues are swapped under {@code pendingUpdatesLock}, the drained
     * queue is handed to {@code applyUpdates}, any resulting update is forwarded to listeners (or released when
     * the table is not refreshing), and queued viewport-change notifications are run.</li>
     * </ul>
     */
    private synchronized void realRefresh() {
        ArrayDeque<BarrageMessage> localPendingUpdates;
        if (this.isFailed()) {
            this.discardAnyPendingUpdates();
            return;
        }
        if (this.pendingError != null) {
            this.tryToDeliverErrorToCallback(this.pendingError);
            if (this.isRefreshing()) {
                this.notifyListenersOnError(this.pendingError, null);
            }
            this.cleanup();
            // Nothing is being applied on this path, so the spare queue must still be empty.
            Assert.eqZero((int)this.shadowPendingUpdates.size(), (String)"shadowPendingUpdates.size()");
            return;
        }
        Object object = this.pendingUpdatesLock;
        synchronized (object) {
            // Swap the queues: take everything received so far and leave the (empty) spare queue in its place.
            localPendingUpdates = this.pendingUpdates;
            this.pendingUpdates = this.shadowPendingUpdates;
            this.shadowPendingUpdates = localPendingUpdates;
            Assert.eqZero((int)this.pendingUpdates.size(), (String)"pendingUpdates.size()");
        }
        // Applied outside pendingUpdatesLock; new messages land in the other queue meanwhile.
        TableUpdate update = this.applyUpdates(localPendingUpdates);
        // Empty the drained queue so it can serve as the spare for the next swap.
        localPendingUpdates.clear();
        if (update != null) {
            if (this.isRefreshing()) {
                this.maybeEnablePrevTracking();
                this.notifyListeners(update);
            } else {
                // Not forwarded to listeners when the table is not refreshing, so release the update here.
                update.release();
            }
        }
        if (!this.pendingVpChangeNotifications.isEmpty()) {
            // Viewport callbacks run only after the update itself has been applied and published.
            this.pendingVpChangeNotifications.forEach(Runnable::run);
            this.pendingVpChangeNotifications.clear();
        }
    }

    /** Closes and removes every queued message while holding {@code pendingUpdatesLock}. */
    private void discardAnyPendingUpdates() {
        synchronized (pendingUpdatesLock) {
            for (final BarrageMessage queued : pendingUpdates) {
                queued.close();
            }
            pendingUpdates.clear();
        }
    }

    /**
     * Releases this table's periodic resources: stops the statistics task (if any), removes the refresher from
     * the registrar when the table is refreshing, and discards queued messages.
     */
    private void cleanup() {
        final Stats activeStats = stats;
        if (activeStats != null) {
            activeStats.stop();
        }

        if (isRefreshing()) {
            registrar.removeSource(refresher);
        }

        discardAnyPendingUpdates();
    }

    /**
     * @return the notification queue supplied at construction
     */
    protected NotificationQueue getNotificationQueue() {
        return notificationQueue;
    }

    /**
     * Records {@code e} as the pending error (under {@code pendingUpdatesLock}) and arranges for it to be
     * processed: immediately on the calling thread for a non-refreshing table, otherwise by requesting a refresh.
     *
     * @param e the error to record
     */
    private void enqueueError(Throwable e) {
        synchronized (pendingUpdatesLock) {
            pendingError = e;
        }

        if (isRefreshing()) {
            doWakeup();
            return;
        }

        try {
            realRefresh();
        } catch (final Throwable err) {
            tryToDeliverErrorToCallback(err);
            throw err;
        }
    }

    /**
     * Creates a table using the update graph of the current {@link ExecutionContext} as both update source
     * registrar and notification queue.
     *
     * @param executorService executor for periodic statistics, or {@code null}
     * @param tableDefinition the definition of the table to create
     * @param attributes the attributes for the new table
     * @param vpCallback optional viewport-change/error callback
     * @return the new table
     */
    @InternalUseOnly
    public static BarrageTable make(@Nullable ScheduledExecutorService executorService, TableDefinition tableDefinition, Map<String, Object> attributes, @Nullable ViewportChangedCallback vpCallback) {
        final UpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph();
        return make(updateGraph, updateGraph, executorService, tableDefinition, attributes, vpCallback);
    }

    /**
     * Creates the concrete table implementation selected by {@code attributes}.
     * <ul>
     * <li>{@code "BlinkTable"} set to {@code Boolean.TRUE}: a {@link BarrageBlinkTable} over unredirected
     * in-memory sources.</li>
     * <li>Otherwise: a {@link BarrageRedirectedTable}. When {@code "AppendOnly"} or {@code "IsFlat"} is
     * {@code Boolean.TRUE} the row redirection is backed by a {@link LongSparseArraySource}; otherwise it comes
     * from {@code WritableRowRedirection.FACTORY}.</li>
     * </ul>
     * Attribute values that are missing, {@code null} or not a {@link Boolean} are treated as {@code false}.
     *
     * @param registrar registrar the table's refresher is later added to
     * @param queue the notification queue for the table
     * @param executor executor for periodic statistics, or {@code null}
     * @param tableDefinition supplies the columns of the new table
     * @param attributes the attributes for the new table
     * @param vpCallback optional viewport-change/error callback
     * @return the new table
     */
    @VisibleForTesting
    public static BarrageTable make(UpdateSourceRegistrar registrar, NotificationQueue queue, @Nullable ScheduledExecutorService executor, TableDefinition tableDefinition, Map<String, Object> attributes, @Nullable ViewportChangedCallback vpCallback) {
        final List<ColumnDefinition<?>> columns = tableDefinition.getColumns();
        final WritableColumnSource<?>[] writableSources = new WritableColumnSource[columns.size()];

        // The map holds arbitrary objects, so test the type before unboxing.
        final Predicate<String> getAttribute = attr -> {
            final Object value = attributes.getOrDefault(attr, false);
            return value instanceof Boolean && (Boolean) value;
        };

        if (getAttribute.test("BlinkTable")) {
            final LinkedHashMap<String, ColumnSource<?>> finalColumns = BarrageTable.makeColumns(columns, writableSources);
            return new BarrageBlinkTable(registrar, queue, executor, finalColumns, writableSources, attributes, vpCallback);
        }

        final boolean isFlat = getAttribute.test("IsFlat");
        final WritableRowRedirection rowRedirection;
        if (getAttribute.test("AppendOnly") || isFlat) {
            rowRedirection = new LongColumnSourceWritableRowRedirection(new LongSparseArraySource());
        } else {
            rowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(1024);
        }
        final LinkedHashMap<String, ColumnSource<?>> finalColumns = BarrageTable.makeColumns(columns, writableSources, rowRedirection);
        return new BarrageRedirectedTable(registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, isFlat, vpCallback);
    }

    /**
     * Creates one empty in-memory source per column definition, stores it in {@code writableSources} at the
     * column's position, and maps the column name to that source passed through
     * {@code WritableRedirectedColumnSource.maybeRedirect} with the supplied row redirection.
     *
     * @param columns the column definitions, in column order
     * @param writableSources output array; slot {@code i} receives the backing source for column {@code i}
     * @param emptyRowRedirection the row redirection to apply to every column
     * @return the column sources keyed by name, in column order
     */
    @NotNull
    protected static LinkedHashMap<String, ColumnSource<?>> makeColumns(List<ColumnDefinition<?>> columns, WritableColumnSource<?>[] writableSources, WritableRowRedirection emptyRowRedirection) {
        final LinkedHashMap<String, ColumnSource<?>> sourcesByName = new LinkedHashMap<>(columns.size());
        int slot = 0;
        for (final ColumnDefinition<?> definition : columns) {
            final WritableColumnSource<?> backing = ArrayBackedColumnSource.getMemoryColumnSource(0L, definition.getDataType(), definition.getComponentType());
            writableSources[slot++] = backing;
            sourcesByName.put(definition.getName(), WritableRedirectedColumnSource.maybeRedirect(emptyRowRedirection, backing, 0L));
        }
        return sourcesByName;
    }

    /**
     * Creates one empty in-memory source per column definition, stores it in {@code writableSources} at the
     * column's position, and maps the column name directly to that source (no row redirection).
     *
     * @param columns the column definitions, in column order
     * @param writableSources output array; slot {@code i} receives the source for column {@code i}
     * @return the column sources keyed by name, in column order
     */
    @NotNull
    protected static LinkedHashMap<String, ColumnSource<?>> makeColumns(List<ColumnDefinition<?>> columns, WritableColumnSource<?>[] writableSources) {
        final LinkedHashMap<String, ColumnSource<?>> sourcesByName = new LinkedHashMap<>(columns.size());
        int slot = 0;
        for (final ColumnDefinition<?> definition : columns) {
            final WritableColumnSource<?> backing = ArrayBackedColumnSource.getMemoryColumnSource(0L, definition.getDataType(), definition.getComponentType());
            writableSources[slot++] = backing;
            sourcesByName.put(definition.getName(), backing);
        }
        return sourcesByName;
    }

    /**
     * When {@link #DEBUG_ENABLED}, keeps a clone of the given message together with the current clock step.
     * Before adding, the oldest retained message is closed and evicted once more than 10 are held.
     *
     * @param snapshotOrDelta the message to retain; it is cloned, so the caller keeps ownership of the original
     */
    protected void saveForDebugging(BarrageMessage snapshotOrDelta) {
        if (!DEBUG_ENABLED) {
            return;
        }
        if (processedData.size() > 10) {
            final BarrageMessage evicted = (BarrageMessage) processedData.remove(0);
            evicted.close();
            // removeAt(int) evicts by position; TLongList.remove(long) would instead search for the VALUE 0,
            // leaving the step list growing and misaligned with processedData.
            processedStep.removeAt(0);
        }
        processedData.add(snapshotOrDelta.clone());
        processedStep.add(getUpdateGraph().clock().currentStep());
    }

    /**
     * Starts previous-value tracking on every destination source the first time it is called. The 0-to-1
     * compare-and-set on {@code prevTrackingEnabled} ensures only one caller performs the work.
     *
     * @return {@code true} if this call enabled tracking, {@code false} if it was already enabled
     */
    protected boolean maybeEnablePrevTracking() {
        final boolean firstCaller = PREV_TRACKING_UPDATER.compareAndSet(this, 0, 1);
        if (firstCaller) {
            for (int si = 0; si < destSources.length; ++si) {
                destSources[si].startTrackingPrevValues();
            }
        }
        return firstCaller;
    }

    /** Asks the registrar to schedule a refresh. */
    protected void doWakeup() {
        registrar.requestRefresh();
    }

    /**
     * Looks up an attribute on this table.
     *
     * @param key the attribute name
     * @return the attribute value, or {@code null} if the superclass lookup returns {@code null}
     * @throws UnsupportedOperationException if {@code key} is {@code "InputTable"} and that attribute is present
     */
    @Override
    @Nullable
    public Object getAttribute(@NotNull String key) {
        final Object value = super.getAttribute(key);
        if (value == null) {
            return null;
        }
        if (key.equals("InputTable")) {
            throw new UnsupportedOperationException();
        }
        return value;
    }

    /**
     * Starts a log entry at the given level, prefixed with this table's identity hash code.
     *
     * @param level the level to log at
     * @return the started entry; the caller appends the message and ends it
     */
    protected LogEntry beginLog(LogLevel level) {
        final int tableIdentity = System.identityHashCode(this);
        return log.getEntry(level).append(tableIdentity);
    }

    /** Runs the superclass teardown and then releases this table's own resources via {@code cleanup()}. */
    @Override
    protected void destroy() {
        super.destroy();
        cleanup();
    }

    /**
     * @return a consumer that records each value it is given into the deserialization histogram, or a no-op
     *         consumer when statistics are disabled
     */
    public LongConsumer getDeserializationTmConsumer() {
        if (stats != null) {
            return elapsed -> recordMetric(s -> s.deserialize, elapsed);
        }
        return ignored -> {};
    }

    /**
     * Records {@code value} into the histogram selected by {@code hist}, holding the {@code Stats} monitor while
     * doing so. Does nothing when statistics are disabled.
     *
     * @param hist selects which histogram of the {@code Stats} instance to record into
     * @param value the value to record
     */
    protected void recordMetric(Function<Stats, Histogram> hist, long value) {
        if (stats != null) {
            synchronized (stats) {
                final Histogram target = hist.apply(stats);
                target.recordValue(value);
            }
        }
    }

    /**
     * Body of the notification queued by {@code updateServerViewport}: reports the viewport change to the
     * callback, if one is still registered, and unregisters the callback when it returns {@code false}.
     */
    private void lambda$updateServerViewport$6(RowSet finalViewport, BitSet finalColumns, boolean reverseViewport) {
        if (viewportChangedCallback != null
                && !viewportChangedCallback.viewportChanged(finalViewport, finalColumns, reverseViewport)) {
            viewportChangedCallback = null;
        }
    }

    /**
     * Per-table timing histograms that are periodically flushed to the subscription performance logger.
     * <p>
     * The histograms are guarded by this object's monitor: {@code recordMetric} records while holding it, and
     * {@link #run()} holds it while logging and resetting.
     */
    protected class Stats
    implements Runnable {
        /** Number of significant value digits each histogram is constructed with. */
        private static final int NUM_SIG_FIGS = 3;
        public final String tableId = Integer.toHexString(System.identityHashCode(BarrageTable.this));
        public final String tableKey;
        public final Histogram deserialize = new Histogram(NUM_SIG_FIGS);
        public final Histogram processUpdate = new Histogram(NUM_SIG_FIGS);
        public final Histogram refresh = new Histogram(NUM_SIG_FIGS);
        public final ScheduledFuture<?> runFuture;

        /**
         * Schedules this instance on the enclosing table's executor with a fixed delay of
         * {@code BarragePerformanceLog.CYCLE_DURATION_MILLIS}.
         *
         * @param tableKey the key this table's statistics are logged under
         */
        public Stats(String tableKey) {
            this.tableKey = tableKey;
            this.runFuture = BarrageTable.this.executorService.scheduleWithFixedDelay(this, BarragePerformanceLog.CYCLE_DURATION_MILLIS, BarragePerformanceLog.CYCLE_DURATION_MILLIS, TimeUnit.MILLISECONDS);
        }

        /** Cancels the periodic flush without interrupting a flush that is already running. */
        public void stop() {
            this.runFuture.cancel(false);
        }

        /** Logs and resets every histogram that has recorded at least one value. */
        @Override
        public void run() {
            final Instant now = DateTimeUtils.now();
            final BarrageSubscriptionPerformanceLogger logger = BarragePerformanceLog.getInstance().getSubscriptionLogger();
            synchronized (logger) {
                // Also take the Stats monitor so that a concurrent recordMetric() cannot record into a histogram
                // while it is being logged or reset.
                synchronized (this) {
                    this.flush(now, logger, this.deserialize, "DeserializationMillis");
                    this.flush(now, logger, this.processUpdate, "ProcessUpdateMillis");
                    this.flush(now, logger, this.refresh, "RefreshMillis");
                }
            }
        }

        /** Logs {@code hist} under {@code statType} and resets it; does nothing for an empty histogram. */
        private void flush(Instant now, BarrageSubscriptionPerformanceLogger logger, Histogram hist, String statType) {
            if (hist.getTotalCount() == 0L) {
                return;
            }
            logger.log(this.tableId, this.tableKey, statType, now, hist);
            hist.reset();
        }
    }

    private class SourceRefresher
    extends InstrumentedTableUpdateSource {
        SourceRefresher() {
            super((BaseTable)BarrageTable.this, "BarrageTable(" + System.identityHashCode((Object)BarrageTable.this) + (String)(BarrageTable.this.stats != null ? ") " + BarrageTable.this.stats.tableKey : ")"));
        }

        protected void instrumentedRefresh() {
            try {
                long startTm = System.nanoTime();
                BarrageTable.this.realRefresh();
                BarrageTable.this.recordMetric(stats -> stats.refresh, System.nanoTime() - startTm);
            }
            catch (Throwable err) {
                BarrageTable.this.beginLog(LogLevel.ERROR).append((CharSequence)": Failure during BarrageTable instrumentedRefresh: ").append(err).endl();
                BarrageTable.this.tryToDeliverErrorToCallback(err);
                throw err;
            }
        }
    }

    /**
     * Callback through which a {@code BarrageTable} reports viewport changes and errors. The table drops the
     * callback after delivering an error to it or after {@link #viewportChanged} returns {@code false}.
     */
    public static interface ViewportChangedCallback {
        /**
         * Called after a refresh during which the server viewport was updated.
         *
         * @param var1 the new viewport row set; {@code null} when no viewport was supplied
         * @param var2 the subscribed columns; {@code null} when no set was supplied or every column is subscribed
         * @param var3 the reverse-viewport flag
         * @return {@code true} to keep receiving notifications, {@code false} to be unregistered
         */
        public boolean viewportChanged(@Nullable RowSet var1, @Nullable BitSet var2, boolean var3);

        /**
         * Called at most once with an error reported for, or raised while refreshing, the table.
         *
         * @param var1 the error
         */
        public void onError(@NotNull Throwable var1);
    }
}

