/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.instance.HazelcastInstanceImpl;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.processor.ProcessorWrapper;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.BitSet;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

public final class PeekWrappedP<T>
extends ProcessorWrapper {
    private final DistributedFunction<? super T, ? extends CharSequence> toStringFn;
    private final Predicate<? super T> shouldLogFn;
    private final LoggingInbox loggingInbox;
    private ILogger logger;
    private final boolean peekInput;
    private final boolean peekOutput;
    private final boolean peekSnapshot;
    private boolean peekedWatermarkLogged;

    public PeekWrappedP(@Nonnull Processor wrapped, @Nonnull DistributedFunction<? super T, ? extends CharSequence> toStringFn, @Nonnull Predicate<? super T> shouldLogFn, boolean peekInput, boolean peekOutput, boolean peekSnapshot) {
        super(wrapped);
        this.toStringFn = toStringFn;
        this.shouldLogFn = shouldLogFn;
        this.peekInput = peekInput;
        this.peekOutput = peekOutput;
        this.peekSnapshot = peekSnapshot;
        this.loggingInbox = peekInput ? new LoggingInbox() : null;
    }

    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.logger = context.logger();
        outbox = new LoggingOutbox(outbox, this.peekOutput, this.peekSnapshot);
        if (context instanceof Contexts.ProcCtx) {
            Contexts.ProcCtx c = (Contexts.ProcCtx)context;
            NodeEngineImpl nodeEngine = ((HazelcastInstanceImpl)c.jetInstance().getHazelcastInstance()).node.nodeEngine;
            ILogger newLogger = nodeEngine.getLogger(ExecutionPlan.createLoggerName(this.wrapped.getClass().getName(), c.vertexName(), c.globalProcessorIndex()));
            context = new Contexts.ProcCtx(c.jetInstance(), c.getSerializationService(), newLogger, c.vertexName(), c.globalProcessorIndex(), c.processingGuarantee(), c.localParallelism(), c.totalParallelism());
        }
        super.init(outbox, context);
    }

    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        if (this.peekInput) {
            this.loggingInbox.wrappedInbox = inbox;
            this.loggingInbox.ordinal = ordinal;
            super.process(ordinal, this.loggingInbox);
        } else {
            super.process(ordinal, inbox);
        }
    }

    private void log(String prefix, T object) {
        if (object != null && this.shouldLogFn.test(object)) {
            this.logger.info(prefix + ": " + this.toStringFn.apply(object));
        }
    }

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        if (this.peekInput && !this.peekedWatermarkLogged) {
            this.logger.info("Input: " + watermark);
            this.peekedWatermarkLogged = true;
        }
        if (super.tryProcessWatermark(watermark)) {
            this.peekedWatermarkLogged = false;
            if (this.peekOutput) {
                this.logger.info("Output forwarded: " + watermark);
            }
            return true;
        }
        return false;
    }

    private final class LoggingOutbox
    implements Outbox {
        private final Outbox wrappedOutbox;
        private final int[] all;
        private final boolean logOutput;
        private final boolean logSnapshot;
        private final BitSet broadcastTracker;

        private LoggingOutbox(Outbox wrappedOutbox, boolean logOutput, boolean logSnapshot) {
            this.wrappedOutbox = wrappedOutbox;
            this.broadcastTracker = new BitSet(wrappedOutbox.bucketCount());
            this.all = IntStream.range(0, wrappedOutbox.bucketCount()).toArray();
            this.logOutput = logOutput;
            this.logSnapshot = logSnapshot;
        }

        @Override
        public int bucketCount() {
            return this.wrappedOutbox.bucketCount();
        }

        @Override
        public boolean offer(int ordinal, @Nonnull Object item) {
            if (ordinal == -1) {
                return this.offer(this.all, item);
            }
            if (!this.wrappedOutbox.offer(ordinal, item)) {
                return false;
            }
            if (this.logOutput) {
                String prefix = "Output to ordinal " + ordinal;
                if (item instanceof Watermark) {
                    PeekWrappedP.this.logger.info(prefix + ": " + item);
                } else {
                    PeekWrappedP.this.log(prefix, item);
                }
            }
            return true;
        }

        @Override
        public boolean offer(int[] ordinals, @Nonnull Object item) {
            for (int i = 0; i < ordinals.length; ++i) {
                if (this.broadcastTracker.get(i)) continue;
                if (this.offer(i, item)) {
                    this.broadcastTracker.set(i);
                    continue;
                }
                return false;
            }
            this.broadcastTracker.clear();
            return true;
        }

        @Override
        public boolean offerToSnapshot(@Nonnull Object key, @Nonnull Object value) {
            if (!this.wrappedOutbox.offerToSnapshot(key, value)) {
                return false;
            }
            if (this.logSnapshot) {
                PeekWrappedP.this.log("Output to snapshot", Util.entry(key, value));
            }
            return true;
        }
    }

    private class LoggingInbox
    implements Inbox {
        private Inbox wrappedInbox;
        private boolean peekedItemLogged;
        private int ordinal;

        private LoggingInbox() {
        }

        @Override
        public boolean isEmpty() {
            return this.wrappedInbox.isEmpty();
        }

        @Override
        public Object peek() {
            Object res = this.wrappedInbox.peek();
            if (!this.peekedItemLogged && res != null) {
                this.log(res);
                this.peekedItemLogged = true;
            }
            return res;
        }

        @Override
        public Object poll() {
            Object res = this.wrappedInbox.poll();
            if (!this.peekedItemLogged && res != null) {
                this.log(res);
            }
            this.peekedItemLogged = false;
            return res;
        }

        private void log(T res) {
            PeekWrappedP.this.log("Input from ordinal " + this.ordinal, res);
        }

        @Override
        public void remove() {
            this.peekedItemLogged = false;
            this.wrappedInbox.remove();
        }
    }
}

