/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.sender.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaOutbound;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.TransactionManager;
import reactor.kafka.sender.internals.ProducerFactory;
import reactor.util.context.Context;

public class DefaultKafkaSender<K, V>
implements KafkaSender<K, V> {
    private static final Logger log = LoggerFactory.getLogger((String)DefaultKafkaSender.class.getName());
    private static final Set<String> DELEGATE_METHODS = new HashSet<String>(Arrays.asList("sendOffsetsToTransaction", "partitionsFor", "metrics", "flush"));
    private final Mono<Producer<K, V>> producerMono;
    private final AtomicBoolean hasProducer = new AtomicBoolean();
    private final SenderOptions<K, V> senderOptions;
    private final DefaultTransactionManager transactionManager;
    private Producer<K, V> producerProxy;

    public DefaultKafkaSender(ProducerFactory producerFactory, SenderOptions<K, V> options) {
        this.senderOptions = options.toImmutable().scheduler(options.isTransactional() ? Schedulers.newSingle((String)options.transactionalId()) : options.scheduler());
        Mono producerMono = Mono.fromCallable(() -> {
            Producer<K, V> producer = producerFactory.createProducer(this.senderOptions);
            if (this.senderOptions.isTransactional()) {
                log.info("Initializing transactions for producer {}", (Object)this.senderOptions.transactionalId());
                producer.initTransactions();
            }
            return producer;
        }).doOnSubscribe(s -> this.hasProducer.set(true)).cache();
        if (this.senderOptions.isTransactional()) {
            this.producerMono = producerMono.publishOn(this.senderOptions.scheduler());
            this.transactionManager = new DefaultTransactionManager();
        } else {
            this.transactionManager = null;
            this.producerMono = producerMono;
        }
    }

    @Override
    public <T> Flux<SenderResult<T>> send(final Publisher<? extends SenderRecord<K, V, T>> records) {
        return this.producerMono.flatMapMany(producer -> new Flux<SenderResult<T>>((Producer)producer){
            final /* synthetic */ Producer val$producer;
            {
                this.val$producer = producer;
            }

            public void subscribe(CoreSubscriber<? super SenderResult<T>> s) {
                Flux senderRecords = Flux.from((Publisher)records);
                senderRecords.subscribe(new SendSubscriber(this.val$producer, s, DefaultKafkaSender.this.senderOptions.stopOnError()));
            }
        }.doOnError(e -> log.trace("Send failed with exception {}", e)).publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight()));
    }

    @Override
    public KafkaOutbound<K, V> createOutbound() {
        return new DefaultKafkaOutbound(this);
    }

    @Override
    public <T> Flux<Flux<SenderResult<T>>> sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K, V, T>>> transactionRecords) {
        UnicastProcessor processor = UnicastProcessor.create();
        return Flux.from(transactionRecords).publishOn(this.senderOptions.scheduler(), false, 1).concatMapDelayError(records -> this.transaction((Publisher)records, (UnicastProcessor<Object>)processor), false, 1).window((Publisher)processor).doOnTerminate(() -> processor.onComplete()).doOnCancel(() -> processor.onComplete());
    }

    @Override
    public TransactionManager transactionManager() {
        if (this.transactionManager == null) {
            throw new IllegalStateException("Transactions are not enabled");
        }
        return this.transactionManager;
    }

    @Override
    public <T> Mono<T> doOnProducer(Function<Producer<K, V>, ? extends T> function) {
        return this.producerMono.flatMap(producer -> Mono.create((T sink) -> {
            try {
                Object ret = function.apply(this.producerProxy((Producer<K, V>)producer));
                sink.success(ret);
            }
            catch (Throwable t) {
                sink.error(t);
            }
        }));
    }

    @Override
    public void close() {
        if (this.hasProducer.getAndSet(false)) {
            this.producerMono.doOnNext(producer -> producer.close(this.senderOptions.closeTimeout().toMillis(), TimeUnit.MILLISECONDS)).block();
            if (this.senderOptions.isTransactional()) {
                this.senderOptions.scheduler().dispose();
            }
        }
    }

    private Flux<Object> sendProducerRecords(final Publisher<? extends ProducerRecord<K, V>> records) {
        return this.producerMono.flatMapMany(producer -> new Flux<Object>(){

            public void subscribe(CoreSubscriber<? super Object> s) {
                Flux.from((Publisher)records).subscribe((CoreSubscriber)new SendSubscriberNoResponse(producer, s, DefaultKafkaSender.this.senderOptions.stopOnError()));
            }
        }.doOnError(e -> log.trace("Send failed with exception {}", e)).publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight()));
    }

    private Mono<Void> transaction(Publisher<? extends ProducerRecord<K, V>> transactionRecords) {
        return this.transactionManager().begin().thenMany(this.sendProducerRecords(transactionRecords)).concatWith(this.transactionManager().commit()).onErrorResume(e -> this.transactionManager().abort().then(Mono.error((Throwable)e))).publishOn(this.senderOptions.scheduler()).then();
    }

    private <T> Flux<SenderResult<T>> transaction(Publisher<? extends SenderRecord<K, V, T>> transactionRecords, UnicastProcessor<Object> transactionBoundary) {
        return this.transactionManager().begin().thenMany(this.send(transactionRecords)).concatWith(this.transactionManager().commit()).concatWith((Publisher)Mono.create((T sink) -> {
            transactionBoundary.onNext((Object)this);
            sink.success();
        })).onErrorResume(e -> this.transactionManager().abort().then(Mono.error((Throwable)e))).publishOn(this.senderOptions.scheduler());
    }

    private synchronized Producer<K, V> producerProxy(Producer<K, V> producer) {
        if (this.producerProxy == null) {
            Class[] interfaces = new Class[]{Producer.class};
            InvocationHandler handler = (proxy, method, args) -> {
                if (DELEGATE_METHODS.contains(method.getName())) {
                    try {
                        return method.invoke((Object)producer, args);
                    }
                    catch (InvocationTargetException e) {
                        throw e.getCause();
                    }
                }
                throw new UnsupportedOperationException("Method is not supported: " + method);
            };
            this.producerProxy = (Producer)Proxy.newProxyInstance(Producer.class.getClassLoader(), interfaces, handler);
        }
        return this.producerProxy;
    }

    private class DefaultTransactionManager
    implements TransactionManager {
        private DefaultTransactionManager() {
        }

        @Override
        public <T> Mono<T> begin() {
            return DefaultKafkaSender.this.producerMono.flatMap(p -> Mono.create(sink -> {
                p.beginTransaction();
                log.debug("Begin a new transaction for producer {}", (Object)DefaultKafkaSender.this.senderOptions.transactionalId());
                sink.success();
            }));
        }

        @Override
        public <T> Mono<T> sendOffsets(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
            return DefaultKafkaSender.this.producerMono.flatMap(producer -> Mono.create(sink -> {
                if (!offsets.isEmpty()) {
                    producer.sendOffsetsToTransaction(offsets, consumerGroupId);
                    log.trace("Sent offsets to transaction for producer {}, offsets: {}", (Object)DefaultKafkaSender.this.senderOptions.transactionalId(), (Object)offsets);
                }
                sink.success();
            }));
        }

        @Override
        public <T> Mono<T> commit() {
            return DefaultKafkaSender.this.producerMono.flatMap(producer -> Mono.create(sink -> {
                producer.commitTransaction();
                log.debug("Commit current transaction for producer {}", (Object)DefaultKafkaSender.this.senderOptions.transactionalId());
                sink.success();
            }));
        }

        @Override
        public <T> Mono<T> abort() {
            return DefaultKafkaSender.this.producerMono.flatMap(p -> Mono.create(sink -> {
                p.abortTransaction();
                log.debug("Abort current transaction for producer {}", (Object)DefaultKafkaSender.this.senderOptions.transactionalId());
                sink.success();
            }));
        }

        @Override
        public Scheduler scheduler() {
            return DefaultKafkaSender.this.senderOptions.scheduler();
        }
    }

    private static class KafkaOutboundThen<K, V>
    extends DefaultKafkaOutbound<K, V> {
        private final Mono<Void> thenMono;

        KafkaOutboundThen(DefaultKafkaSender<K, V> sender, KafkaOutbound<K, V> kafkaOutbound, Publisher<Void> thenPublisher) {
            super(sender);
            Mono<Void> parentMono = kafkaOutbound.then();
            this.thenMono = parentMono == Mono.empty() ? Mono.from(thenPublisher) : parentMono.thenEmpty(thenPublisher);
        }

        @Override
        public Mono<Void> then() {
            return this.thenMono;
        }
    }

    private static class DefaultKafkaOutbound<K, V>
    implements KafkaOutbound<K, V> {
        final DefaultKafkaSender<K, V> sender;

        DefaultKafkaOutbound(DefaultKafkaSender<K, V> sender) {
            this.sender = sender;
        }

        @Override
        public KafkaOutbound<K, V> send(Publisher<? extends ProducerRecord<K, V>> records) {
            return this.then((Publisher<Void>)((DefaultKafkaSender)this.sender).sendProducerRecords(records).then());
        }

        @Override
        public KafkaOutbound<K, V> sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K, V>>> transactionRecords) {
            return this.then((Publisher<Void>)Flux.from(transactionRecords).publishOn(((DefaultKafkaSender)this.sender).senderOptions.scheduler()).concatMapDelayError(records -> ((DefaultKafkaSender)this.sender).transaction(records), false, 1));
        }

        @Override
        public KafkaOutbound<K, V> then(Publisher<Void> other) {
            return new KafkaOutboundThen<K, V>(this.sender, this, other);
        }

        @Override
        public Mono<Void> then() {
            return Mono.empty();
        }
    }

    static class Response<T>
    implements SenderResult<T> {
        private final RecordMetadata metadata;
        private final Exception exception;
        private final T correlationMetadata;

        public Response(RecordMetadata metadata, Exception exception, T correlationMetadata) {
            this.metadata = metadata;
            this.exception = exception;
            this.correlationMetadata = correlationMetadata;
        }

        @Override
        public RecordMetadata recordMetadata() {
            return this.metadata;
        }

        @Override
        public Exception exception() {
            return this.exception;
        }

        @Override
        public T correlationMetadata() {
            return this.correlationMetadata;
        }

        public String toString() {
            return String.format("Correlation=%s metadata=%s exception=%s", this.correlationMetadata, this.metadata, this.exception);
        }
    }

    private class SendSubscriberNoResponse
    extends AbstractSendSubscriber<ProducerRecord<K, V>, Object, Void> {
        SendSubscriberNoResponse(Producer<K, V> producer, CoreSubscriber<? super Object> actual, boolean stopOnError) {
            super(producer, actual, stopOnError);
        }

        @Override
        protected void handleResponse(RecordMetadata metadata, Exception e, Void correlation) {
            if (metadata != null) {
                this.actual.onNext((Object)metadata);
            } else {
                this.actual.onNext((Object)e);
            }
        }

        @Override
        protected Void correlationMetadata(ProducerRecord<K, V> request) {
            return null;
        }

        @Override
        protected ProducerRecord<K, V> producerRecord(ProducerRecord<K, V> request) {
            return request;
        }
    }

    private class SendSubscriber<T>
    extends AbstractSendSubscriber<SenderRecord<K, V, T>, SenderResult<T>, T> {
        SendSubscriber(Producer<K, V> producer, CoreSubscriber<? super SenderResult<T>> actual, boolean stopOnError) {
            super(producer, actual, stopOnError);
        }

        @Override
        protected void handleResponse(RecordMetadata metadata, Exception e, T correlation) {
            this.actual.onNext(new Response<T>(metadata, e, correlation));
        }

        @Override
        protected T correlationMetadata(SenderRecord<K, V, T> request) {
            return request.correlationMetadata();
        }

        @Override
        protected ProducerRecord<K, V> producerRecord(SenderRecord<K, V, T> request) {
            return request;
        }
    }

    private abstract class AbstractSendSubscriber<Q, S, C>
    implements CoreSubscriber<Q> {
        protected final CoreSubscriber<? super S> actual;
        private final boolean stopOnError;
        private final Producer<K, V> producer;
        private AtomicInteger inflight;
        AtomicReference<SubscriberState> state;
        private AtomicReference<Throwable> firstException;

        AbstractSendSubscriber(Producer<K, V> producer, CoreSubscriber<? super S> actual, boolean stopOnError) {
            this.producer = producer;
            this.stopOnError = stopOnError;
            this.actual = actual;
            this.state = new AtomicReference<SubscriberState>(SubscriberState.INIT);
            this.inflight = new AtomicInteger();
            this.firstException = new AtomicReference();
        }

        public void onSubscribe(Subscription s) {
            this.state.set(SubscriberState.ACTIVE);
            this.actual.onSubscribe(s);
        }

        public void onNext(Q m) {
            if (this.checkComplete(m)) {
                return;
            }
            this.inflight.incrementAndGet();
            if (Thread.interrupted()) {
                log.trace("Previous operation on this scheduler was interrupted");
            }
            C correlationMetadata = this.correlationMetadata(m);
            try {
                if (DefaultKafkaSender.this.senderOptions.isTransactional()) {
                    log.trace("Transactional send initiated for producer {} in state {} inflight {}: {}", new Object[]{DefaultKafkaSender.this.senderOptions.transactionalId(), this.state, this.inflight, m});
                }
                this.producer.send(this.producerRecord(m), (metadata, exception) -> {
                    try {
                        if (DefaultKafkaSender.this.senderOptions.isTransactional()) {
                            log.trace("Transactional send completed for producer {} in state {} inflight {}: {}", new Object[]{DefaultKafkaSender.this.senderOptions.transactionalId(), this.state, this.inflight, m});
                        }
                        if (exception == null) {
                            this.handleMetadata(metadata, correlationMetadata);
                        } else {
                            this.handleError(exception, correlationMetadata, this.stopOnError);
                        }
                    }
                    catch (Exception e) {
                        this.handleError(e, correlationMetadata, true);
                    }
                    finally {
                        if (this.inflight.decrementAndGet() == 0) {
                            this.maybeComplete();
                        }
                    }
                });
            }
            catch (Exception e) {
                this.inflight.decrementAndGet();
                this.handleError(e, correlationMetadata, true);
            }
        }

        public void onError(Throwable t) {
            log.trace("Sender failed with exception {}", t);
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) || this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                this.actual.onError(t);
            } else if (this.firstException.compareAndSet(null, t) && this.state.get() == SubscriberState.COMPLETE) {
                Operators.onErrorDropped((Throwable)t, (Context)this.actual.currentContext());
            }
        }

        public void onComplete() {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.OUTBOUND_DONE) && this.inflight.get() == 0) {
                this.maybeComplete();
            }
        }

        private void maybeComplete() {
            if (this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                Throwable exception = this.firstException.get();
                if (exception != null) {
                    this.actual.onError(exception);
                } else {
                    this.actual.onComplete();
                }
            }
        }

        public void handleMetadata(RecordMetadata metadata, C correlation) {
            if (!this.checkComplete(metadata)) {
                this.handleResponse(metadata, null, correlation);
            }
        }

        public void handleError(Exception e, C correlation, boolean abort) {
            log.error("error {}", (Throwable)e);
            boolean complete = this.checkComplete(e);
            this.firstException.compareAndSet(null, e);
            if (!complete) {
                this.handleResponse(null, e, correlation);
                if (abort || DefaultKafkaSender.this.senderOptions.fatalException(e)) {
                    this.onError(e);
                }
            }
        }

        public <T> boolean checkComplete(T t) {
            boolean complete;
            boolean bl = complete = this.state.get() == SubscriberState.COMPLETE;
            if (complete && this.firstException.get() == null) {
                Operators.onNextDropped(t, (Context)this.actual.currentContext());
            }
            return complete;
        }

        protected abstract void handleResponse(RecordMetadata var1, Exception var2, C var3);

        protected abstract ProducerRecord<K, V> producerRecord(Q var1);

        protected abstract C correlationMetadata(Q var1);
    }

    private static enum SubscriberState {
        INIT,
        ACTIVE,
        OUTBOUND_DONE,
        COMPLETE;

    }
}

