/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ots.internal.streamclient.core.task;

import com.alicloud.openservices.tablestore.model.GetStreamRecordResponse;
import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.StreamConfig;
import com.aliyun.openservices.ots.internal.streamclient.core.DataFetcher;
import com.aliyun.openservices.ots.internal.streamclient.core.RecordProcessorCheckpointer;
import com.aliyun.openservices.ots.internal.streamclient.core.exceptions.ApplicationException;
import com.aliyun.openservices.ots.internal.streamclient.core.exceptions.ShardEndReachedException;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ITask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskResult;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskType;
import com.aliyun.openservices.ots.internal.streamclient.model.IRecordProcessor;
import com.aliyun.openservices.ots.internal.streamclient.model.IShutdownMarker;
import com.aliyun.openservices.ots.internal.streamclient.model.ProcessRecordsInput;
import com.aliyun.openservices.ots.internal.streamclient.model.ShardInfo;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessTask
implements ITask {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessTask.class);
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final DataFetcher dataFetcher;
    private final StreamConfig streamConfig;
    private final IShutdownMarker shutdownMarker;

    public ProcessTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, DataFetcher dataFetcher, StreamConfig streamConfig, IShutdownMarker shutdownMarker) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.dataFetcher = dataFetcher;
        this.streamConfig = streamConfig;
        this.shutdownMarker = shutdownMarker;
    }

    @Override
    public TaskResult call() {
        LOG.debug("Start, ShardId: {}.", (Object)this.shardInfo.getShardId());
        try {
            GetStreamRecordResponse getStreamRecordResult = null;
            try {
                getStreamRecordResult = this.getRecordResult();
            }
            catch (ShardEndReachedException ex) {
                LOG.debug("Complete, ShardEndReached, ShardId: {}.", (Object)this.shardInfo.getShardId());
                return new TaskResult(true);
            }
            List records = getStreamRecordResult.getRecords();
            LOG.debug("GetRecords, ShardId: {}, Num: {}.", (Object)this.shardInfo.getShardId(), (Object)records.size());
            String nextIterator = getStreamRecordResult.getNextShardIterator();
            if (nextIterator != null) {
                this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(nextIterator);
            } else {
                this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue("SHARD_END");
            }
            ProcessRecordsInput processRecordsInput = new ProcessRecordsInput();
            processRecordsInput.setRecords(records);
            processRecordsInput.setCheckpointer(this.recordProcessorCheckpointer);
            processRecordsInput.setShutdownMarker(this.shutdownMarker);
            try {
                this.recordProcessor.processRecords(processRecordsInput);
            }
            catch (Throwable e) {
                throw new ApplicationException("ApplicationProcessError", e);
            }
            LOG.debug("Complete, ShardId: {}", (Object)this.shardInfo.getShardId());
            return new TaskResult(false);
        }
        catch (Exception e) {
            LOG.warn("ShardId: {}, Exception: {}", (Throwable)e);
            return new TaskResult(e);
        }
    }

    @Override
    public TaskType getTaskType() {
        return TaskType.PROCESS;
    }

    private GetStreamRecordResponse getRecordResult() throws DependencyException, StreamClientException {
        return this.dataFetcher.getRecords(this.streamConfig.getMaxRecords());
    }
}

