/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.SeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that pulls documents from Elasticsearch with the scroll API and emits them as
 * {@link SeaTunnelRow}s.
 *
 * <p>Splits are queued by {@link #addSplits(List)}. Each call to {@link #pollNext(Collector)}
 * takes at most one split from the queue and reads it to the end before returning.
 */
public class ElasticsearchSourceReader
        implements SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSourceReader.class);

    /** Pause, in milliseconds, between polls while no split is available yet. */
    private static final long POLL_NEXT_WAIT_TIME_MS = 1000L;

    SourceReader.Context context;
    private final ReadonlyConfig config;
    /** Created in {@link #open()}; {@code null} until then. */
    private EsRestClient esRestClient;
    private final SeaTunnelRowDeserializer deserializer;
    /** Splits that have been assigned to this reader but not read yet. */
    Deque<ElasticsearchSourceSplit> splits = new LinkedList<>();
    /** Set by {@link #handleNoMoreSplits()}; once true, an empty queue means the reader is done. */
    boolean noMoreSplit;

    /**
     * @param context reader context, used to signal that no more elements will be produced
     * @param config connector configuration handed to {@link EsRestClient#createInstance}
     * @param rowTypeInfo row type used to build the document deserializer
     */
    public ElasticsearchSourceReader(SourceReader.Context context, ReadonlyConfig config, SeaTunnelRowType rowTypeInfo) {
        this.context = context;
        this.config = config;
        this.deserializer = new DefaultSeaTunnelRowDeserializer(rowTypeInfo);
    }

    /** Creates the Elasticsearch REST client from the reader configuration. */
    public void open() {
        this.esRestClient = EsRestClient.createInstance(this.config);
    }

    /**
     * Closes the REST client if one was created.
     *
     * @throws IOException if closing the client fails
     */
    public void close() throws IOException {
        // open() may never have run (or may have failed), so the client can still be null here.
        if (this.esRestClient != null) {
            this.esRestClient.close();
        }
    }

    /**
     * Reads the next queued split, if any, and emits all of its documents.
     *
     * <p>While holding the collector's checkpoint lock this method either reads one split to the
     * end, or, when the queue is empty and {@link #handleNoMoreSplits()} has been called, signals
     * the context that no more elements will follow. If the queue is empty but more splits may
     * still arrive, it sleeps for {@link #POLL_NEXT_WAIT_TIME_MS} after releasing the lock.
     *
     * @param output collector that receives the deserialized rows
     * @throws Exception if the search fails or the idle wait is interrupted
     */
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        boolean waitForSplits = false;
        synchronized (output.getCheckpointLock()) {
            ElasticsearchSourceSplit split = this.splits.poll();
            if (split != null) {
                this.readSplit(split, output);
            } else if (this.noMoreSplit) {
                log.info("Closed the bounded ELasticsearch source");
                this.context.signalNoMoreElement();
            } else {
                waitForSplits = true;
            }
        }
        if (waitForSplits) {
            // Thread.sleep keeps any monitor it holds, so wait only after the checkpoint lock
            // has been released; otherwise whoever needs the lock is blocked for the whole pause.
            Thread.sleep(POLL_NEXT_WAIT_TIME_MS);
        }
    }

    /** Runs the scroll search for one split and emits every returned page until a page is empty. */
    private void readSplit(ElasticsearchSourceSplit split, Collector<SeaTunnelRow> output) {
        SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
        List<String> source = sourceIndexInfo.getSource();
        ScrollResult scrollResult = this.esRestClient.searchByScroll(
                sourceIndexInfo.getIndex(),
                source,
                sourceIndexInfo.getQuery(),
                sourceIndexInfo.getScrollTime(),
                sourceIndexInfo.getScrollSize());
        this.outputFromScrollResult(scrollResult, source, output);
        // A null or empty page marks the end of the scroll.
        while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) {
            scrollResult = this.esRestClient.searchWithScrollId(
                    scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
            this.outputFromScrollResult(scrollResult, source, output);
        }
    }

    /**
     * Deserializes every document of one scroll page and hands it to the collector.
     *
     * @param scrollResult page returned by the scroll search; a page without docs emits nothing
     * @param source field names passed along with each document to the deserializer
     * @param output collector that receives the rows
     */
    private void outputFromScrollResult(ScrollResult scrollResult, List<String> source, Collector<SeaTunnelRow> output) {
        List<Map<String, Object>> docs = scrollResult.getDocs();
        if (docs == null) {
            // The scroll loop treats null docs as "no more data"; do the same here instead of
            // failing with a NullPointerException.
            return;
        }
        for (Map<String, Object> doc : docs) {
            SeaTunnelRow seaTunnelRow = this.deserializer.deserialize(new ElasticsearchRecord(doc, source));
            output.collect(seaTunnelRow);
        }
    }

    /**
     * Returns a copy of the splits that are still waiting in the queue.
     *
     * @param checkpointId identifier of the checkpoint being taken (unused)
     * @return a new list holding the queued splits in queue order
     */
    public List<ElasticsearchSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<>(this.splits);
    }

    /**
     * Appends the given splits to the end of the queue.
     *
     * @param splits splits to read
     */
    public void addSplits(List<ElasticsearchSourceSplit> splits) {
        this.splits.addAll(splits);
    }

    /** Records that no further splits will be added. */
    public void handleNoMoreSplits() {
        this.noMoreSplit = true;
    }

    /** No-op: this reader keeps no per-checkpoint state to finalize. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

