/*
 * Decompiled with CFR 0.152.
 */
package com.mtfelisb.flink.connectors.elasticsearch.sink;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import com.mtfelisb.flink.connectors.elasticsearch.sink.Emitter;
import com.mtfelisb.flink.connectors.elasticsearch.sink.IBulkRequestFactory;
import com.mtfelisb.flink.connectors.elasticsearch.sink.INetworkConfigFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ElasticsearchSink<T>
extends RichSinkFunction<T>
implements CheckpointedFunction {
    private static final Logger LOG = LogManager.getLogger(ElasticsearchSink.class);
    private transient ElasticsearchClient esClient;
    private final INetworkConfigFactory networkConfigFactory;
    private transient BulkRequest.Builder bulkRequest;
    private final IBulkRequestFactory bulkRequestFactory;
    private final AtomicLong thresholdCounter = new AtomicLong(0L);
    private final Long threshold;
    private final Emitter<T> emitter;

    public ElasticsearchSink(INetworkConfigFactory networkConfigFactory, Emitter<T> emitter, Long threshold, IBulkRequestFactory bulkRequestFactory) {
        this.networkConfigFactory = networkConfigFactory;
        this.emitter = emitter;
        this.threshold = threshold;
        this.bulkRequestFactory = bulkRequestFactory;
    }

    public void open(Configuration parameters) throws Exception {
        this.bulkRequest = this.bulkRequestFactory.create();
        this.esClient = this.networkConfigFactory.create();
    }

    public void invoke(T value, SinkFunction.Context context) throws Exception {
        this.thresholdCounter.getAndAdd(1L);
        this.bulkRequest.operations(op -> this.emitter.emit(value, (BulkOperation.Builder)op, context));
        if (this.thresholdCounter.get() == this.threshold.longValue()) {
            this.flush();
        }
    }

    private void flush() throws IOException {
        if (this.thresholdCounter.get() == 0L) {
            return;
        }
        BulkResponse result = this.esClient.bulk(this.bulkRequest.build());
        if (result.errors()) {
            LOG.error("Bulk had errors");
            for (BulkResponseItem item : result.items()) {
                if (item.error() == null) continue;
                LOG.error(item.error().reason());
            }
        }
        this.bulkRequest = this.bulkRequestFactory.create();
        this.thresholdCounter.set(0L);
        LOG.debug("Ingestion took {}ms of {} items", (Object)result.took(), (Object)result.items().size());
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.flush();
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public void close() throws Exception {
        this.esClient.shutdown();
    }
}

