/*
 * Decompiled with CFR 0.152.
 */
package com.scylladb.cdc.debezium.connector;

import com.scylladb.cdc.debezium.connector.CollectionId;
import com.scylladb.cdc.debezium.connector.ScyllaOffsetContext;
import com.scylladb.cdc.debezium.connector.ScyllaPartition;
import com.scylladb.cdc.debezium.connector.TaskStateOffsetContext;
import com.scylladb.cdc.model.TaskId;
import com.scylladb.cdc.model.worker.TaskState;
import com.scylladb.cdc.transport.WorkerTransport;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Clock;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ScyllaWorkerTransport
implements WorkerTransport {
    private final ChangeEventSource.ChangeEventSourceContext context;
    private final ScyllaOffsetContext offsetContext;
    private final EventDispatcher<ScyllaPartition, CollectionId> dispatcher;
    private final Map<TaskId, Threads.Timer> heartbeatTimers;
    private final long heartbeatIntervalMs;

    public ScyllaWorkerTransport(ChangeEventSource.ChangeEventSourceContext context, ScyllaOffsetContext offsetContext, EventDispatcher<ScyllaPartition, CollectionId> dispatcher, long heartbeatIntervalMs) {
        this.context = context;
        this.offsetContext = offsetContext;
        this.dispatcher = dispatcher;
        this.heartbeatTimers = new HashMap<TaskId, Threads.Timer>();
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    public Map<TaskId, TaskState> getTaskStates(Set<TaskId> tasks) {
        HashMap<TaskId, TaskState> result = new HashMap<TaskId, TaskState>();
        tasks.forEach(task -> {
            TaskStateOffsetContext taskStateOffsetContext = this.offsetContext.taskStateOffsetContext((TaskId)task);
            TaskState taskState = taskStateOffsetContext.getTaskState();
            if (taskState != null) {
                result.put((TaskId)task, taskState);
            }
        });
        return result;
    }

    public void setState(TaskId task, TaskState newState) {
    }

    public void moveStateToNextWindow(TaskId task, TaskState newState) {
        Threads.Timer heartbeatTimer = this.heartbeatTimers.computeIfAbsent(task, t -> this.buildHeartbeatTimer());
        TaskStateOffsetContext taskStateOffsetContext = this.offsetContext.taskStateOffsetContext(task);
        taskStateOffsetContext.dataChangeEvent(newState);
        try {
            if (heartbeatTimer.expired()) {
                this.dispatcher.alwaysDispatchHeartbeatEvent((Partition)new ScyllaPartition(this.offsetContext, taskStateOffsetContext.sourceInfo), (OffsetContext)taskStateOffsetContext);
                this.heartbeatTimers.put(task, this.buildHeartbeatTimer());
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private Threads.Timer buildHeartbeatTimer() {
        return Threads.timer((Clock)Clock.SYSTEM, (Duration)Duration.ofMillis(this.heartbeatIntervalMs));
    }

    public boolean shouldStop() {
        return !this.context.isRunning();
    }
}

