/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.hbase.sink;

import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.BooleanSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.hbase.sink.HbaseAbstractSink;
import org.apache.pulsar.io.hbase.sink.HbaseSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HBase sink that converts Pulsar {@link GenericRecord} messages into HBase {@link Put}s.
 *
 * <p>The row key field, column family and qualifier names are read from the
 * {@code tableDefinition} held by the parent sink. Each non-null qualifier field
 * of the record becomes one column in the resulting {@link Put}.
 */
@Connector(name="hbase", type=IOType.SINK, help="The HbaseGenericRecordSink is used for moving messages from Pulsar to Hbase.", configClass=HbaseSinkConfig.class)
public class HbaseGenericRecordSink
extends HbaseAbstractSink<GenericRecord> {
    private static final Logger log = LoggerFactory.getLogger(HbaseGenericRecordSink.class);

    /**
     * Builds a {@link Put} from the message's record and appends it to {@code puts}.
     *
     * <p>Nothing is appended when the table definition has no qualifier names, or when
     * every qualifier field of the record is null.
     *
     * @param message the incoming message whose value is the record to store
     * @param puts    the list that receives the generated {@link Put}
     * @throws IllegalArgumentException if the message has no value, the family name is null,
     *                                  or the record has no value for the row key field
     * @throws Exception                if a field value has a type that cannot be encoded
     */
    @Override
    public void bindValue(Record<GenericRecord> message, List<Put> puts) throws Exception {
        GenericRecord record = (GenericRecord)message.getValue();
        if (record == null) {
            // Fail with a descriptive error instead of an anonymous NullPointerException below.
            throw new IllegalArgumentException("Message value is null, cannot build an HBase Put from it");
        }
        String rowKeyName = this.tableDefinition.getRowKeyName();
        Object rowKeyValue = record.getField(rowKeyName);
        String familyName = this.tableDefinition.getFamilyName();
        byte[] familyValueBytes = this.getBytes(familyName);
        List<String> qualifierNames = this.tableDefinition.getQualifierNames();
        if (CollectionUtils.isNotEmpty(qualifierNames)) {
            if (rowKeyValue == null) {
                // A Put cannot be created without a row key; name the missing field for the operator.
                throw new IllegalArgumentException("Row key field '" + rowKeyName + "' is missing or null in the record");
            }
            Put put = new Put(this.getBytes(rowKeyValue));
            for (String qualifierName : qualifierNames) {
                Object qualifierValue = record.getField(qualifierName);
                // Null fields are skipped rather than written as empty cells.
                if (null == qualifierValue) continue;
                put.addColumn(familyValueBytes, this.getBytes(qualifierName), this.getBytes(qualifierValue));
            }
            // Only emit the Put when at least one column was actually added.
            if (CollectionUtils.isNotEmpty(put.getFamilyCellMap().values())) {
                puts.add(put);
            }
        }
    }

    /**
     * Encodes a supported value into bytes using the matching Pulsar primitive schema.
     *
     * <p>Supported types: {@link Integer}, {@link Long}, {@link Double}, {@link Float},
     * {@link Boolean}, {@link String} (UTF-8) and {@link Short}.
     *
     * @param value the value to encode; must not be null
     * @return the encoded bytes
     * @throws IllegalArgumentException if {@code value} is null
     * @throws Exception                if the value's type is not one of the supported types
     */
    private byte[] getBytes(Object value) throws Exception {
        if (value == null) {
            // Without this guard the unsupported-type message below would itself throw
            // a NullPointerException on value.getClass().
            throw new IllegalArgumentException("Cannot encode a null value");
        }
        if (value instanceof Integer) {
            return IntSchema.of().encode((Integer)value);
        }
        if (value instanceof Long) {
            return LongSchema.of().encode((Long)value);
        }
        if (value instanceof Double) {
            return DoubleSchema.of().encode((Double)value);
        }
        if (value instanceof Float) {
            return FloatSchema.of().encode((Float)value);
        }
        if (value instanceof Boolean) {
            return BooleanSchema.of().encode((Boolean)value);
        }
        if (value instanceof String) {
            return StringSchema.utf8().encode((String)value);
        }
        if (value instanceof Short) {
            return ShortSchema.of().encode((Short)value);
        }
        throw new Exception("Not support value type, need to add it. " + value.getClass());
    }
}

