/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore.tunnel.worker;

import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ICheckpointer;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link IChannelProcessor} decorator that forwards every batch to a wrapped
 * processor and then decides whether to persist a checkpoint.
 *
 * <p>A checkpoint is attempted in two situations:
 * <ul>
 *   <li>the input's next token is {@code null} or equals {@code "finished"}; the
 *       token {@code "finished"} is then checkpointed on every such call;</li>
 *   <li>otherwise, when more than {@code checkpointIntervalInMillis} milliseconds
 *       of wall-clock time have passed since the last periodic attempt (or since
 *       this object was constructed); the input's next token is then checkpointed.</li>
 * </ul>
 *
 * <p>Checkpoint failures are logged and never propagated to the caller.
 */
public class DefaultChannelProcessor implements IChannelProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultChannelProcessor.class);

    /** Token value that marks the end of a channel; also the value checkpointed at that point. */
    private static final String FINISH_TAG = "finished";

    private final IChannelProcessor recordProcessor;
    private final ICheckpointer checkpointer;
    private final long checkpointIntervalInMillis;

    /** Wall-clock time (ms) of the last periodic checkpoint attempt; starts at construction time. */
    private long latestCheckpoint = System.currentTimeMillis();

    /**
     * Creates a processor that delegates record handling and checkpoints around it.
     *
     * @param recordProcessor            the processor that actually handles each batch; must not be null
     * @param checkpointer               the sink used to persist tokens; must not be null
     * @param checkpointIntervalInMillis elapsed time, in milliseconds, that must be exceeded between
     *                                   periodic checkpoint attempts
     */
    public DefaultChannelProcessor(IChannelProcessor recordProcessor, ICheckpointer checkpointer, long checkpointIntervalInMillis) {
        Preconditions.checkNotNull(recordProcessor, "Channel record processor cannot be null.");
        Preconditions.checkNotNull(checkpointer, "Checkpointer cannot be null.");
        this.recordProcessor = recordProcessor;
        this.checkpointer = checkpointer;
        this.checkpointIntervalInMillis = checkpointIntervalInMillis;
    }

    /**
     * Hands {@code input} to the wrapped processor, then checkpoints if the channel
     * has reached its end or the checkpoint interval has been exceeded.
     *
     * <p>Anything thrown by the wrapped processor propagates unchanged and no
     * checkpoint is attempted for that call. Exceptions thrown by the checkpointer
     * are logged and swallowed.
     *
     * @param input the batch to process, together with the token for the next read
     */
    @Override
    public void process(ProcessRecordsInput input) {
        this.recordProcessor.process(input);

        String nextToken = input.getNextToken();
        if (nextToken == null || FINISH_TAG.equals(nextToken)) {
            // End of channel: always record the terminal marker, whatever the incoming token was.
            checkpointQuietly(nextToken, FINISH_TAG);
            return;
        }

        if (System.currentTimeMillis() - this.latestCheckpoint > this.checkpointIntervalInMillis) {
            checkpointQuietly(nextToken, nextToken);
            // The timer is reset even when the attempt failed, so a failing checkpointer
            // is retried once per interval rather than on every batch.
            this.latestCheckpoint = System.currentTimeMillis();
        }
    }

    /** Shuts down the wrapped processor. */
    @Override
    public void shutdown() {
        this.recordProcessor.shutdown();
    }

    /** Logs the incoming token, attempts to checkpoint {@code tokenToPersist}, and logs (without rethrowing) any failure. */
    private void checkpointQuietly(String incomingToken, String tokenToPersist) {
        LOG.info("begin do checkpoint, token = {}", incomingToken);
        try {
            this.checkpointer.checkpoint(tokenToPersist);
        } catch (Exception e) {
            // Trailing Throwable argument makes SLF4J print the stack trace in addition to the message.
            LOG.error("checkpoint error, detail: {}", e.toString(), e);
        }
    }
}

