/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.example;

import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.common.data.Field;
import com.aliyun.datahub.common.data.FieldType;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.common.data.RecordType;
import com.aliyun.datahub.exception.DatahubClientException;
import com.aliyun.datahub.exception.InvalidCursorException;
import com.aliyun.datahub.model.GetCursorRequest;
import com.aliyun.datahub.model.GetRecordsResult;
import com.aliyun.datahub.model.PutRecordsResult;
import com.aliyun.datahub.model.RecordEntry;
import com.aliyun.datahub.model.ShardEntry;
import com.aliyun.datahub.wrapper.Topic;
import java.util.ArrayList;
import java.util.List;

/**
 * Example showing how to create a topic, write records to it and read them
 * back through the {@link Topic} wrapper of the DataHub client.
 *
 * <p>Typical usage is {@link #init()} followed by {@link #putRecords()} and
 * {@link #getRecords(String)}; {@link #main(String[])} runs exactly that
 * sequence.
 */
public class DatahubWrapperExample {
    // NOTE(review): credentials and endpoint are hard-coded in source. That is
    // only acceptable for throwaway sample values; real keys should be loaded
    // from configuration or the environment and never committed.
    private final String accessId = "63wd3dpztlmb5ocdkj94pxmm";
    private final String accessKey = "oRd30z7sV4hBX9aYtJgii5qnyhg=";
    private final String endpoint = "http://10.101.214.153:9111";
    private final DatahubClient client;
    /** Wrapper for the example topic; assigned by {@link #init()}. */
    private Topic topic;
    private final String projectName = "project_test_example";
    private final String topicName = "topic_test_example";

    /** Number of records written by {@link #putRecords()}. */
    private static final int RECORDS_TO_WRITE = 10;
    /** Retry count handed to {@code Topic.putRecords}. */
    private static final int PUT_RETRY_COUNT = 3;
    /** Maximum number of records requested per {@code Topic.getRecords} call. */
    private static final int RECORDS_PER_FETCH = 10;
    /** Pause between polls when the cursor did not advance, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 10000L;

    /**
     * Builds the DataHub client from the account and endpoint configured in
     * this class. The topic wrapper is not created here; call {@link #init()}.
     */
    public DatahubWrapperExample() {
        DatahubConfiguration conf = new DatahubConfiguration(new AliyunAccount(this.accessId, this.accessKey), this.endpoint);
        this.client = new DatahubClient(conf);
    }

    /**
     * Creates the example topic with a single STRING field named {@code "a"}
     * and stores a {@link Topic} wrapper for it in this instance.
     */
    public void init() {
        RecordSchema schema = new RecordSchema();
        schema.addField(new Field("a", FieldType.STRING));
        // presumably the two 3s are shard count and lifecycle — confirm against the SDK.
        this.client.createTopic(this.projectName, this.topicName, 3, 3, RecordType.TUPLE, schema, "topic");
        // Assign the field. The previous code declared a local variable here,
        // which left this.topic null and broke putRecords()/getRecords().
        this.topic = Topic.Builder.build(this.projectName, this.topicName, this.client);
    }

    /**
     * Writes {@value #RECORDS_TO_WRITE} records to the topic, spreading them
     * round-robin over the shards returned by {@code listShard()}, and prints
     * either the failed records or a success message.
     *
     * @throws IllegalStateException if {@link #init()} has not been called or
     *         the topic reports no shards
     */
    public void putRecords() {
        Topic target = requireTopic();
        List<ShardEntry> shards = target.listShard();
        if (shards == null || shards.isEmpty()) {
            throw new IllegalStateException("topic " + this.topicName + " has no shards to write to");
        }
        RecordSchema schema = target.getRecordSchema();
        ArrayList<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
        for (int n = 0; n < RECORDS_TO_WRITE; ++n) {
            RecordEntry entry = new RecordEntry(schema);
            // Every field is set to the record's sequence number as a string.
            for (int i = 0; i < entry.getFieldCount(); ++i) {
                entry.setString(i, String.valueOf(n));
            }
            // Use the real shard count instead of a hard-coded 3 so the index
            // can never run past the end of the shard list.
            String shardId = shards.get(n % shards.size()).getShardId();
            entry.setShardId(shardId);
            entry.putAttribute("partition", "ds=2016");
            recordEntries.add(entry);
        }
        PutRecordsResult result = target.putRecords(recordEntries, PUT_RETRY_COUNT);
        if (result.getFailedRecordCount() > 0) {
            System.out.println("failed records:");
            for (RecordEntry record : result.getFailedRecords()) {
                System.out.println(record.toJsonNode().toString());
            }
        } else {
            System.out.println("successfully write all records");
        }
    }

