/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.billing.util.broadcast;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import org.killbill.billing.platform.api.LifecycleHandlerType;
import org.killbill.billing.util.broadcast.BroadcastService;
import org.killbill.billing.util.broadcast.DefaultBroadcastInternalEvent;
import org.killbill.billing.util.broadcast.dao.BroadcastDao;
import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
import org.killbill.billing.util.config.definition.BroadcastConfig;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.commons.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BroadcastService} implementation that periodically reads the entries returned by
 * {@link BroadcastDao#getLatestEntriesFrom} and posts each of them on the {@link PersistentBus}
 * as a {@link DefaultBroadcastInternalEvent}.
 *
 * <p>Lifecycle: {@link #initialize()} creates the executor and seeds the "latest processed"
 * record id, {@link #start()} schedules the polling task, {@link #stop()} shuts the executor down.
 */
public class DefaultBroadcastService implements BroadcastService {

    public static final String BROADCAST_SERVICE_NAME = "broadcast-service";

    /** Maximum time, in seconds, {@link #stop()} waits for the executor to terminate. */
    private static final int TERMINATION_TIMEOUT_SEC = 5;
    private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcastService.class);

    private final BroadcastConfig broadcastConfig;
    private final BroadcastDao broadcastDao;
    private final PersistentBus eventBus;

    /** Record id of the last entry handled by the polling task; {@code null} until {@link #initialize()} runs. */
    private AtomicLong latestRecordIdProcessed;
    /** Single-threaded scheduler running the polling task; {@code null} until {@link #initialize()} runs. */
    private ScheduledExecutorService broadcastExecutor;
    /** Read by the polling task on the executor thread, hence volatile. */
    private volatile boolean isStopped;

    @Inject
    public DefaultBroadcastService(BroadcastDao broadcastDao, BroadcastConfig broadcastConfig, PersistentBus eventBus) {
        this.broadcastDao = broadcastDao;
        this.broadcastConfig = broadcastConfig;
        this.eventBus = eventBus;
        this.isStopped = false;
    }

    /**
     * @return the service name, {@link #BROADCAST_SERVICE_NAME}
     */
    public String getName() {
        return BROADCAST_SERVICE_NAME;
    }

    /**
     * Seeds the "latest processed" record id from the most recent entry known to the DAO
     * (or 0 when there is none), creates the executor and clears the stopped flag.
     */
    @LifecycleHandlerType(value=LifecycleHandlerType.LifecycleLevel.INIT_SERVICE)
    public void initialize() {
        final BroadcastModelDao entry = broadcastDao.getLatestEntry();
        latestRecordIdProcessed = new AtomicLong(entry != null ? entry.getRecordId() : 0L);
        broadcastExecutor = Executors.newSingleThreadScheduledExecutor("BroadcastExecutor");
        isStopped = false;
    }

    /**
     * Schedules the polling task at the fixed rate given by
     * {@link BroadcastConfig#getBroadcastServiceRunningRate()}; the configured period is used
     * both as the initial delay and as the interval between runs.
     */
    @LifecycleHandlerType(value=LifecycleHandlerType.LifecycleLevel.START_SERVICE)
    public void start() {
        final TimeUnit pendingRateUnit = broadcastConfig.getBroadcastServiceRunningRate().getUnit();
        final long pendingPeriod = broadcastConfig.getBroadcastServiceRunningRate().getPeriod();
        broadcastExecutor.scheduleAtFixedRate(new BroadcastServiceRunnable(this, broadcastDao, eventBus),
                                              pendingPeriod, pendingPeriod, pendingRateUnit);
    }

    /**
     * Shuts the executor down and waits up to {@link #TERMINATION_TIMEOUT_SEC} seconds for it to
     * terminate. The stopped flag is raised only after that wait (whatever its outcome), so a run
     * that is in flight keeps processing entries during the wait and bails out afterwards.
     *
     * <p>Calling this method when the service is already stopped only logs a warning. Calling it
     * before {@link #initialize()} simply marks the service as stopped.
     */
    @LifecycleHandlerType(value=LifecycleHandlerType.LifecycleLevel.STOP_SERVICE)
    public void stop() {
        if (isStopped) {
            logger.warn("BroadcastExecutor is already in a stopped state");
            return;
        }

        final ScheduledExecutorService executor = broadcastExecutor;
        if (executor == null) {
            // initialize() never ran: there is no executor to shut down.
            isStopped = true;
            return;
        }

        try {
            executor.shutdown();
            final boolean success = executor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS);
            if (!success) {
                logger.warn("BroadcastExecutor failed to complete termination within {} sec", TERMINATION_TIMEOUT_SEC);
            }
        } catch (final InterruptedException e) {
            // Restore the interrupt status for whoever is driving the shutdown sequence.
            Thread.currentThread().interrupt();
            logger.warn("BroadcastExecutor stop sequence got interrupted");
        } finally {
            isStopped = true;
        }
    }

    /**
     * @return {@code true} once {@link #stop()} has completed (until the next {@link #initialize()})
     */
    public boolean isStopped() {
        return isStopped;
    }

    /**
     * @return the live counter holding the last processed record id; {@code null} before {@link #initialize()}
     */
    public AtomicLong getLatestRecordIdProcessed() {
        return latestRecordIdProcessed;
    }

    /**
     * Overwrites the last processed record id.
     *
     * @param latestRecordIdProcessed new value; must not be {@code null} (it is unboxed)
     */
    public void setLatestRecordIdProcessed(Long latestRecordIdProcessed) {
        this.latestRecordIdProcessed.set(latestRecordIdProcessed);
    }

    /**
     * Polling task: fetches the entries newer than the last processed record id and posts them
     * on the bus, one by one, advancing the record id after each entry.
     */
    private static final class BroadcastServiceRunnable implements Runnable {

        private final DefaultBroadcastService parent;
        private final BroadcastDao broadcastDao;
        private final PersistentBus eventBus;

        public BroadcastServiceRunnable(DefaultBroadcastService defaultBroadcastService, BroadcastDao broadcastDao, PersistentBus eventBus) {
            this.parent = defaultBroadcastService;
            this.broadcastDao = broadcastDao;
            this.eventBus = eventBus;
        }

        /**
         * Runs one polling pass. Runtime failures are logged and swallowed here on purpose:
         * a task scheduled with {@code scheduleAtFixedRate} is never run again once one of its
         * executions throws, which would silently disable the service.
         */
        @Override
        public void run() {
            try {
                broadcastNewEntries();
            } catch (final RuntimeException e) {
                logger.warn("Unexpected failure while processing broadcast entries, will retry at the next scheduled run", e);
            }
        }

        /** Posts every entry newer than the last processed record id, stopping early once the service is stopped. */
        private void broadcastNewEntries() {
            if (parent.isStopped()) {
                return;
            }

            final List<BroadcastModelDao> entries = broadcastDao.getLatestEntriesFrom(parent.getLatestRecordIdProcessed().get());
            for (final BroadcastModelDao cur : entries) {
                if (parent.isStopped()) {
                    return;
                }

                final DefaultBroadcastInternalEvent event = new DefaultBroadcastInternalEvent(cur.getServiceName(), cur.getType(), cur.getEvent());
                try {
                    eventBus.post((BusEvent) event);
                } catch (final PersistentBus.EventBusException e) {
                    logger.warn("Failed to post event {}", event, e);
                } finally {
                    // Advance even when the post failed, so a bad entry is not retried forever.
                    parent.setLatestRecordIdProcessed(cur.getRecordId());
                }
            }
        }
    }
}

