/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.hbase;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.component.hbase.HBaseAttribute;
import org.apache.camel.component.hbase.HBaseEndpoint;
import org.apache.camel.component.hbase.HBaseHelper;
import org.apache.camel.component.hbase.mapping.CellMappingStrategy;
import org.apache.camel.component.hbase.model.HBaseCell;
import org.apache.camel.component.hbase.model.HBaseData;
import org.apache.camel.component.hbase.model.HBaseRow;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseConsumer
extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseConsumer.class);
    private final HBaseEndpoint endpoint;
    private HBaseRow rowModel;

    public HBaseConsumer(HBaseEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.rowModel = endpoint.getRowModel();
    }

    protected int poll() throws Exception {
        try (Table table = this.endpoint.getTable();){
            this.shutdownRunningTask = null;
            this.pendingExchanges = 0;
            LinkedList<DefaultExchange> queue = new LinkedList<DefaultExchange>();
            Scan scan = new Scan();
            LinkedList<Object> filters = new LinkedList<Object>();
            if (this.endpoint.getFilters() != null) {
                filters.addAll(this.endpoint.getFilters());
            }
            if (this.maxMessagesPerPoll > 0) {
                filters.add(new PageFilter((long)this.maxMessagesPerPoll));
            }
            if (!filters.isEmpty()) {
                FilterList compoundFilter = new FilterList(filters);
                scan.setFilter((Filter)compoundFilter);
            }
            if (this.rowModel != null && this.rowModel.getCells() != null) {
                Set<HBaseCell> cellModels = this.rowModel.getCells();
                for (HBaseCell cellModel : cellModels) {
                    scan.addColumn(HBaseHelper.getHBaseFieldAsBytes(cellModel.getFamily()), HBaseHelper.getHBaseFieldAsBytes(cellModel.getQualifier()));
                }
            }
            ResultScanner scanner = table.getScanner(scan);
            int exchangeCount = 0;
            DefaultExchange exchange = new DefaultExchange((Endpoint)this.endpoint);
            exchange.getIn().setHeader("CamelMappingStrategy", (Object)"body");
            CellMappingStrategy mappingStrategy = this.endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn());
            Result result = scanner.next();
            while ((exchangeCount < this.maxMessagesPerPoll || this.maxMessagesPerPoll <= 0) && result != null) {
                HBaseData data = new HBaseData();
                HBaseRow resultRow = new HBaseRow();
                resultRow.apply(this.rowModel);
                byte[] row = result.getRow();
                resultRow.setId(this.endpoint.getCamelContext().getTypeConverter().convertTo(this.rowModel.getRowType(), (Object)row));
                List cells = result.listCells();
                if (cells != null) {
                    String family;
                    Set<HBaseCell> cellModels = this.rowModel.getCells();
                    if (!cellModels.isEmpty()) {
                        for (HBaseCell modelCell : cellModels) {
                            HBaseCell resultCell = new HBaseCell();
                            family = modelCell.getFamily();
                            String column = modelCell.getQualifier();
                            resultCell.setValue(this.endpoint.getCamelContext().getTypeConverter().convertTo(modelCell.getValueType(), (Object)result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column))));
                            resultCell.setFamily(modelCell.getFamily());
                            resultCell.setQualifier(modelCell.getQualifier());
                            resultRow.getCells().add(resultCell);
                        }
                    } else {
                        for (Cell cell : cells) {
                            String qualifier = new String(CellUtil.cloneQualifier((Cell)cell));
                            family = new String(CellUtil.cloneFamily((Cell)cell));
                            HBaseCell resultCell = new HBaseCell();
                            resultCell.setFamily(family);
                            resultCell.setQualifier(qualifier);
                            resultCell.setValue(this.endpoint.getCamelContext().getTypeConverter().convertTo(String.class, (Object)CellUtil.cloneValue((Cell)cell)));
                            resultRow.getCells().add(resultCell);
                        }
                    }
                    data.getRows().add(resultRow);
                    exchange = this.createExchange(true);
                    exchange.getIn().setHeader("CamelMappingStrategy", (Object)"body");
                    mappingStrategy.applyScanResults(exchange.getIn(), data);
                    exchange.getIn().setHeader(HBaseAttribute.HBASE_MARKED_ROW_ID.asHeader(), (Object)result.getRow());
                    queue.add(exchange);
                    ++exchangeCount;
                }
                result = scanner.next();
            }
            scanner.close();
            int n = queue.isEmpty() ? 0 : this.processBatch(CastUtils.cast(queue));
            return n;
        }
    }

    public int processBatch(Queue<Object> exchanges) throws Exception {
        int total = exchanges.size();
        if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) {
            LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", (Object)this.maxMessagesPerPoll, (Object)total);
            total = this.maxMessagesPerPoll;
        }
        for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
            Exchange exchange = (Exchange)ObjectHelper.cast(Exchange.class, (Object)exchanges.poll());
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, (Object)index);
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, (Object)total);
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, (Object)(index == total - 1 ? 1 : 0));
            this.pendingExchanges = total - index - 1;
            LOG.trace("Processing exchange [{}]...", (Object)exchange);
            this.getProcessor().process(exchange);
            if (exchange.getException() != null) {
                throw exchange.getException();
            }
            if (!this.endpoint.isRemove()) continue;
            this.remove((byte[])exchange.getIn().getHeader(HBaseAttribute.HBASE_MARKED_ROW_ID.asHeader()));
        }
        return total;
    }

    private void remove(byte[] row) throws IOException {
        try (Table table = this.endpoint.getTable();){
            this.endpoint.getRemoveHandler().remove(table, row);
        }
    }

    public HBaseRow getRowModel() {
        return this.rowModel;
    }

    public void setRowModel(HBaseRow rowModel) {
        this.rowModel = rowModel;
    }
}

