/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.example;

import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.GetCursorRequest;
import com.aliyun.datahub.model.GetCursorResult;
import com.aliyun.datahub.model.GetRecordsResult;
import com.aliyun.datahub.model.OffsetContext;
import com.aliyun.datahub.model.RecordEntry;
import java.util.List;

/**
 * Thread that reads the records of a single shard through a {@link DatahubClient}
 * and commits the consumed offset for a subscription.
 *
 * <p>The offset is committed after every {@link #COMMIT_INTERVAL} consumed records and
 * whenever a fetch returns no records. The loop ends when the thread is interrupted or
 * when any exception other than {@link OffsetResetedException} is raised while consuming.
 */
class Consumer extends Thread {
    /** Maximum number of records requested per {@code getRecords} call. */
    private static final int FETCH_LIMIT = 10;
    /** Number of consumed records between two offset commits. */
    private static final long COMMIT_INTERVAL = 100L;
    /** Pause, in milliseconds, after a fetch that returned no records. */
    private static final long IDLE_SLEEP_MS = 1000L;

    private final String projectName;
    private final String topicName;
    private final String subId;
    private final String shardId;
    private final RecordSchema schema;
    private final DatahubClient client;

    /**
     * Creates a consumer with its own {@link DatahubClient} built from {@code conf}.
     *
     * @param projectName project passed to every client call
     * @param topicName   topic passed to every client call
     * @param subId       subscription id used to initialise the offset context
     * @param shardId     shard whose records are read
     * @param schema      record schema handed to {@code getRecords}
     * @param conf        configuration used to construct the client
     */
    public Consumer(String projectName, String topicName, String subId, String shardId, RecordSchema schema, DatahubConfiguration conf) {
        this.projectName = projectName;
        this.topicName = topicName;
        this.subId = subId;
        this.shardId = shardId;
        this.schema = schema;
        this.client = new DatahubClient(conf);
    }

    /** Commits {@code offsetCtx} through the client and logs the committed context. */
    private void commit(OffsetContext offsetCtx) {
        this.client.commitOffset(offsetCtx);
        System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
    }

    /** Returns the cursor for the sequence directly after the offset stored in {@code offsetCtx}. */
    private String cursorAfterOffset(OffsetContext offsetCtx) {
        long nextSequence = offsetCtx.getOffset().getSequence() + 1L;
        GetCursorResult cursorResult = this.client.getCursor(this.projectName, this.topicName, this.shardId, GetCursorRequest.CursorType.SEQUENCE, nextSequence);
        return cursorResult.getCursor();
    }

    /**
     * Consumes the shard until interrupted or until a non-recoverable exception occurs.
     *
     * <p>Starts from the oldest cursor when the offset context holds no offset, otherwise
     * from the sequence following the stored offset. Exceptions are printed, never rethrown.
     */
    @Override
    public void run() {
        try {
            OffsetContext offsetCtx = this.client.initOffsetContext(this.projectName, this.topicName, this.subId, this.shardId);
            String cursor;
            if (offsetCtx.hasOffset()) {
                cursor = this.cursorAfterOffset(offsetCtx);
            } else {
                cursor = this.client.getCursor(this.projectName, this.topicName, this.shardId, GetCursorRequest.CursorType.OLDEST).getCursor();
            }
            System.out.println("Start consume shard:" + this.shardId + ", start offset:" + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
            long recordNum = 0L;
            boolean bExit = false;
            while (!bExit) {
                try {
                    GetRecordsResult recordResult = this.client.getRecords(this.projectName, this.topicName, this.shardId, cursor, FETCH_LIMIT, this.schema);
                    List<RecordEntry> records = recordResult.getRecords();
                    if (records.isEmpty()) {
                        // Nothing new: commit what has been consumed so far, then back off
                        // and retry with the same cursor.
                        this.commit(offsetCtx);
                        Thread.sleep(IDLE_SLEEP_MS);
                        System.out.println("sleep 1s and continue consume records! shard id:" + this.shardId);
                        continue;
                    }
                    for (RecordEntry record : records) {
                        offsetCtx.setOffset(record.getOffset());
                        if (++recordNum % COMMIT_INTERVAL == 0L) {
                            this.commit(offsetCtx);
                        }
                    }
                    cursor = recordResult.getNextCursor();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status so code running after this loop (or a
                    // caller invoking run() directly) can still observe the interruption.
                    Thread.currentThread().interrupt();
                    bExit = true;
                    e.printStackTrace();
                }
                catch (SubscriptionOfflineException e) {
                    bExit = true;
                    e.printStackTrace();
                }
                catch (OffsetResetedException e) {
                    // Recoverable: refresh the offset context from the client and resume
                    // right after the refreshed offset. A failure in here propagates to
                    // the outer catch and ends the thread.
                    this.client.updateOffsetContext(offsetCtx);
                    cursor = this.cursorAfterOffset(offsetCtx);
                    System.out.println("Restart consume shard:" + this.shardId + ", reset offset:" + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
                }
                catch (OffsetSessionChangedException e) {
                    bExit = true;
                    e.printStackTrace();
                }
                catch (Exception e) {
                    bExit = true;
                    e.printStackTrace();
                }
            }
        }
        catch (Exception e) {
            // Covers failures during start-up and inside the offset-reset recovery path.
            e.printStackTrace();
        }
    }
}

