/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.paimon.append.MultiTableAppendCompactTask;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.AppendOnlyMultiTableCompactionWorkerOperator;
import org.apache.paimon.flink.sink.AppendOnlySingleTableCompactionWorkerOperator;
import org.apache.paimon.flink.sink.AppendOnlySingleTableCompactionWorkerOperatorTest;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class AppendOnlyMultiTableCompactionWorkerOperatorTest
extends TableTestBase {
    private final String[] tables = new String[]{"a", "b"};

    @Test
    public void testAsyncCompactionWorks() throws Exception {
        AppendOnlyMultiTableCompactionWorkerOperator workerOperator = (AppendOnlyMultiTableCompactionWorkerOperator)new AppendOnlyMultiTableCompactionWorkerOperator.Factory((CatalogLoader & Serializable)() -> this.catalog, "user", new Options(), false).createStreamOperator(new StreamOperatorParameters((StreamTask)new SourceOperatorStreamTask((Environment)new DummyEnvironment()), (StreamConfig)new MockStreamConfig(new Configuration(), 1), (Output)new MockOutput(new ArrayList()), null, null, null));
        ArrayList records = new ArrayList();
        for (String table : this.tables) {
            Identifier identifier = this.identifier(table);
            this.createTable(identifier);
            List commitMessages = this.writeData((Table)this.getTable(identifier), 200, 20);
            AppendOnlySingleTableCompactionWorkerOperatorTest.packTask(commitMessages, 5).stream().map(task -> new StreamRecord((Object)new MultiTableAppendCompactTask(task.partition(), task.compactBefore(), identifier))).forEach(records::add);
        }
        Assertions.assertThat((int)records.size()).isEqualTo(8);
        workerOperator.open();
        for (StreamRecord record : records) {
            workerOperator.processElement(record);
        }
        ArrayList committables = new ArrayList();
        Long timeStart = System.currentTimeMillis();
        long timeout = 60000L;
        Assertions.assertThatCode(() -> {
            while (committables.size() != 8) {
                committables.addAll(workerOperator.prepareCommit(false, Long.MAX_VALUE));
                Long now = System.currentTimeMillis();
                if (now - timeStart > timeout && committables.size() != 8) {
                    throw new RuntimeException("Timeout waiting for compaction, maybe some error happens in " + AppendOnlySingleTableCompactionWorkerOperator.class.getName());
                }
                Thread.sleep(1000L);
            }
        }).doesNotThrowAnyException();
        committables.forEach(a -> Assertions.assertThat((((CommitMessageImpl)a.wrappedCommittable()).compactIncrement().compactAfter().size() == 1 ? 1 : 0) != 0).isTrue());
        Set table = committables.stream().map(MultiTableCommittable::getTable).collect(Collectors.toSet());
        Assertions.assertThat(table).hasSameElementsAs(Arrays.asList(this.tables));
    }
}

