/*
 * Decompiled with CFR 0.152.
 */
package nablarch.fw.reader;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import nablarch.core.db.statement.SqlRow;
import nablarch.core.log.Logger;
import nablarch.core.log.LoggerManager;
import nablarch.core.util.annotation.Published;
import nablarch.fw.DataReader;
import nablarch.fw.ExecutionContext;
import nablarch.fw.reader.DatabaseRecordReader;

/**
 * A {@link DataReader} that treats a database table as a queue by wrapping a
 * {@link DatabaseRecordReader}.
 *
 * <p>When the wrapped reader reports that it has no more records, this reader
 * sleeps for the configured wait time and then reopens the wrapped reader.
 * {@link #hasNext(ExecutionContext)} therefore keeps returning {@code true}
 * until {@link #close(ExecutionContext)} has been called.
 *
 * <p>The primary-key values of the record most recently handed to each thread
 * are remembered. A record whose primary-key values equal those currently
 * remembered for any thread is skipped, so the same record is not handed out
 * again while it is still registered. A thread's registration is discarded the
 * next time that thread calls {@link #read(ExecutionContext)}.
 *
 * <p>{@code read}, {@code hasNext} and {@code close} are all
 * {@code synchronized} on this instance.
 */
public class DatabaseTableQueueReader implements DataReader<SqlRow> {

    /** Logger used by {@link #writeLog(InputDataIdentifier)}. */
    private static final Logger LOGGER = LoggerManager.get(DatabaseTableQueueReader.class);

    /** Primary-key values of the records currently registered per thread. */
    private final WorkingInputDataHolder workingInputDataHolder = new WorkingInputDataHolder();

    /** The wrapped reader that actually fetches records. */
    private final DatabaseRecordReader originalReader;

    /** Value passed to {@link Thread#sleep(long)} before the wrapped reader is reopened. */
    private final int waitTime;

    /** Column names that identify a record; validated to be non-empty and unique. */
    private final String[] primaryKeys;

    /** Set to {@code true} by {@link #close(ExecutionContext)}. */
    private boolean closed;

    /**
     * Creates a reader that wraps {@code originalReader}.
     *
     * @param originalReader the reader used to fetch records
     * @param waitTime value passed to {@link Thread#sleep(long)} when the
     *                 wrapped reader has no more records
     * @param primaryKeys column names that identify a record
     * @throws IllegalArgumentException if {@code primaryKeys} is {@code null},
     *                                  empty, or contains duplicated names
     */
    @Published
    public DatabaseTableQueueReader(DatabaseRecordReader originalReader, int waitTime, String ... primaryKeys) {
        this.originalReader = originalReader;
        this.waitTime = waitTime;
        // Copy the array so that later modification by the caller cannot
        // invalidate the uniqueness check performed below.
        this.primaryKeys = primaryKeys == null ? null : primaryKeys.clone();
        this.verifyParameter();
    }

    /**
     * Returns the next record that is not currently registered for any thread.
     *
     * <p>If the wrapped reader has no more records, this method first sleeps
     * and reopens it. The calling thread's previous registration is removed
     * before the search starts, and the returned record is registered for the
     * calling thread.
     *
     * @param ctx the execution context passed through to the wrapped reader
     * @return the next record, or {@code null} if the wrapped reader returned
     *         {@code null} before a non-registered record was found
     */
    public synchronized SqlRow read(ExecutionContext ctx) {
        if (!this.originalReader.hasNext(ctx)) {
            this.waitThread();
            this.originalReader.reopen(ctx);
        }

        Thread currentThread = Thread.currentThread();
        // The record previously handed to this thread is no longer considered in progress.
        this.workingInputDataHolder.remove(currentThread);

        while (true) {
            SqlRow record = this.originalReader.read(ctx);
            if (record == null) {
                return null;
            }
            InputDataIdentifier identifier = new InputDataIdentifier(this.primaryKeys, record);
            if (this.workingInputDataHolder.isWorkingRequest(identifier)) {
                // Another thread is registered with the same key values; try the next record.
                continue;
            }
            this.workingInputDataHolder.add(currentThread, identifier);
            this.writeLog(identifier);
            return record;
        }
    }

