/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.notification;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.model.notification.MessageVersion;
import org.apache.atlas.notification.IncompatibleVersionException;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.SplitMessageAggregator;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AtlasNotificationMessageDeserializer<T>
implements MessageDeserializer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationMessageDeserializer.class);
    public static final String VERSION_MISMATCH_MSG = "Notification message version mismatch. Expected %s but recieved %s. Message %s";
    private final TypeReference<T> messageType;
    private final TypeReference<AtlasNotificationMessage<T>> notificationMessageType;
    private final MessageVersion expectedVersion;
    private final Logger notificationLogger;
    private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<String, SplitMessageAggregator>();
    private final long splitMessageBufferPurgeIntervalMs;
    private final long splitMessageSegmentsWaitTimeMs;
    private long splitMessagesLastPurgeTime = System.currentTimeMillis();
    private final AtomicLong messageCountTotal = new AtomicLong(0L);
    private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0L);
    private long msgCreated;
    private boolean spooled;

    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, TypeReference<AtlasNotificationMessage<T>> notificationMessageType, MessageVersion expectedVersion, Logger notificationLogger) {
        this(messageType, notificationMessageType, expectedVersion, notificationLogger, AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000L, AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000L);
    }

    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, TypeReference<AtlasNotificationMessage<T>> notificationMessageType, MessageVersion expectedVersion, Logger notificationLogger, long splitMessageSegmentsWaitTimeMs, long splitMessageBufferPurgeIntervalMs) {
        this.messageType = messageType;
        this.notificationMessageType = notificationMessageType;
        this.expectedVersion = expectedVersion;
        this.notificationLogger = notificationLogger;
        this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs;
        this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs;
    }

    public TypeReference<T> getMessageType() {
        return this.messageType;
    }

    public TypeReference<AtlasNotificationMessage<T>> getNotificationMessageType() {
        return this.notificationMessageType;
    }

    public long getMsgCreated() {
        return this.msgCreated;
    }

    public boolean getSpooled() {
        return this.spooled;
    }

    @Override
    public T deserialize(String messageJson) {
        Object ret;
        this.messageCountTotal.incrementAndGet();
        this.messageCountSinceLastInterval.incrementAndGet();
        this.msgCreated = 0L;
        this.spooled = false;
        AtlasNotificationBaseMessage msg = (AtlasNotificationBaseMessage)AtlasType.fromV1Json((String)messageJson, AtlasNotificationMessage.class);
        if (msg == null || msg.getVersion() == null) {
            ret = AtlasType.fromV1Json((String)messageJson, this.messageType);
        } else {
            this.msgCreated = ((AtlasNotificationMessage)msg).getMsgCreationTime();
            this.spooled = ((AtlasNotificationMessage)msg).getSpooled();
            String msgJson = messageJson;
            if (msg.getMsgSplitCount() > 1) {
                AtlasNotificationStringMessage splitMsg = (AtlasNotificationStringMessage)AtlasType.fromV1Json((String)msgJson, AtlasNotificationStringMessage.class);
                this.checkVersion((AtlasNotificationBaseMessage)splitMsg, msgJson);
                String msgId = splitMsg.getMsgId();
                if (StringUtils.isEmpty((CharSequence)msgId)) {
                    LOG.error("Received multi-part message with no message ID. Ignoring message");
                    msg = null;
                } else {
                    SplitMessageAggregator splitMsgs;
                    int splitIdx = splitMsg.getMsgSplitIdx();
                    int splitCount = splitMsg.getMsgSplitCount();
                    if (splitIdx == 0) {
                        splitMsgs = new SplitMessageAggregator(splitMsg);
                        this.splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs);
                    } else {
                        splitMsgs = this.splitMsgBuffer.get(msgId);
                    }
                    if (splitMsgs == null) {
                        LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", new Object[]{msgId, splitIdx + 1, splitCount});
                        msg = null;
                    } else if (splitMsgs.getTotalSplitCount() <= (long)splitIdx) {
                        LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", new Object[]{msgId, splitIdx + 1, splitCount});
                        msg = null;
                    } else {
                        LOG.info("Received msgID={}: {} of {}", new Object[]{msgId, splitIdx + 1, splitCount});
                        boolean isReady = splitMsgs.add(splitMsg);
                        if (isReady) {
                            this.splitMsgBuffer.remove(msgId);
                            boolean isValidMessage = true;
                            StringBuilder sb = new StringBuilder();
                            int i = 0;
                            while ((long)i < splitMsgs.getTotalSplitCount()) {
                                splitMsg = splitMsgs.get(i);
                                if (splitMsg == null) {
                                    LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", new Object[]{msgId, i + 1, splitCount});
                                    isValidMessage = false;
                                    break;
                                }
                                sb.append(splitMsg.getMessage());
                                ++i;
                            }
                            if (isValidMessage) {
                                msgJson = sb.toString();
                                if (AtlasNotificationBaseMessage.CompressionKind.GZIP.equals((Object)splitMsg.getMsgCompressionKind())) {
                                    byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8((String)msgJson);
                                    byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress((byte[])encodedBytes);
                                    msgJson = AtlasNotificationBaseMessage.getStringUtf8((byte[])bytes);
                                    LOG.info("Received msgID={}: splitCount={}, compressed={} bytes, uncompressed={} bytes", new Object[]{msgId, splitCount, encodedBytes.length, bytes.length});
                                } else {
                                    byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8((String)msgJson);
                                    byte[] bytes = AtlasNotificationBaseMessage.decodeBase64((byte[])encodedBytes);
                                    msgJson = AtlasNotificationBaseMessage.getStringUtf8((byte[])bytes);
                                    LOG.info("Received msgID={}: splitCount={}, length={} bytes", new Object[]{msgId, splitCount, bytes.length});
                                }
                                msg = (AtlasNotificationBaseMessage)AtlasType.fromV1Json((String)msgJson, AtlasNotificationBaseMessage.class);
                            } else {
                                msg = null;
                            }
                        } else {
                            msg = null;
                        }
                    }
                }
            }
            if (msg != null) {
                if (AtlasNotificationBaseMessage.CompressionKind.GZIP.equals((Object)msg.getMsgCompressionKind())) {
                    AtlasNotificationStringMessage compressedMsg = (AtlasNotificationStringMessage)AtlasType.fromV1Json((String)msgJson, AtlasNotificationStringMessage.class);
                    byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8((String)compressedMsg.getMessage());
                    byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress((byte[])encodedBytes);
                    msgJson = AtlasNotificationBaseMessage.getStringUtf8((byte[])bytes);
                    LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", new Object[]{compressedMsg.getMsgId(), encodedBytes.length, bytes.length});
                }
                AtlasNotificationMessage atlasNotificationMessage = (AtlasNotificationMessage)AtlasType.fromV1Json((String)msgJson, this.notificationMessageType);
                this.checkCrossCombatMessageVersion((AtlasNotificationBaseMessage)atlasNotificationMessage);
                this.checkVersion((AtlasNotificationBaseMessage)atlasNotificationMessage, msgJson);
                ret = atlasNotificationMessage.getMessage();
            } else {
                ret = null;
            }
        }
        long now = System.currentTimeMillis();
        long timeSinceLastPurge = now - this.splitMessagesLastPurgeTime;
        if (timeSinceLastPurge >= this.splitMessageBufferPurgeIntervalMs) {
            AtlasNotificationMessageDeserializer.purgeStaleMessages(this.splitMsgBuffer, now, this.splitMessageSegmentsWaitTimeMs);
            LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", (Object)this.messageCountTotal.get(), (Object)this.messageCountSinceLastInterval.getAndSet(0L));
            this.splitMessagesLastPurgeTime = now;
        }
        return (T)ret;
    }

    @VisibleForTesting
    static void purgeStaleMessages(Map<String, SplitMessageAggregator> splitMsgBuffer, long now, long maxWaitTime) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
        }
        ArrayList<SplitMessageAggregator> evictionList = null;
        for (SplitMessageAggregator aggregrator : splitMsgBuffer.values()) {
            long waitTime = now - aggregrator.getFirstSplitTimestamp();
            if (waitTime < maxWaitTime) continue;
            if (evictionList == null) {
                evictionList = new ArrayList<SplitMessageAggregator>();
            }
            evictionList.add(aggregrator);
        }
        if (evictionList != null) {
            for (SplitMessageAggregator aggregrator : evictionList) {
                LOG.error("evicting notification msgID={}, totalSplitCount={}, receivedSplitCount={}", new Object[]{aggregrator.getMsgId(), aggregrator.getTotalSplitCount(), aggregrator.getReceivedSplitCount()});
                splitMsgBuffer.remove(aggregrator.getMsgId());
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
        }
    }

    protected void checkVersion(AtlasNotificationBaseMessage notificationMessage, String messageJson) {
        int comp = notificationMessage.compareVersion(this.expectedVersion);
        if (comp > 0) {
            String msg = String.format(VERSION_MISMATCH_MSG, this.expectedVersion, notificationMessage.getVersion(), messageJson);
            this.notificationLogger.error(msg);
            throw new IncompatibleVersionException(msg);
        }
        if (comp < 0) {
            this.notificationLogger.info(String.format(VERSION_MISMATCH_MSG, this.expectedVersion, notificationMessage.getVersion(), messageJson));
        }
    }

    protected void checkCrossCombatMessageVersion(AtlasNotificationBaseMessage notificationMessage) {
        String sourceVersion = new MessageSource(this.getClass().getSimpleName()).getVersion();
        MessageSource notificationSourceVersion = notificationMessage.getSource();
        if (notificationMessage.getSource() != null && !StringUtils.equalsIgnoreCase((CharSequence)notificationSourceVersion.getVersion(), (CharSequence)sourceVersion)) {
            LOG.warn("Hook and Atlas server build versions are not similar : {}, {}", (Object)notificationSourceVersion.getVersion(), (Object)sourceVersion);
        }
    }
}

