/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.kafka.connect.impl;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.impl.ProviderHelper;
import com.hazelcast.internal.util.Timer;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.kafka.connect.impl.LocalKafkaConnectStats;
import com.hazelcast.jet.kafka.connect.impl.LocalKafkaConnectStatsImpl;
import com.hazelcast.jet.kafka.connect.impl.SourceConnectorWrapper;
import com.hazelcast.jet.kafka.connect.impl.State;
import com.hazelcast.jet.kafka.connect.impl.processorsupplier.ReadKafkaConnectProcessorSupplier;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Jet source processor that polls {@link SourceRecord}s from a
 * {@link SourceConnectorWrapper}, projects each record with a user supplied
 * function and emits the result through an {@link EventTimeMapper}.
 *
 * <p>The processor also publishes poll statistics as dynamic metrics under the
 * {@code kafka.connect} prefix and saves/restores the connector state and the
 * current watermark in Jet snapshots.
 *
 * @param <T> type of the items produced by the projection function
 */
public class ReadKafkaConnectP<T> extends AbstractProcessor implements DynamicMetricsProvider {
    private ILogger logger = Logger.getLogger(ReadKafkaConnectP.class);
    private transient SourceConnectorWrapper sourceConnectorWrapper;
    private final EventTimeMapper<T> eventTimeMapper;
    private final FunctionEx<SourceRecord, T> projectionFn;
    private Properties propertiesFromUser;
    private boolean snapshotInProgress;
    private Traverser<Map.Entry<BroadcastKey<String>, ? extends Serializable>> snapshotTraverser;
    private boolean snapshotsEnabled;
    private Traverser<?> traverser = Traversers.empty();
    private final LocalKafkaConnectStatsImpl localKafkaConnectStats = new LocalKafkaConnectStatsImpl();
    private int globalProcessorIndex;
    private int localProcessorIndex;
    private int processorOrder;
    private final AtomicInteger counter = new AtomicInteger();
    private volatile boolean active = true;
    private RetryStrategy retryStrategy;
    // Remembered in init() so that close() can undo the metrics provider registration.
    private transient NodeEngineImpl nodeEngine;

    /**
     * Creates the processor with a single event-time partition.
     *
     * @param eventTimePolicy policy handed to the {@link EventTimeMapper}; must not be null
     * @param projectionFn    function applied to every polled record; must not be null
     * @throws NullPointerException if either argument is null
     */
    public ReadKafkaConnectP(@Nonnull EventTimePolicy<? super T> eventTimePolicy,
                             @Nonnull FunctionEx<SourceRecord, T> projectionFn) {
        Objects.requireNonNull(eventTimePolicy, "eventTimePolicy is required");
        Objects.requireNonNull(projectionFn, "projectionFn is required");
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        this.projectionFn = projectionFn;
        this.eventTimeMapper.addPartitions(1);
    }

