/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.impl.RoutingStrategy;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.StreamProducerBuilder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SuperStreamProducer
implements Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamProducer.class);
    private final RoutingStrategy routingStrategy;
    private final Codec codec;
    private final String superStream;
    private final Map<String, Producer> producers = new ConcurrentHashMap<String, Producer>();
    private final StreamProducerBuilder producerBuilder;

    SuperStreamProducer(StreamProducerBuilder producerBuilder, String superStream, RoutingStrategy routingStrategy, StreamEnvironment streamEnvironment) {
        this.routingStrategy = routingStrategy;
        this.codec = streamEnvironment.codec();
        this.superStream = superStream;
        this.producerBuilder = producerBuilder.duplicate();
        this.producerBuilder.stream(null);
        this.producerBuilder.routing(null, null);
    }

    @Override
    public MessageBuilder messageBuilder() {
        return this.codec.messageBuilder();
    }

    @Override
    public long getLastPublishingId() {
        return 0L;
    }

    @Override
    public void send(Message message, ConfirmationHandler confirmationHandler) {
        String stream = this.routingStrategy.route(message);
        Producer producer = this.producers.computeIfAbsent(stream, stream1 -> this.producerBuilder.duplicate().stream((String)stream1).build());
        producer.send(message, confirmationHandler);
    }

    @Override
    public void close() {
        for (Map.Entry<String, Producer> entry : this.producers.entrySet()) {
            try {
                entry.getValue().close();
            }
            catch (Exception e) {
                LOGGER.info("Error while closing producer for partition {} of super stream {}: {}", new Object[]{entry.getKey(), this.superStream, e.getMessage()});
            }
        }
    }
}