    /**
     * Continuously reads records from one shard, starting at the oldest
     * cursor, and prints each record as JSON.
     *
     * <p>The loop does not end on its own: when the cursor does not advance it
     * sleeps and polls again, and when the service reports an invalid cursor
     * it restarts from the oldest cursor. It returns only if the thread is
     * interrupted while sleeping (the interrupt flag is restored) or an
     * exception other than {@link InvalidCursorException} propagates.
     *
     * @param shardId id of the shard to read from
     * @throws IllegalStateException if {@link #init()} has not been called
     */
    public void getRecords(String shardId) {
        Topic source = requireTopic();
        String cursor = source.getCursor(shardId, GetCursorRequest.CursorType.OLDEST);
        RecordSchema schema = source.getRecordSchema();
        while (true) {
            try {
                GetRecordsResult recordRs = source.getRecords(shardId, cursor, RECORDS_PER_FETCH);
                for (RecordEntry entry : recordRs.getRecords()) {
                    System.out.println(entry.toJsonNode().toString());
                    readTypedFields(schema, entry);
                }
                String nextCursor = recordRs.getNextCursor();
                if (cursor.equals(nextCursor)) {
                    // Cursor did not move: nothing new to read, back off.
                    System.out.println("No data available waiting...");
                    try {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    }
                    catch (InterruptedException e) {
                        // Restore the flag and stop polling; continuing would
                        // make every following sleep fail immediately.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                cursor = nextCursor;
            }
            catch (InvalidCursorException ex) {
                // The cursor is no longer usable; start again from the oldest one.
                cursor = source.getCursor(shardId, GetCursorRequest.CursorType.OLDEST);
            }
        }
    }

    /**
     * Reads every field of {@code entry} through the typed accessor matching
     * its schema type. The values are intentionally unused; the method only
     * demonstrates which accessor goes with which field type.
     */
    private void readTypedFields(RecordSchema schema, RecordEntry entry) {
        for (int i = 0; i < schema.getFields().size(); ++i) {
            Field field = schema.getField(i);
            switch (field.getType()) {
                case BIGINT: {
                    long v = entry.getBigint(i);
                    break;
                }
                case STRING: {
                    String v = entry.getString(i);
                    break;
                }
                case BOOLEAN: {
                    boolean v = entry.getBoolean(i);
                    break;
                }
                case DOUBLE: {
                    double v = entry.getDouble(i);
                    break;
                }
                case TIMESTAMP: {
                    long v = entry.getTimeStamp(i);
                    break;
                }
                default: {
                    // Other field types are not read by this example.
                    break;
                }
            }
        }
    }

    /** Returns the topic wrapper, failing clearly when {@link #init()} was skipped. */
    private Topic requireTopic() {
        if (this.topic == null) {
            throw new IllegalStateException("topic is not initialised; call init() first");
        }
        return this.topic;
    }

    /**
     * Runs the example end to end: create the topic, write records, then read
     * shard {@code "0"}. Client exceptions are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DatahubWrapperExample example = new DatahubWrapperExample();
        try {
            example.init();
            example.putRecords();
            example.getRecords("0");
        }
        catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

