/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.easysearch.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.SourceIndexInfo;
import org.apache.seatunnel.connectors.seatunnel.easysearch.serialize.source.DefaultSeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.easysearch.serialize.source.EasysearchRecord;
import org.apache.seatunnel.connectors.seatunnel.easysearch.serialize.source.SeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.easysearch.source.EasysearchSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EasysearchSourceReader
implements SourceReader<SeaTunnelRow, EasysearchSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(EasysearchSourceReader.class);
    private final SeaTunnelRowDeserializer deserializer;
    private final long pollNextWaitTime = 1000L;
    private final ReadonlyConfig pluginConfig;
    SourceReader.Context context;
    Deque<EasysearchSourceSplit> splits = new LinkedList<EasysearchSourceSplit>();
    boolean noMoreSplit;
    private EasysearchClient ezsClient;

    public EasysearchSourceReader(SourceReader.Context context, ReadonlyConfig pluginConfig, SeaTunnelRowType rowTypeInfo) {
        this.context = context;
        this.pluginConfig = pluginConfig;
        this.deserializer = new DefaultSeaTunnelRowDeserializer(rowTypeInfo);
    }

    public void open() {
        this.ezsClient = EasysearchClient.createInstance(this.pluginConfig);
    }

    public void close() throws IOException {
        this.ezsClient.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        Object object = output.getCheckpointLock();
        synchronized (object) {
            EasysearchSourceSplit split = this.splits.poll();
            if (split != null) {
                SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
                ScrollResult scrollResult = this.ezsClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getQuery(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
                this.outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
                while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
                    scrollResult = this.ezsClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
                    this.outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
                }
            } else if (this.noMoreSplit) {
                log.info("Closed the bounded Easysearch source");
                this.context.signalNoMoreElement();
            } else {
                Thread.sleep(1000L);
            }
        }
    }

    private void outputFromScrollResult(ScrollResult scrollResult, List<String> source, Collector<SeaTunnelRow> output) {
        for (Map<String, Object> doc : scrollResult.getDocs()) {
            SeaTunnelRow seaTunnelRow = this.deserializer.deserialize(new EasysearchRecord(doc, source));
            output.collect((Object)seaTunnelRow);
        }
    }

    public List<EasysearchSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<EasysearchSourceSplit>(this.splits);
    }

    public void addSplits(List<EasysearchSourceSplit> splits) {
        this.splits.addAll(splits);
    }

    public void handleNoMoreSplits() {
        this.noMoreSplit = true;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

