/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.GroupState;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestState;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

public class CommitRequestManager
implements RequestManager {
    private static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
    private final SubscriptionState subscriptionState;
    private final Logger log;
    private final Optional<AutoCommitState> autoCommitState;
    private final CoordinatorRequestManager coordinatorRequestManager;
    private final GroupState groupState;
    private final long retryBackoffMs;
    private final boolean throwOnFetchStableOffsetUnsupported;
    final PendingRequests pendingRequests;

    public CommitRequestManager(Time time, LogContext logContext, SubscriptionState subscriptionState, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, GroupState groupState) {
        Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
        this.log = logContext.logger(this.getClass());
        this.pendingRequests = new PendingRequests();
        if (config.getBoolean("enable.auto.commit").booleanValue()) {
            long autoCommitInterval = Integer.toUnsignedLong(config.getInt("auto.commit.interval.ms"));
            this.autoCommitState = Optional.of(new AutoCommitState(time, autoCommitInterval));
        } else {
            this.autoCommitState = Optional.empty();
        }
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.groupState = groupState;
        this.subscriptionState = subscriptionState;
        this.retryBackoffMs = config.getLong("retry.backoff.ms");
        this.throwOnFetchStableOffsetUnsupported = config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
    }

    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        this.maybeAutoCommit();
        if (!this.pendingRequests.hasUnsentRequests()) {
            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
        }
        return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.unmodifiableList(this.pendingRequests.drain(currentTimeMs)));
    }

    private void maybeAutoCommit() {
        if (!this.autoCommitState.isPresent()) {
            return;
        }
        AutoCommitState autocommit = this.autoCommitState.get();
        if (!autocommit.canSendAutocommit()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = this.subscriptionState.allConsumed();
        this.sendAutoCommit(allConsumedOffsets);
        autocommit.resetTimer();
        autocommit.setInflightCommitStatus(true);
    }

    public CompletableFuture<ClientResponse> addOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
        return this.pendingRequests.addOffsetCommitRequest(offsets);
    }

    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(Set<TopicPartition> partitions) {
        return this.pendingRequests.addOffsetFetchRequest(partitions);
    }

    public void updateAutoCommitTimer(long currentTimeMs) {
        this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
    }

    List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
        return this.pendingRequests.unsentOffsetFetches;
    }

    Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
        return this.pendingRequests.unsentOffsetCommits;
    }

    CompletableFuture<ClientResponse> sendAutoCommit(Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
        this.log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
        return ((CompletableFuture)this.addOffsetCommitRequest(allConsumedOffsets).whenComplete((response, throwable) -> {
            this.autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
            if (throwable == null) {
                this.log.debug("Completed asynchronous auto-commit of offsets {}", (Object)allConsumedOffsets);
            }
        })).exceptionally(t -> {
            if (t instanceof RetriableCommitFailedException) {
                this.log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", (Object)allConsumedOffsets, t);
            } else {
                this.log.warn("Asynchronous auto-commit of offsets {} failed: {}", (Object)allConsumedOffsets, (Object)t.getMessage());
            }
            return null;
        });
    }

    class PendingRequests {
        Queue<OffsetCommitRequestState> unsentOffsetCommits = new LinkedList<OffsetCommitRequestState>();
        List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<OffsetFetchRequestState>();
        List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<OffsetFetchRequestState>();

        PendingRequests() {
        }

        public boolean hasUnsentRequests() {
            return !this.unsentOffsetCommits.isEmpty() || !this.unsentOffsetFetches.isEmpty();
        }

        public CompletableFuture<ClientResponse> addOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
            OffsetCommitRequestState request = new OffsetCommitRequestState(offsets, ((CommitRequestManager)CommitRequestManager.this).groupState.groupId, ((CommitRequestManager)CommitRequestManager.this).groupState.groupInstanceId.orElse(null), ((CommitRequestManager)CommitRequestManager.this).groupState.generation);
            this.unsentOffsetCommits.add(request);
            return request.future();
        }

        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(OffsetFetchRequestState request) {
            Optional<OffsetFetchRequestState> dupe = this.unsentOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
            Optional<OffsetFetchRequestState> inflight = this.inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
            if (dupe.isPresent() || inflight.isPresent()) {
                CommitRequestManager.this.log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions);
                dupe.orElseGet(() -> (OffsetFetchRequestState)inflight.get()).chainFuture(request.future);
            } else {
                request.future.whenComplete((r, t) -> {
                    if (!this.inflightOffsetFetches.remove(request)) {
                        CommitRequestManager.this.log.warn("A duplicated, inflight, request was identified, but unable to find it in the outbound buffer:" + request);
                    }
                });
                this.unsentOffsetFetches.add(request);
            }
            return request.future;
        }

        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(Set<TopicPartition> partitions) {
            OffsetFetchRequestState request = new OffsetFetchRequestState(partitions, ((CommitRequestManager)CommitRequestManager.this).groupState.generation, CommitRequestManager.this.retryBackoffMs);
            return this.addOffsetFetchRequest(request);
        }

        public List<NetworkClientDelegate.UnsentRequest> drain(long currentTimeMs) {
            ArrayList<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<NetworkClientDelegate.UnsentRequest>();
            unsentRequests.addAll(this.unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));
            Map<Boolean, List<OffsetFetchRequestState>> partitionedBySendability = this.unsentOffsetFetches.stream().collect(Collectors.partitioningBy(request -> request.canSendRequest(currentTimeMs)));
            for (OffsetFetchRequestState request2 : partitionedBySendability.get(true)) {
                request2.onSendAttempt(currentTimeMs);
                unsentRequests.add(request2.toUnsentRequest(currentTimeMs));
                this.inflightOffsetFetches.add(request2);
            }
            this.unsentOffsetCommits.clear();
            this.unsentOffsetFetches.clear();
            this.unsentOffsetFetches.addAll((Collection<OffsetFetchRequestState>)partitionedBySendability.get(false));
            return Collections.unmodifiableList(unsentRequests);
        }
    }

    private static class AutoCommitState {
        private final Timer timer;
        private final long autoCommitInterval;
        private boolean hasInflightCommit;

        public AutoCommitState(Time time, long autoCommitInterval) {
            this.autoCommitInterval = autoCommitInterval;
            this.timer = time.timer(autoCommitInterval);
            this.hasInflightCommit = false;
        }

        public boolean canSendAutocommit() {
            return !this.hasInflightCommit && this.timer.isExpired();
        }

        public void resetTimer() {
            this.timer.reset(this.autoCommitInterval);
        }

        public void ack(long currentTimeMs) {
            this.timer.update(currentTimeMs);
        }

        public void setInflightCommitStatus(boolean inflightCommitStatus) {
            this.hasInflightCommit = inflightCommitStatus;
        }
    }

    private class OffsetFetchRequestState
    extends RequestState {
        public final Set<TopicPartition> requestedPartitions;
        public final GroupState.Generation requestedGeneration;
        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

        public OffsetFetchRequestState(Set<TopicPartition> partitions, GroupState.Generation generation, long retryBackoffMs) {
            super(retryBackoffMs);
            this.requestedPartitions = partitions;
            this.requestedGeneration = generation;
            this.future = new CompletableFuture();
        }

        public boolean sameRequest(OffsetFetchRequestState request) {
            return Objects.equals(this.requestedGeneration, request.requestedGeneration) && this.requestedPartitions.equals(request.requestedPartitions);
        }

        public NetworkClientDelegate.UnsentRequest toUnsentRequest(long currentTimeMs) {
            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(((CommitRequestManager)CommitRequestManager.this).groupState.groupId, true, new ArrayList<TopicPartition>(this.requestedPartitions), CommitRequestManager.this.throwOnFetchStableOffsetUnsupported);
            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(builder, CommitRequestManager.this.coordinatorRequestManager.coordinator());
            unsentRequest.future().whenComplete((r, t) -> this.onResponse(currentTimeMs, (OffsetFetchResponse)r.responseBody()));
            return unsentRequest;
        }

        public void onResponse(long currentTimeMs, OffsetFetchResponse response) {
            Errors responseError = response.groupLevelError(((CommitRequestManager)CommitRequestManager.this).groupState.groupId);
            if (responseError != Errors.NONE) {
                this.onFailure(currentTimeMs, responseError);
                return;
            }
            this.onSuccess(currentTimeMs, response);
        }

        private void onFailure(long currentTimeMs, Errors responseError) {
            CommitRequestManager.this.log.debug("Offset fetch failed: {}", (Object)responseError.message());
            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                this.retry(currentTimeMs);
            } else if (responseError == Errors.NOT_COORDINATOR) {
                CommitRequestManager.this.coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
                this.retry(currentTimeMs);
            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
                this.future.completeExceptionally(GroupAuthorizationException.forGroupId(((CommitRequestManager)CommitRequestManager.this).groupState.groupId));
            } else {
                this.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
            }
        }

        private void retry(long currentTimeMs) {
            this.onFailedAttempt(currentTimeMs);
            this.onSendAttempt(currentTimeMs);
            CommitRequestManager.this.pendingRequests.addOffsetFetchRequest(this);
        }

        private void onSuccess(long currentTimeMs, OffsetFetchResponse response) {
            HashSet<String> unauthorizedTopics = null;
            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = response.partitionDataMap(((CommitRequestManager)CommitRequestManager.this).groupState.groupId);
            HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>(responseData.size());
            HashSet<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<TopicPartition>();
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
                if (partitionData.hasError()) {
                    Errors error = partitionData.error;
                    CommitRequestManager.this.log.debug("Failed to fetch offset for partition {}: {}", (Object)tp, (Object)error.message());
                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        this.future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
                        return;
                    }
                    if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        if (unauthorizedTopics == null) {
                            unauthorizedTopics = new HashSet<String>();
                        }
                        unauthorizedTopics.add(tp.topic());
                        continue;
                    }
                    if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
                        unstableTxnOffsetTopicPartitions.add(tp);
                        continue;
                    }
                    this.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response for partition " + tp + ": " + error.message()));
                    return;
                }
                if (partitionData.offset >= 0L) {
                    offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
                    continue;
                }
                CommitRequestManager.this.log.info("Found no committed offset for partition {}", (Object)tp);
                offsets.put(tp, null);
            }
            if (unauthorizedTopics != null) {
                this.future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
                CommitRequestManager.this.log.info("The following partitions still have unstable offsets which are not cleared on the broker side: {}, this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log", unstableTxnOffsetTopicPartitions);
                this.retry(currentTimeMs);
            } else {
                this.future.complete(offsets);
            }
        }

        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
            return this.future.whenComplete((r, t) -> {
                if (t != null) {
                    future.completeExceptionally((Throwable)t);
                } else {
                    future.complete((Map<TopicPartition, OffsetAndMetadata>)r);
                }
            });
        }
    }

    private class OffsetCommitRequestState {
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final String groupId;
        private final GroupState.Generation generation;
        private final String groupInstanceId;
        private final NetworkClientDelegate.FutureCompletionHandler future;

        public OffsetCommitRequestState(Map<TopicPartition, OffsetAndMetadata> offsets, String groupId, String groupInstanceId, GroupState.Generation generation) {
            this.offsets = offsets;
            this.future = new NetworkClientDelegate.FutureCompletionHandler();
            this.groupId = groupId;
            this.generation = generation;
            this.groupInstanceId = groupInstanceId;
        }

        public CompletableFuture<ClientResponse> future() {
            return this.future.future();
        }

        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
            HashMap<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<String, OffsetCommitRequestData.OffsetCommitRequestTopic>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.offsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = entry.getValue();
                OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap.getOrDefault(topicPartition.topic(), new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topicPartition.topic()));
                topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition().setPartitionIndex(topicPartition.partition()).setCommittedOffset(offsetAndMetadata.offset()).setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1)).setCommittedMetadata(offsetAndMetadata.metadata()));
                requestTopicDataMap.put(topicPartition.topic(), topic);
            }
            OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(new OffsetCommitRequestData().setGroupId(this.groupId).setGenerationId(this.generation.generationId).setMemberId(this.generation.memberId).setGroupInstanceId(this.groupInstanceId).setTopics(new ArrayList<OffsetCommitRequestData.OffsetCommitRequestTopic>(requestTopicDataMap.values())));
            return new NetworkClientDelegate.UnsentRequest(builder, CommitRequestManager.this.coordinatorRequestManager.coordinator(), this.future);
        }
    }
}