    /**
     * Logs, at INFO level, the primary-key values of a record that is about to
     * be returned from {@link #read(ExecutionContext)}.
     *
     * @param inputDataIdentifier the primary-key values of the record
     */
    @Published
    protected void writeLog(InputDataIdentifier inputDataIdentifier) {
        LOGGER.logInfo("read database record. key info: " + inputDataIdentifier);
    }

    /**
     * Reports whether this reader is still open.
     *
     * @param ctx the execution context (not used)
     * @return {@code true} until {@link #close(ExecutionContext)} has been called
     */
    public synchronized boolean hasNext(ExecutionContext ctx) {
        return !this.closed;
    }

    /**
     * Marks this reader as closed and closes the wrapped reader.
     *
     * @param ctx the execution context passed through to the wrapped reader
     */
    public synchronized void close(ExecutionContext ctx) {
        this.closed = true;
        this.originalReader.close(ctx);
    }

    /**
     * Sleeps for {@link #waitTime}. Called from the synchronized
     * {@link #read(ExecutionContext)}, so the monitor is held while sleeping.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *                          thread is interrupted while sleeping; the
     *                          interrupt status is restored before throwing
     */
    private void waitThread() {
        try {
            Thread.sleep(this.waitTime);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws; set it
            // again so that code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Validates {@link #primaryKeys}.
     *
     * @throws IllegalArgumentException if the array is {@code null}, empty, or
     *                                  contains duplicated names
     */
    private void verifyParameter() {
        if (this.primaryKeys == null || this.primaryKeys.length == 0) {
            throw new IllegalArgumentException("primary keys must be set.");
        }
        HashSet<String> uniqueKeys = new HashSet<String>(Arrays.asList(this.primaryKeys));
        if (uniqueKeys.size() != this.primaryKeys.length) {
            throw new IllegalArgumentException(String.format(
                    "duplicated primary key. must be unique column name. primary keys = %s",
                    Arrays.toString(this.primaryKeys)));
        }
    }

    /**
     * Returns the wrapped reader.
     *
     * @return the reader passed to the constructor
     */
    public DatabaseRecordReader getOriginalReader() {
        return this.originalReader;
    }

    /**
     * Holds, per thread, the identifier of the record registered for that
     * thread. Uses a plain {@link HashMap}; within this file it is accessed
     * only from the synchronized {@code read} method.
     */
    private static final class WorkingInputDataHolder {

        /** Registered identifiers keyed by the thread they were handed to. */
        private final Map<Thread, InputDataIdentifier> executingRequests = new HashMap<Thread, InputDataIdentifier>();

        private WorkingInputDataHolder() {
        }

        /** Removes the identifier registered for {@code executor}, if any. */
        private void remove(Thread executor) {
            this.executingRequests.remove(executor);
        }

        /** Registers {@code inputDataIdentifier} for {@code executor}, replacing any previous entry. */
        private void add(Thread executor, InputDataIdentifier inputDataIdentifier) {
            this.executingRequests.put(executor, inputDataIdentifier);
        }

        /** Returns {@code true} if an equal identifier is registered for any thread. */
        private boolean isWorkingRequest(InputDataIdentifier inputDataIdentifier) {
            return this.executingRequests.containsValue(inputDataIdentifier);
        }
    }

    /**
     * The primary-key column names and values of one record, kept in the order
     * in which the key names were supplied. Equality is the map equality
     * inherited from {@link LinkedHashMap}.
     */
    public static class InputDataIdentifier extends LinkedHashMap<String, Object> {

        /**
         * Copies the value of every identifier key from {@code request}.
         *
         * @param identifierKeys the column names that identify a record
         * @param request the record to take the values from
         * @throws IllegalArgumentException if {@code request} does not contain
         *                                  one of the keys
         */
        public InputDataIdentifier(String[] identifierKeys, SqlRow request) {
            for (String key : identifierKeys) {
                if (!request.containsKey(key)) {
                    throw new IllegalArgumentException("primary key was not found in request. primary key name = [" + key + ']');
                }
                this.put(key, request.get(key));
            }
        }
    }
}

