/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.connectors.flink.CheckpointSerializer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink master hook that couples Flink checkpoints with checkpoints of a Pravega reader group.
 *
 * <p>When a checkpoint is triggered, the hook initiates a reader group checkpoint named
 * {@code PVG-CHK-<checkpointId>} and returns the resulting future. On restore, the reader group
 * is reset so that it starts from the restored {@link Checkpoint}.
 *
 * <p>The scheduled executor used for checkpoint initiation and timeouts is created lazily on the
 * first trigger and discarded by {@link #close()}; all access to it goes through
 * {@code scheduledExecutorLock}.
 */
class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(ReaderCheckpointHook.class);

    /** Prefix of every reader group checkpoint name produced by this hook. */
    private static final String PRAVEGA_CHECKPOINT_NAME_PREFIX = "PVG-CHK-";

    /** Number of threads in the scheduled executor created by {@link #createScheduledExecutorService()}. */
    private static final int DEFAULT_CHECKPOINT_THREAD_POOL_SIZE = 3;

    /** Reader group whose checkpoints are driven by this hook; assigned by {@link #initializeReaderGroup}. */
    protected ReaderGroup readerGroup;

    /** Manager from which {@link #readerGroup} was obtained; assigned by {@link #initializeReaderGroup}. */
    protected ReaderGroupManager readerGroupManager;

    /** Identifier reported to Flink through {@link #getIdentifier()}. */
    private final String hookUid;

    /** Serializer handed to Flink for persisting {@link Checkpoint} objects. */
    private final CheckpointSerializer checkpointSerializer;

    /** Delay after which a still-pending checkpoint future is cancelled. */
    private final Time triggerTimeout;

    /** Initial reader group configuration; also the state {@link #reset()} returns to. */
    private final ReaderGroupConfig readerGroupConfig;

    private final Object scheduledExecutorLock = new Object();

    @GuardedBy(value="scheduledExecutorLock")
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * Creates the hook and initializes the reader group it operates on.
     *
     * <p>Note: the constructor invokes the overridable {@link #initializeReaderGroup}; an
     * overriding subclass must not depend on its own fields being initialized at that point.
     *
     * @param hookUid           identifier of this hook; must not be {@code null}
     * @param readerGroupName   name of the reader group
     * @param readerGroupScope  scope the reader group lives in
     * @param triggerTimeout    delay after which a pending checkpoint future is cancelled
     * @param clientConfig      client configuration passed to the reader group manager
     * @param readerGroupConfig configuration used to create (and later reset) the reader group
     */
    ReaderCheckpointHook(String hookUid, String readerGroupName, String readerGroupScope, Time triggerTimeout, ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig) {
        this.hookUid = Preconditions.checkNotNull(hookUid);
        this.triggerTimeout = triggerTimeout;
        // Must be assigned before initializeReaderGroup(), which reads it.
        this.readerGroupConfig = readerGroupConfig;
        this.checkpointSerializer = new CheckpointSerializer();
        this.initializeReaderGroup(readerGroupName, readerGroupScope, clientConfig);
    }

    /**
     * Obtains a reader group manager for the scope, creates the reader group with the configured
     * {@link ReaderGroupConfig}, and looks the group up again to populate {@link #readerGroup}.
     *
     * @param readerGroupName  name of the reader group
     * @param readerGroupScope scope the reader group lives in
     * @param clientConfig     client configuration passed to the reader group manager
     */
    protected void initializeReaderGroup(String readerGroupName, String readerGroupScope, ClientConfig clientConfig) {
        this.readerGroupManager = ReaderGroupManager.withScope(readerGroupScope, clientConfig);
        this.readerGroupManager.createReaderGroup(readerGroupName, this.readerGroupConfig);
        this.readerGroup = this.readerGroupManager.getReaderGroup(readerGroupName);
    }

    /**
     * Returns the identifier supplied at construction time.
     *
     * @return the hook identifier
     */
    public String getIdentifier() {
        return this.hookUid;
    }

    /**
     * Initiates a reader group checkpoint named after the Flink checkpoint id and arms a timeout
     * that cancels the returned future once {@code triggerTimeout} has elapsed.
     *
     * @param checkpointId        Flink checkpoint id; embedded in the reader group checkpoint name
     * @param checkpointTimestamp unused by this implementation
     * @param executor            unused by this implementation; the hook's own scheduled executor is used
     * @return the future produced by the reader group for the initiated checkpoint
     * @throws Exception if initiating the checkpoint or scheduling the timeout fails
     */
    public CompletableFuture<Checkpoint> triggerCheckpoint(long checkpointId, long checkpointTimestamp, Executor executor) throws Exception {
        // Capture the executor while holding the lock: re-reading the guarded field here could
        // observe null if close() runs concurrently.
        final ScheduledExecutorService scheduler = this.ensureScheduledExecutorExists();
        final String checkpointName = ReaderCheckpointHook.createCheckpointName(checkpointId);
        final CompletableFuture<Checkpoint> checkpointResult = this.readerGroup.initiateCheckpoint(checkpointName, scheduler);
        // Bound the wait: cancel the future if it is still pending after the trigger timeout.
        scheduler.schedule(() -> checkpointResult.cancel(false), this.triggerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        return checkpointResult;
    }

    /**
     * Resets the reader group so that it starts from the given checkpoint. The reset configuration
     * carries over only the max-outstanding-checkpoint-request and group-refresh-time settings of
     * the original configuration and disables automatic checkpoints.
     *
     * @param checkpointId Flink checkpoint id; unused by this implementation
     * @param checkpoint   reader group checkpoint to restore; {@code null} makes this a no-op
     * @throws Exception if resetting the reader group fails
     */
    public void restoreCheckpoint(long checkpointId, Checkpoint checkpoint) throws Exception {
        if (checkpoint == null) {
            return;
        }
        final ReaderGroupConfig restoreConfig = ReaderGroupConfig.builder()
                .maxOutstandingCheckpointRequest(this.readerGroupConfig.getMaxOutstandingCheckpointRequest())
                .groupRefreshTimeMillis(this.readerGroupConfig.getGroupRefreshTimeMillis())
                .disableAutomaticCheckpoints()
                .startFromCheckpoint(checkpoint)
                .build();
        this.readerGroup.resetReaderGroup(restoreConfig);
    }

    /**
     * Resets the reader group to the configuration supplied at construction time.
     */
    public void reset() {
        log.info("resetting the reader group to initial state using the RG config {}", this.readerGroupConfig);
        this.readerGroup.resetReaderGroup(this.readerGroupConfig);
    }

    /**
     * Closes the reader group manager and the reader group, then shuts down the scheduled
     * executor if one was created.
     *
     * <p>Each resource is released even when closing an earlier one throws; in that case the
     * exception thrown last is the one that propagates.
     */
    public void close() {
        try {
            if (this.readerGroupManager != null) {
                log.info("closing reader group Manager");
                this.readerGroupManager.close();
            }
        } finally {
            try {
                if (this.readerGroup != null) {
                    log.info("closing the reader group");
                    this.readerGroup.close();
                }
            } finally {
                this.shutdownScheduledExecutor();
            }
        }
    }

    /**
     * Returns the serializer Flink should use for {@link Checkpoint} objects.
     *
     * @return the shared checkpoint serializer instance of this hook
     */
    public SimpleVersionedSerializer<Checkpoint> createCheckpointDataSerializer() {
        return this.checkpointSerializer;
    }

    /**
     * Lazily creates the scheduled executor and returns it.
     *
     * @return the executor that was current while the lock was held; never {@code null} unless
     *         {@link #createScheduledExecutorService()} returned {@code null}
     */
    private ScheduledExecutorService ensureScheduledExecutorExists() {
        synchronized (this.scheduledExecutorLock) {
            if (this.scheduledExecutorService == null) {
                log.info("Creating Scheduled Executor for hook {}", this.hookUid);
                this.scheduledExecutorService = this.createScheduledExecutorService();
            }
            return this.scheduledExecutorService;
        }
    }

    /** Shuts down and forgets the scheduled executor, if one exists. */
    private void shutdownScheduledExecutor() {
        synchronized (this.scheduledExecutorLock) {
            if (this.scheduledExecutorService != null) {
                log.info("Closing Scheduled Executor for hook {}", this.hookUid);
                this.scheduledExecutorService.shutdownNow();
                this.scheduledExecutorService = null;
            }
        }
    }

    /**
     * Factory for the scheduled executor; overridable so subclasses can supply their own.
     *
     * @return a new scheduled thread pool with {@code DEFAULT_CHECKPOINT_THREAD_POOL_SIZE} threads
     */
    protected ScheduledExecutorService createScheduledExecutorService() {
        return Executors.newScheduledThreadPool(DEFAULT_CHECKPOINT_THREAD_POOL_SIZE);
    }

    /**
     * Returns the current scheduled executor.
     *
     * @return the executor, or {@code null} if none has been created yet or the hook was closed
     */
    protected ScheduledExecutorService getScheduledExecutorService() {
        synchronized (this.scheduledExecutorLock) {
            return this.scheduledExecutorService;
        }
    }

    /**
     * Extracts the Flink checkpoint id from a name produced by {@link #createCheckpointName(long)}.
     *
     * @param checkpointName name of the form {@code PVG-CHK-<id>}
     * @return the numeric checkpoint id
     * @throws IllegalArgumentException if the name lacks the expected prefix or the remainder is
     *                                  not a valid {@code long}
     * @throws NullPointerException     if {@code checkpointName} is {@code null}
     */
    static long parseCheckpointId(String checkpointName) {
        Preconditions.checkArgument(checkpointName.startsWith(PRAVEGA_CHECKPOINT_NAME_PREFIX),
                "Checkpoint name '%s' does not start with '%s'", checkpointName, PRAVEGA_CHECKPOINT_NAME_PREFIX);
        // The prefix check guarantees the substring index is in range, so only the numeric
        // parse can fail here.
        final String idPart = checkpointName.substring(PRAVEGA_CHECKPOINT_NAME_PREFIX.length());
        try {
            return Long.parseLong(idPart);
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Checkpoint name '" + checkpointName + "' does not end with a valid checkpoint id", e);
        }
    }

    /**
     * Builds the reader group checkpoint name for a Flink checkpoint id.
     *
     * @param checkpointId Flink checkpoint id
     * @return {@code PVG-CHK-} followed by the decimal id
     */
    static String createCheckpointName(long checkpointId) {
        return PRAVEGA_CHECKPOINT_NAME_PREFIX + checkpointId;
    }
}