    /**
     * Captures the processor indices and snapshot setting from the context,
     * creates the connector wrapper unless one was injected beforehand, and
     * registers this processor as a dynamic metrics provider.
     *
     * @param context the processor context supplied by Jet
     */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.logger = getLogger();
        this.globalProcessorIndex = context.globalProcessorIndex();
        this.localProcessorIndex = context.localProcessorIndex();
        // Read the flag before logging it; otherwise the log line always reports the default value.
        this.snapshotsEnabled = context.snapshottingEnabled();
        this.logger.info("Entering ReadKafkaConnectP init processorOrder=" + this.processorOrder + " localProcessorIndex= " + this.localProcessorIndex + ", globalProcessorIndex=" + this.globalProcessorIndex + ", snapshotsEnabled=" + this.snapshotsEnabled);
        if (this.sourceConnectorWrapper == null) {
            this.sourceConnectorWrapper = new SourceConnectorWrapper(this.propertiesFromUser, this.processorOrder, context, this.retryStrategy);
            this.sourceConnectorWrapper.setActiveStatusSetter(this::setActive);
        }
        NodeEngineImpl engine = com.hazelcast.jet.impl.util.Util.getNodeEngine(context.hazelcastInstance());
        engine.getMetricsRegistry().registerDynamicMetricsProvider(this);
        this.nodeEngine = engine;
    }

    /**
     * @return always {@code false}: this processor declares itself non-cooperative
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Emits pending items and, once they are all emitted, polls the connector
     * for a new batch of records.
     *
     * @return always {@code false}; this method never reports completion
     */
    @Override
    public boolean complete() {
        if (!this.active) {
            // While inactive only the idle output of the event time mapper is emitted.
            emitFromTraverser(this.eventTimeMapper.flatMapIdle());
            return false;
        }
        if (!configurationReceived()) {
            return false;
        }
        if (this.snapshotInProgress) {
            // Polling is paused until snapshotCommitFinish() clears the flag.
            return false;
        }
        if (!emitFromTraverser(this.traverser)) {
            return false;
        }
        if (this.sourceConnectorWrapper.waitNeeded()) {
            return false;
        }

        long start = Timer.nanos();
        List<SourceRecord> sourceRecords = this.sourceConnectorWrapper.poll();
        // Stop the clock right after the poll so that logging is not counted as poll time.
        long durationInNanos = Timer.nanosElapsed(start);
        this.logger.fine("Total polled record size " + this.counter.addAndGet(sourceRecords.size()));
        this.localKafkaConnectStats.addSourceRecordPollDuration(Duration.ofNanos(durationInNanos));
        this.localKafkaConnectStats.incrementSourceRecordPoll(sourceRecords.size());

        if (sourceRecords.isEmpty()) {
            this.traverser = this.eventTimeMapper.flatMapIdle();
        } else {
            this.traverser = Traversers.traverseIterable(sourceRecords).flatMap(polled -> {
                Long timestamp = polled.timestamp();
                // Records without a timestamp are passed on with Long.MIN_VALUE as event time.
                long eventTime = timestamp == null ? Long.MIN_VALUE : timestamp;
                T projectedRecord = this.projectionFn.apply(polled);
                this.sourceConnectorWrapper.commitRecord(polled);
                return this.eventTimeMapper.flatMapEvent(projectedRecord, 0, eventTime);
            });
        }
        emitFromTraverser(this.traverser);
        return false;
    }

    /**
     * @return {@code true} if the connector wrapper reports that it has a task configuration
     */
    boolean configurationReceived() {
        return this.sourceConnectorWrapper.hasTaskConfiguration();
    }

    /**
     * Writes the connector state and the watermark of partition 0 to the
     * snapshot, after all pending items have been emitted.
     *
     * @return {@code true} when snapshots are disabled or both entries were
     *         written; {@code false} if the method has to be called again
     */
    @Override
    public boolean saveToSnapshot() {
        this.logger.info("Saving to snapshot for globalProcessorIndex=" + this.globalProcessorIndex + " localProcessorIndex= " + this.localProcessorIndex);
        if (!this.snapshotsEnabled) {
            return true;
        }
        if (!emitFromTraverser(this.traverser)) {
            return false;
        }
        this.snapshotInProgress = true;
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.<Map.Entry<BroadcastKey<String>, ? extends Serializable>>traverseItems(
                    Util.entry(snapshotKey(), this.sourceConnectorWrapper.copyState()),
                    Util.entry(snapshotKeyWm(), this.eventTimeMapper.getWatermark(0))
            ).onFirstNull(() -> {
                // Reset so that the next snapshot builds a fresh traverser.
                this.snapshotTraverser = null;
                this.logger.finest("Finished saving snapshot");
            });
        }
        return emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /** Snapshot key under which the connector state of this processor order is stored. */
    private BroadcastKey<String> snapshotKey() {
        return BroadcastKey.broadcastKey("snapshot-" + this.processorOrder);
    }

    /** Snapshot key under which the watermark of this processor order is stored. */
    private BroadcastKey<String> snapshotKeyWm() {
        return BroadcastKey.broadcastKey("snapshotWm-" + this.processorOrder);
    }

    /**
     * Restores the connector state or the watermark when the key matches one
     * of this processor's snapshot keys; entries with other keys are ignored.
     *
     * @param key   snapshot key
     * @param value a {@link State} for the state key, a {@link Long} for the watermark key
     */
    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        this.logger.info("Restoring from snapshot with key " + key + " value " + value);
        if (snapshotKey().equals(key)) {
            this.sourceConnectorWrapper.restoreState((State) value);
        } else if (snapshotKeyWm().equals(key)) {
            this.eventTimeMapper.restoreWatermark(0, (Long) value);
        }
    }

    /**
     * Commits the connector when the snapshot succeeded and, in every case,
     * clears the snapshot-in-progress flag so that polling resumes.
     *
     * @param success whether the snapshot was successful
     * @return always {@code true}
     */
    @Override
    public boolean snapshotCommitFinish(boolean success) {
        try {
            if (success) {
                this.sourceConnectorWrapper.commit();
            }
        } finally {
            this.snapshotInProgress = false;
        }
        return true;
    }

    /**
     * Closes the connector wrapper, if any, and removes the metrics provider
     * registration made in {@link #init(Processor.Context)}.
     */
    @Override
    public void close() {
        try {
            if (this.sourceConnectorWrapper != null) {
                this.sourceConnectorWrapper.close();
            }
        } finally {
            // Deregister even if closing the wrapper fails, so that the registry
            // does not keep a reference to a closed processor.
            if (this.nodeEngine != null) {
                this.nodeEngine.getMetricsRegistry().deregisterDynamicMetricsProvider(this);
                this.nodeEngine = null;
            }
        }
        this.logger.info("Closed ReadKafkaConnectP");
    }

    /**
     * @return a new map holding this processor's statistics keyed by the task
     *         runner name, or an empty map when no task runner is available
     */
    public Map<String, LocalKafkaConnectStats> getStats() {
        Map<String, LocalKafkaConnectStats> connectStats = new HashMap<>();
        if (this.sourceConnectorWrapper != null && this.sourceConnectorWrapper.hasTaskRunner()) {
            connectStats.put(this.sourceConnectorWrapper.getTaskRunnerName(), this.localKafkaConnectStats);
        }
        return connectStats;
    }

    /**
     * Publishes the statistics from {@link #getStats()} under the
     * {@code kafka.connect} prefix. When a task runner is available the
     * descriptor used for publishing carries a {@code task.runner} tag.
     *
     * @param descriptor base descriptor supplied by the metrics registry; not modified
     * @param context    collection context receiving the metrics
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        MetricDescriptor effectiveDescriptor = descriptor;
        if (this.sourceConnectorWrapper != null && this.sourceConnectorWrapper.hasTaskRunner()) {
            // Keep the tagged copy: discarding the result would drop the tag silently.
            effectiveDescriptor = descriptor.copy().withTag("task.runner", this.sourceConnectorWrapper.getTaskRunnerName());
        }
        ProviderHelper.provide(effectiveDescriptor, context, "kafka.connect", getStats());
    }

    /** Sets the connector properties used when {@code init} creates the connector wrapper. */
    public void setPropertiesFromUser(Properties propertiesFromUser) {
        this.propertiesFromUser = propertiesFromUser;
    }

    /** Injects a connector wrapper; when set before {@code init}, no new wrapper is created. */
    public void setSourceConnectorWrapper(SourceConnectorWrapper sourceConnectorWrapper) {
        this.sourceConnectorWrapper = sourceConnectorWrapper;
    }

    /** Sets the processor order used for the wrapper and for the snapshot keys. */
    public void setProcessorOrder(int processorOrder) {
        this.processorOrder = processorOrder;
    }

    /** Marks the processor active or inactive; an inactive processor does not poll. */
    public void setActive(boolean active) {
        this.active = active;
    }

    private void setRetryStrategy(@Nullable RetryStrategy retryStrategy) {
        this.retryStrategy = retryStrategy;
    }

    EventTimeMapper<T> eventTimeMapper() {
        return this.eventTimeMapper;
    }

    /**
     * Creates a processor supplier that produces one processor per unit of
     * local parallelism, each configured with the given properties and retry
     * strategy.
     *
     * @param propertiesFromUser connector properties passed to every processor
     * @param eventTimePolicy    event time policy passed to every processor
     * @param projectionFn       projection function passed to every processor
     * @param retryStrategy      retry strategy passed to every processor, may be null
     * @param <T>                type of the items produced by the projection function
     * @return the processor supplier
     */
    public static <T> ReadKafkaConnectProcessorSupplier processorSupplier(final @Nonnull Properties propertiesFromUser, final @Nonnull EventTimePolicy<? super T> eventTimePolicy, final @Nonnull FunctionEx<SourceRecord, T> projectionFn, final @Nullable RetryStrategy retryStrategy) {
        return new ReadKafkaConnectProcessorSupplier() {
            private static final long serialVersionUID = 1L;

            @Override
            @Nonnull
            public Collection<ReadKafkaConnectP<?>> get(int localParallelismForMember) {
                return IntStream.range(0, localParallelismForMember)
                        .<ReadKafkaConnectP<?>>mapToObj(i -> {
                            ReadKafkaConnectP<T> processor = new ReadKafkaConnectP<>(eventTimePolicy, projectionFn);
                            processor.setPropertiesFromUser(propertiesFromUser);
                            processor.setRetryStrategy(retryStrategy);
                            return processor;
                        })
                        .collect(Collectors.toList());
            }
        };
    }
}

