/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.mergetree.compact;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionHelper;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.util.Preconditions;

public class SortMergeReader
implements RecordReader<KeyValue> {
    private final List<RecordReader<KeyValue>> nextBatchReaders;
    private final Comparator<RowData> userKeyComparator;
    private final MergeFunctionHelper mergeFunctionHelper;
    private final PriorityQueue<Element> minHeap;
    private final List<Element> polled;

    protected SortMergeReader(List<RecordReader<KeyValue>> readers, Comparator<RowData> userKeyComparator, MergeFunction mergeFunction) {
        this.nextBatchReaders = new ArrayList<RecordReader<KeyValue>>(readers);
        this.userKeyComparator = userKeyComparator;
        this.mergeFunctionHelper = new MergeFunctionHelper(mergeFunction);
        this.minHeap = new PriorityQueue((e1, e2) -> {
            int result = userKeyComparator.compare(((Element)e1).kv.key(), ((Element)e2).kv.key());
            if (result != 0) {
                return result;
            }
            return Long.compare(((Element)e1).kv.sequenceNumber(), ((Element)e2).kv.sequenceNumber());
        });
        this.polled = new ArrayList<Element>();
    }

    public static RecordReader<KeyValue> create(List<RecordReader<KeyValue>> readers, Comparator<RowData> userKeyComparator, MergeFunction mergeFunction) {
        return readers.size() == 1 ? readers.get(0) : new SortMergeReader(readers, userKeyComparator, mergeFunction);
    }

    @Override
    @Nullable
    public RecordReader.RecordIterator<KeyValue> readBatch() throws IOException {
        block0: for (RecordReader<KeyValue> reader : this.nextBatchReaders) {
            KeyValue kv;
            RecordReader.RecordIterator<KeyValue> iterator;
            while (true) {
                if ((iterator = reader.readBatch()) == null) {
                    reader.close();
                    continue block0;
                }
                kv = iterator.next();
                if (kv != null) break;
                iterator.releaseBatch();
            }
            this.minHeap.offer(new Element(kv, iterator, reader));
        }
        this.nextBatchReaders.clear();
        return this.minHeap.isEmpty() ? null : new SortMergeIterator();
    }

    @Override
    public void close() throws IOException {
        for (RecordReader<KeyValue> reader : this.nextBatchReaders) {
            reader.close();
        }
        for (Element element : this.minHeap) {
            element.iterator.releaseBatch();
            element.reader.close();
        }
        for (Element element : this.polled) {
            element.iterator.releaseBatch();
            element.reader.close();
        }
    }

    private static class Element {
        private KeyValue kv;
        private final RecordReader.RecordIterator<KeyValue> iterator;
        private final RecordReader<KeyValue> reader;

        private Element(KeyValue kv, RecordReader.RecordIterator<KeyValue> iterator, RecordReader<KeyValue> reader) {
            this.kv = kv;
            this.iterator = iterator;
            this.reader = reader;
        }

        private boolean update() throws IOException {
            KeyValue nextKv = this.iterator.next();
            if (nextKv == null) {
                return false;
            }
            this.kv = nextKv;
            return true;
        }
    }

    private class SortMergeIterator
    implements RecordReader.RecordIterator<KeyValue> {
        private boolean released = false;

        private SortMergeIterator() {
        }

        @Override
        public KeyValue next() throws IOException {
            RowData mergedValue;
            do {
                boolean hasMore;
                if (hasMore = this.nextImpl()) continue;
                return null;
            } while ((mergedValue = SortMergeReader.this.mergeFunctionHelper.getValue()) == null);
            return ((Element)SortMergeReader.this.polled.get(SortMergeReader.this.polled.size() - 1)).kv.setValue(mergedValue);
        }

        private boolean nextImpl() throws IOException {
            Preconditions.checkState(!this.released, "SortMergeIterator#advanceNext is called after release");
            Preconditions.checkState(SortMergeReader.this.nextBatchReaders.isEmpty(), "SortMergeIterator#advanceNext is called even if the last call returns null. This is a bug.");
            for (Element element : SortMergeReader.this.polled) {
                if (element.update()) {
                    SortMergeReader.this.minHeap.offer(element);
                    continue;
                }
                element.iterator.releaseBatch();
                SortMergeReader.this.nextBatchReaders.add(element.reader);
            }
            SortMergeReader.this.polled.clear();
            if (!SortMergeReader.this.nextBatchReaders.isEmpty()) {
                return false;
            }
            SortMergeReader.this.mergeFunctionHelper.reset();
            RowData key = ((Element)Preconditions.checkNotNull(SortMergeReader.this.minHeap.peek(), "Min heap is empty. This is a bug.")).kv.key();
            while (!SortMergeReader.this.minHeap.isEmpty()) {
                Element element;
                element = (Element)SortMergeReader.this.minHeap.peek();
                if (SortMergeReader.this.userKeyComparator.compare(key, element.kv.key()) != 0) break;
                SortMergeReader.this.minHeap.poll();
                SortMergeReader.this.mergeFunctionHelper.add(element.kv.value());
                SortMergeReader.this.polled.add(element);
            }
            return true;
        }

        @Override
        public void releaseBatch() {
            this.released = true;
        }
    }
}

