/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.aeron.samples;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import uk.co.real_logic.aeron.Aeron;
import uk.co.real_logic.aeron.FragmentAssembler;
import uk.co.real_logic.aeron.Image;
import uk.co.real_logic.aeron.Subscription;
import uk.co.real_logic.aeron.logbuffer.FragmentHandler;
import uk.co.real_logic.aeron.samples.SampleConfiguration;
import uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy;
import uk.co.real_logic.agrona.concurrent.SigInt;

/**
 * Sample subscriber that polls two streams on the same channel, wrapping each stream's
 * handler in a {@link FragmentAssembler}, and reports any delivered message whose length
 * differs from the length expected for that stream.
 *
 * <p>Channel, base stream id and fragment limit are taken from {@link SampleConfiguration};
 * the second stream id is the base stream id plus one.
 */
public class MultipleSubscribersWithFragmentAssembly {
    private static final String CHANNEL = SampleConfiguration.CHANNEL;
    private static final int STREAM_ID_1 = SampleConfiguration.STREAM_ID;
    private static final int STREAM_ID_2 = SampleConfiguration.STREAM_ID + 1;
    private static final int FRAGMENT_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;

    /** Message length checked by the handler for the first stream. */
    private static final int EXPECTED_LENGTH_STREAM_1 = 10000;
    /** Message length checked by the handler for the second stream. */
    private static final int EXPECTED_LENGTH_STREAM_2 = 9000;

    /**
     * Connects to Aeron, subscribes to both streams and polls them until the running flag
     * is cleared by the {@link SigInt} callback.
     *
     * @param args unused.
     * @throws Exception if setting up or closing the Aeron resources fails.
     */
    public static void main(String[] args) throws Exception {
        System.out.format("Subscribing to %s on stream ID %d and stream ID %d%n", CHANNEL, STREAM_ID_1, STREAM_ID_2);

        Aeron.Context ctx = new Aeron.Context()
            .availableImageHandler(MultipleSubscribersWithFragmentAssembly::eventAvailableImage)
            .unavailableImageHandler(MultipleSubscribersWithFragmentAssembly::eventUnavailableImage);

        // Each stream gets its own assembler wrapped around its own length-checking handler.
        FragmentAssembler dataHandler1 = new FragmentAssembler(reassembledStringMessage1(STREAM_ID_1));
        FragmentAssembler dataHandler2 = new FragmentAssembler(reassembledStringMessage2(STREAM_ID_2));

        AtomicBoolean running = new AtomicBoolean(true);
        SigInt.register(() -> running.set(false));

        // try-with-resources closes the subscriptions before the Aeron client (reverse order).
        try (Aeron aeron = Aeron.connect(ctx);
             Subscription subscription1 = aeron.addSubscription(CHANNEL, STREAM_ID_1);
             Subscription subscription2 = aeron.addSubscription(CHANNEL, STREAM_ID_2)) {
            BackoffIdleStrategy idleStrategy = new BackoffIdleStrategy(
                100L, 10L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MICROSECONDS.toNanos(100L));

            while (running.get()) {
                int fragmentsRead1 = subscription1.poll(dataHandler1, FRAGMENT_COUNT_LIMIT);
                int fragmentsRead2 = subscription2.poll(dataHandler2, FRAGMENT_COUNT_LIMIT);

                // The idle strategy takes the amount of work done in this cycle: zero lets it
                // progress through its back-off, a positive count resets it. Passing an
                // ever-increasing idle counter here would look like work and defeat the back-off.
                idleStrategy.idle(fragmentsRead1 + fragmentsRead2);
            }

            System.out.println("Shutting down...");
        }
    }

    /**
     * Prints the channel, stream id, session id and source identity of a newly available image.
     *
     * @param image the image that became available.
     */
    public static void eventAvailableImage(Image image) {
        Subscription subscription = image.subscription();
        // Stream id is printed in decimal, matching every other message emitted by this class.
        System.out.format("new image on %s streamId %d sessionId %x from %s%n", subscription.channel(), subscription.streamId(), image.sessionId(), image.sourceIdentity());
    }

    /**
     * Prints the channel, stream id and session id of an image that is no longer available.
     *
     * @param image the image that became unavailable.
     */
    public static void eventUnavailableImage(Image image) {
        Subscription subscription = image.subscription();
        System.out.format("inactive image on %s streamId %d sessionId %x%n", subscription.channel(), subscription.streamId(), image.sessionId());
    }

    /**
     * Creates the handler for the first stream; it reports messages whose length is not 10000.
     *
     * @param streamId stream id to include in the printed output.
     * @return a handler that prints message details and flags unexpected lengths.
     * @throws Exception declared for interface compatibility; not thrown by this implementation.
     */
    public static FragmentHandler reassembledStringMessage1(int streamId) throws Exception {
        return lengthCheckingHandler(streamId, EXPECTED_LENGTH_STREAM_1);
    }

    /**
     * Creates the handler for the second stream; it reports messages whose length is not 9000.
     *
     * @param streamId stream id to include in the printed output.
     * @return a handler that prints message details and flags unexpected lengths.
     * @throws Exception declared for interface compatibility; not thrown by this implementation.
     */
    public static FragmentHandler reassembledStringMessage2(int streamId) throws Exception {
        return lengthCheckingHandler(streamId, EXPECTED_LENGTH_STREAM_2);
    }

    /**
     * Builds a handler that copies the message out of the buffer, prints its header details and
     * prints a warning when the delivered length differs from {@code expectedLength}.
     */
    private static FragmentHandler lengthCheckingHandler(int streamId, int expectedLength) {
        return (buffer, offset, length, header) -> {
            // The payload is copied out as in the original sample; the copy is not inspected further.
            byte[] data = new byte[length];
            buffer.getBytes(offset, data);

            System.out.format("message to stream %d from session %x term id %x term offset %d (%d@%d)%n", streamId, header.sessionId(), header.termId(), header.termOffset(), length, offset);

            if (length != expectedLength) {
                System.out.format("Received message was not assembled properly; received length was %d, but was expecting %d%n", length, expectedLength);
            }
        };
    }
}

