/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.changeEvent.feed;

import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.FeedUtils;
import org.openmetadata.service.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Destination} for {@link ChangeEvent}s whose subscription type is
 * {@code ACTIVITY_FEED}.
 *
 * <p>For every change event that is not itself about a thread entity, the publisher asks
 * {@link FeedUtils#getThreadWithMessage} for the threads describing the change, stores each
 * thread that has a non-empty message through the {@link FeedRepository}, and then hands the
 * thread's JSON form to the {@link WebSocketManager} for broadcasting.
 */
public class ActivityFeedPublisher implements Destination<ChangeEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ActivityFeedPublisher.class);

    /** Entity type value for which no feed threads are generated. */
    private static final String THREAD_ENTITY_TYPE = "THREAD";

    /** First argument passed to {@code WebSocketManager.broadCastMessageToAll}. */
    private static final String ACTIVITY_FEED_BROADCAST_ID = "activityFeed";

    private final FeedMessageDecorator feedMessageFormatter = new FeedMessageDecorator();

    // Left package-private and non-final on purpose: the declaration is kept exactly as it
    // was so that any same-package code assigning this field keeps compiling.
    FeedRepository feedRepository = new FeedRepository();

    private final SubscriptionDestination subscriptionDestination;

    /**
     * Creates a publisher bound to the given subscription destination.
     *
     * @param subscription destination configuration; its type must be {@code ACTIVITY_FEED}
     * @throws IllegalArgumentException if the destination has any other subscription type
     */
    public ActivityFeedPublisher(SubscriptionDestination subscription) {
        if (subscription.getType() != SubscriptionDestination.SubscriptionType.ACTIVITY_FEED) {
            throw new IllegalArgumentException("Activity Alert Invoked with Illegal Type and Settings.");
        }
        this.subscriptionDestination = subscription;
    }

    /**
     * Creates and broadcasts the feed threads derived from a change event.
     *
     * <p>Events whose entity type equals {@code "THREAD"} are ignored. Threads with a
     * {@code null} or empty message are skipped.
     *
     * @param changeEvent the event to publish
     * @throws EventPublisherException if anything fails while building, storing or
     *     broadcasting a thread; the exception carries this destination's id together with
     *     the event
     */
    @Override
    public void sendMessage(ChangeEvent changeEvent) throws EventPublisherException {
        try {
            if (changeEvent.getEntityType().equals(THREAD_ENTITY_TYPE)) {
                return;
            }
            for (Thread thread : CommonUtil.listOrEmpty(FeedUtils.getThreadWithMessage(this.feedMessageFormatter, changeEvent))) {
                String threadMessage = thread.getMessage();
                if (threadMessage == null || threadMessage.isEmpty()) {
                    continue;
                }
                this.feedRepository.create(thread, changeEvent);
                WebSocketManager.getInstance().broadCastMessageToAll(ACTIVITY_FEED_BROADCAST_ID, JsonUtils.pojoToJson(thread));
            }
        } catch (Exception ex) {
            String message = CatalogExceptionMessage.eventPublisherFailedToPublish(SubscriptionDestination.SubscriptionType.ACTIVITY_FEED, changeEvent, ex.getMessage());
            // Pass the exception to the logger so the stack trace of the root cause is kept;
            // the rethrown exception below only carries the formatted message.
            LOG.error(message, ex);
            // Let the Pair's type arguments be inferred from the constructor parameter; casting
            // through Object here yields a Pair<Object, Object> that cannot be cast back.
            throw new EventPublisherException(message, Pair.of(this.subscriptionDestination.getId(), changeEvent));
        }
    }

    /**
     * Reports the enabled flag of the underlying subscription destination.
     *
     * @return the value of {@code subscriptionDestination.getEnabled()}
     */
    @Override
    public boolean getEnabled() {
        return this.subscriptionDestination.getEnabled();
    }

    /** Logs that the publisher is closing; no other action is taken here. */
    @Override
    public void close() {
        LOG.info("Closing Activity Feed Publisher");
    }

    /**
     * Returns the destination configuration supplied at construction time.
     *
     * @return the subscription destination this publisher was created with
     */
    @Override
    public SubscriptionDestination getSubscriptionDestination() {
        return this.subscriptionDestination;
    }
}

