/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore.reader;

import com.alicloud.openservices.tablestore.AsyncClientInterface;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.model.RowQueryCriteria;
import com.alicloud.openservices.tablestore.reader.PkWithGroup;
import com.alicloud.openservices.tablestore.reader.PrimaryKeyWithTable;
import com.alicloud.openservices.tablestore.reader.ReaderEvent;
import com.alicloud.openservices.tablestore.reader.ReaderRequestManager;
import com.alicloud.openservices.tablestore.reader.ReaderStatistics;
import com.alicloud.openservices.tablestore.reader.ReqWithGroups;
import com.alicloud.openservices.tablestore.reader.RowReadResult;
import com.alicloud.openservices.tablestore.reader.TableStoreReaderConfig;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderEventHandler
implements EventHandler<ReaderEvent> {
    private final Logger logger = LoggerFactory.getLogger(ReaderEventHandler.class);
    private final AsyncClientInterface ots;
    private final TableStoreReaderConfig config;
    private final Executor executor;
    private final Semaphore callbackSemaphore;
    private final Semaphore bucketSemaphore;
    private final int bucketConcurrency;
    private final ReaderRequestManager requestManager;
    private final ReaderStatistics statistics;

    public ReaderEventHandler(AsyncClientInterface ots, TableStoreReaderConfig config, Executor executor, Semaphore callbackSemaphore, TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback, ReaderStatistics statistics) {
        this.ots = ots;
        this.config = config;
        this.executor = executor;
        this.callbackSemaphore = callbackSemaphore;
        this.bucketSemaphore = new Semaphore(config.getConcurrency());
        this.bucketConcurrency = config.getConcurrency();
        this.statistics = statistics;
        this.requestManager = new ReaderRequestManager(ots, config, callbackSemaphore, callback, executor, this.bucketSemaphore, statistics);
    }

    public void onEvent(ReaderEvent readerEvent, long l, boolean b) throws Exception {
        boolean shouldWaitFlush = false;
        CountDownLatch latch = null;
        ReqWithGroups reqWithGroups = null;
        if (readerEvent.type == ReaderEvent.EventType.FLUSH) {
            this.logger.debug("FlushSignal with QueueSize: {}", (Object)this.requestManager.getTotalPksCount());
            if (this.requestManager.getTotalPksCount() > 0) {
                reqWithGroups = this.requestManager.makeRequest();
            }
            shouldWaitFlush = true;
            latch = readerEvent.latch;
        } else if (readerEvent.type == ReaderEvent.EventType.SEND) {
            this.logger.debug("SendSignal with QueueSize: {}", (Object)this.requestManager.getTotalPksCount());
            if (this.requestManager.getTotalPksCount() > 0) {
                reqWithGroups = this.requestManager.makeRequest();
            }
        } else {
            this.statistics.totalRowsCount.incrementAndGet();
            PrimaryKeyWithTable primaryKeyWithTable = readerEvent.pkWithTable;
            PkWithGroup pkWithGroup = new PkWithGroup(primaryKeyWithTable, readerEvent.readerGroup);
            boolean succeed = this.requestManager.appendPrimaryKey(pkWithGroup);
            if (!succeed) {
                reqWithGroups = this.requestManager.makeRequest();
                this.requestManager.appendPrimaryKey(pkWithGroup);
            }
        }
        if (reqWithGroups != null) {
            final ReqWithGroups finalRequest = reqWithGroups;
            this.bucketSemaphore.acquire();
            this.callbackSemaphore.acquire();
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    ((ReaderEventHandler)ReaderEventHandler.this).statistics.totalRequestCount.incrementAndGet();
                    ReaderEventHandler.this.requestManager.sendRequest(finalRequest);
                }
            });
        }
        if (shouldWaitFlush) {
            this.bucketSemaphore.acquire(this.bucketConcurrency);
            this.bucketSemaphore.release(this.bucketConcurrency);
            this.logger.debug("Finish bucket waitFlush.");
            latch.countDown();
        }
    }

    public void setRowQueryCriteria(RowQueryCriteria rowQueryCriteria) {
        this.requestManager.setRowQueryCriteria(rowQueryCriteria);
    }

    public void setCallback(TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback) {
        this.requestManager.setCallback(callback);
    }
}

