/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.eventProcessor.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.common.LoggerHelpers;
import io.pravega.controller.eventProcessor.EventProcessorConfig;
import io.pravega.controller.eventProcessor.EventProcessorGroup;
import io.pravega.controller.eventProcessor.impl.EventProcessorCell;
import io.pravega.controller.eventProcessor.impl.EventProcessorSystemImpl;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service that manages a set of {@link EventProcessorCell}s, all of which read from the
 * reader group named in the supplied {@link EventProcessorConfig}.
 *
 * <p>Lifecycle as implemented here: {@link #initialize()} registers the reader group in the
 * checkpoint store, creates the reader group and the configured number of cells;
 * {@link #startUp()} starts every cell; {@link #shutDown()} seals the reader group in the
 * checkpoint store, stops every cell, removes the reader group entry and closes the reader group.
 *
 * <p>NOTE: this source was produced by a decompiler. {@link #notifyProcessFailure(String)} could
 * not be recovered and is a placeholder that always throws.
 *
 * @param <T> type of controller event handled by the processors in this group
 */
public final class EventProcessorGroupImpl<T extends ControllerEvent>
extends AbstractIdleService
implements EventProcessorGroup<T> {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(EventProcessorGroupImpl.class);

    /** Human-readable identifier, used for tracing and as the {@link #toString()} value. */
    private final String objectId;
    private final EventProcessorSystemImpl actorSystem;
    private final EventProcessorConfig<T> eventProcessorConfig;
    /** Cells owned by this group, keyed by reader id. */
    @VisibleForTesting
    private final ConcurrentHashMap<String, EventProcessorCell<T>> eventProcessorMap;
    /** Writer created in the constructor and shared with every cell. */
    private final EventStreamWriter<T> writer;
    /** Assigned by {@link #initialize()}; {@code null} until then. */
    private ReaderGroup readerGroup;
    private final CheckpointStore checkpointStore;
    /** Guards {@link #shutDown()} and {@link #changeEventProcessorCount(int)} against each other. */
    private final Object lock = new Object();

    /**
     * Creates the group and its event writer. No readers or cells are created here; that is
     * done by {@link #initialize()}.
     *
     * @param actorSystem          owning event processor system (supplies client factory, process and scope)
     * @param eventProcessorConfig configuration: reader group name, stream name, serializer, processor count
     * @param checkpointStore      store in which the reader group and readers are registered
     */
    EventProcessorGroupImpl(EventProcessorSystemImpl actorSystem, EventProcessorConfig<T> eventProcessorConfig, CheckpointStore checkpointStore) {
        this.objectId = String.format("EventProcessorGroup[%s]", eventProcessorConfig.getConfig().getReaderGroupName());
        this.actorSystem = actorSystem;
        this.eventProcessorConfig = eventProcessorConfig;
        this.eventProcessorMap = new ConcurrentHashMap<>();
        this.writer = actorSystem.clientFactory.createEventWriter(
                eventProcessorConfig.getConfig().getStreamName(),
                eventProcessorConfig.getSerializer(),
                EventWriterConfig.builder().build());
        this.checkpointStore = checkpointStore;
    }

    /**
     * Registers the reader group in the checkpoint store, creates (or fetches) the reader group
     * and creates as many cells as are needed to reach the configured processor count.
     *
     * <p>A {@code NodeExists} failure while registering the reader group is logged and tolerated;
     * any other {@link CheckpointStoreException} is rethrown.
     *
     * @throws CheckpointStoreException if the checkpoint store rejects the reader group or a reader
     */
    void initialize() throws CheckpointStoreException {
        final String groupName = this.eventProcessorConfig.getConfig().getReaderGroupName();
        try {
            this.checkpointStore.addReaderGroup(this.actorSystem.getProcess(), groupName);
        } catch (CheckpointStoreException e) {
            // Constant-first comparison so that a null type is rethrown rather than raising an NPE.
            if (!CheckpointStoreException.Type.NodeExists.equals(e.getType())) {
                throw e;
            }
            log.warn("reader group {} exists", groupName);
        }

        ReaderGroupConfig groupConfig = ReaderGroupConfig.builder()
                .disableAutomaticCheckpoints()
                .stream(Stream.of(this.actorSystem.getScope(), this.eventProcessorConfig.getConfig().getStreamName()))
                .build();
        this.readerGroup = createIfNotExists(this.actorSystem.readerGroupManager, groupName, groupConfig);

        // Only create the cells that are still missing relative to the configured count.
        int missing = this.eventProcessorConfig.getConfig().getEventProcessorCount() - this.eventProcessorMap.values().size();
        createEventProcessors(missing);
    }

    /**
     * Asks the manager to create the reader group and then returns the group it reports under
     * that name.
     */
    private ReaderGroup createIfNotExists(ReaderGroupManager readerGroupManager, String groupName, ReaderGroupConfig groupConfig) {
        readerGroupManager.createReaderGroup(groupName, groupConfig);
        return readerGroupManager.getReaderGroup(groupName);
    }

    /**
     * Creates {@code count} new cells. For each one a random reader id is generated, registered
     * in the checkpoint store, and a reader plus cell are created and recorded in the map.
     * The cells are not started here.
     *
     * @param count number of cells to create; a value of zero or less creates none
     * @return the reader ids of the newly created cells, in creation order
     * @throws CheckpointStoreException if registering a reader in the checkpoint store fails
     */
    private List<String> createEventProcessors(int count) throws CheckpointStoreException {
        final String groupName = this.eventProcessorConfig.getConfig().getReaderGroupName();
        List<String> readerIds = new ArrayList<>();
        for (int i = 0; i < count; ++i) {
            String readerId = UUID.randomUUID().toString();
            this.checkpointStore.addReader(this.actorSystem.getProcess(), groupName, readerId);
            EventStreamReader<T> reader = this.actorSystem.clientFactory.createReader(
                    readerId,
                    groupName,
                    this.eventProcessorConfig.getSerializer(),
                    ReaderConfig.builder().disableTimeWindows(true).build());
            EventProcessorCell<T> actorCell = new EventProcessorCell<>(
                    this.eventProcessorConfig, reader, this.writer, this.actorSystem.getProcess(), readerId, i, this.checkpointStore);
            log.info("Created event processor {}, id={}", i, actorCell.toString());
            this.eventProcessorMap.put(readerId, actorCell);
            readerIds.add(readerId);
        }
        return readerIds;
    }

    /**
     * Starts every cell asynchronously, then waits for each of them to finish starting.
     */
    @Override
    protected void startUp() {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startUp");
        try {
            log.info("Attempting to start all event processors in {}", this.toString());
            this.eventProcessorMap.values().forEach(cell -> cell.startAsync());
            log.info("Waiting for all all event processors in {} to start", this.toString());
            this.eventProcessorMap.values().forEach(cell -> cell.awaitStartupComplete());
        } finally {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceId);
        }
    }

    /**
     * Seals the reader group in the checkpoint store, stops each cell and waits for it to
     * terminate, removes the reader group entry from the checkpoint store and closes the
     * reader group. Checkpoint store failures and per-cell termination failures are logged
     * and do not abort the shutdown.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour change"
     * for this method, so exception handling may differ slightly from the original bytecode.
     *
     * <p>NOTE(review): the writer created in the constructor is not closed here; confirm who
     * owns its lifecycle.
     */
    @Override
    protected void shutDown() {
        synchronized (this.lock) {
            long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "shutDown");
            try {
                // readerGroup is only assigned by initialize(). If that never completed, fall back
                // to the configured name, which is the name initialize() registers in the store.
                final String groupName = this.readerGroup != null
                        ? this.readerGroup.getGroupName()
                        : this.eventProcessorConfig.getConfig().getReaderGroupName();

                try {
                    log.info("Attempting to seal the reader group entry from checkpoint store");
                    this.checkpointStore.sealReaderGroup(this.actorSystem.getProcess(), groupName);
                } catch (CheckpointStoreException e) {
                    log.warn("Error sealing reader group " + this.objectId, e);
                }

                for (EventProcessorCell<T> cell : this.eventProcessorMap.values()) {
                    log.info("Stopping event processor cell: {}", cell);
                    cell.stopAsync();
                    log.info("Awaiting termination of event processor cell: {}", cell);
                    try {
                        cell.awaitTerminated();
                    } catch (Exception e) {
                        // Best effort: keep shutting down the remaining cells.
                        log.warn("Failed terminating event processor cell {}.", cell, e);
                    }
                }

                try {
                    log.info("Attempting to clean up reader group entry from checkpoint store");
                    this.checkpointStore.removeReaderGroup(this.actorSystem.getProcess(), groupName);
                } catch (CheckpointStoreException e) {
                    log.warn("Error removing reader group " + this.objectId, e);
                }

                if (this.readerGroup != null) {
                    this.readerGroup.close();
                }
                log.info("Shutdown of {} complete", this.toString());
            } finally {
                LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceId);
            }
        }
    }

    /**
     * Placeholder: the body of this method could not be recovered by the decompiler
     * (CFR 0.152 reported {@code ConfusedCFRException: Tried to end blocks [0[TRYBLOCK],
     * 3[CATCHBLOCK]], but top level block is 2[TRYBLOCK]}). The original logic is unknown
     * from this source and must be restored from the original class or sources.
     *
     * @param process identifier of the failed process
     * @throws CheckpointStoreException declared by the original signature; never thrown by this placeholder
     * @throws IllegalStateException always
     */
    @Override
    public void notifyProcessFailure(String process) throws CheckpointStoreException {
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Adds {@code count} new event processors to this running group and starts them.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour change"
     * for this method.
     *
     * @param count number of processors to add; must be positive
     * @throws CheckpointStoreException if registering a new reader in the checkpoint store fails
     * @throws IllegalStateException    if this service is not running
     * @throws NotImplementedException  if {@code count} is zero or negative
     */
    public void changeEventProcessorCount(int count) throws CheckpointStoreException {
        synchronized (this.lock) {
            Preconditions.checkState(this.isRunning(), this.state().name());
            if (count <= 0) {
                throw new NotImplementedException("Decrease processor count");
            }
            List<String> readerIds = createEventProcessors(count);
            readerIds.forEach(readerId -> this.eventProcessorMap.get(readerId).startAsync());
        }
    }

    /** Returns the event writer created by this group's constructor. */
    @Override
    public EventStreamWriter<T> getWriter() {
        return this.writer;
    }

    /**
     * Returns the processes reported by the checkpoint store.
     *
     * @throws CheckpointStoreException if the checkpoint store lookup fails
     */
    @Override
    public Set<String> getProcesses() throws CheckpointStoreException {
        return this.checkpointStore.getProcesses();
    }

    /** Requests an asynchronous stop of this service; does not wait for termination. */
    @Override
    public void close() throws Exception {
        this.stopAsync();
    }

    /** Returns the identifier of this group, {@code EventProcessorGroup[<readerGroupName>]}. */
    @Override
    public String toString() {
        return this.objectId;
    }

    /** Returns the live map of cells keyed by reader id (not a copy). */
    @SuppressFBWarnings(justification="generated code")
    ConcurrentHashMap<String, EventProcessorCell<T>> getEventProcessorMap() {
        return this.eventProcessorMap;
    }
}

