/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferWithIdentity;
import org.apache.flink.runtime.io.network.partition.hybrid.HsBufferContext;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation;
import org.apache.flink.runtime.io.network.partition.hybrid.HsOutputMetrics;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;

/**
 * Keeps track of the in-memory buffers that belong to one subpartition (identified by
 * {@code targetChannel}) and exposes them for consumption through {@link HsDataView}.
 *
 * <p>Records are appended into {@link BufferBuilder}s requested from
 * {@link HsMemoryDataManagerOperation}; once a builder is finished it is turned into an
 * {@link HsBufferContext} and tracked in three structures: {@code allBuffers},
 * {@code unConsumedBuffers} and {@code bufferIndexToContexts}.
 *
 * <p>Locking: every access that goes through {@link #runWithLock} / {@link #callWithLock} first
 * takes {@code resultPartitionLock} and then synchronizes on {@code subpartitionLock}.
 */
public class HsSubpartitionMemoryDataManager
implements HsDataView {
    /** Channel index stamped on every buffer context and used for data-available notifications. */
    private final int targetChannel;

    /** Size in bytes assumed for each buffer requested from the pool when computing capacity. */
    private final int bufferSize;

    private final HsMemoryDataManagerOperation memoryDataManagerOperation;

    /**
     * Buffer builders that have been requested but not yet finished; the head is the one currently
     * being written. Not guarded by {@code subpartitionLock} in this class.
     * NOTE(review): presumably only touched by the single writing thread - confirm.
     */
    private final Queue<BufferBuilder> unfinishedBuffers = new LinkedList<>();

    /** Index assigned to the next finished buffer; incremented in {@link #addFinishedBuffer}. */
    private int finishedBufferIndex;

    /** All finished buffers in insertion order; released buffers are trimmed from the head. */
    @GuardedBy("subpartitionLock")
    private final Deque<HsBufferContext> allBuffers = new LinkedList<>();

    /** Finished buffers that have not been handed out by {@link #consumeBuffer} yet. */
    @GuardedBy("subpartitionLock")
    private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>();

    /** Lookup from buffer index to its context; entries are removed when a buffer is released. */
    @GuardedBy("subpartitionLock")
    private final Map<Integer, HsBufferContext> bufferIndexToContexts = new HashMap<>();

    /** Outer lock, always taken before {@code subpartitionLock}. */
    private final Lock resultPartitionLock;

    /** Inner monitor guarding the three buffer structures above. */
    private final Object subpartitionLock = new Object();

    /** Optional compressor; when {@code null} finished buffers are stored uncompressed. */
    @Nullable
    private final BufferCompressor bufferCompressor;

    /** Must be set via {@link #setOutputMetrics} before the first buffer is finished. */
    @Nullable
    private HsOutputMetrics outputMetrics;

    HsSubpartitionMemoryDataManager(int targetChannel, int bufferSize, Lock resultPartitionLock, @Nullable BufferCompressor bufferCompressor, HsMemoryDataManagerOperation memoryDataManagerOperation) {
        this.targetChannel = targetChannel;
        this.bufferSize = bufferSize;
        this.resultPartitionLock = resultPartitionLock;
        this.memoryDataManagerOperation = memoryDataManagerOperation;
        this.bufferCompressor = bufferCompressor;
    }

    /**
     * Returns the data type of the first unconsumed buffer if its index equals
     * {@code nextToConsumeIndex}, otherwise {@link Buffer.DataType#NONE}.
     */
    @Override
    public Buffer.DataType peekNextToConsumeDataType(int nextToConsumeIndex) {
        return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
    }

    /**
     * Consumes the first unconsumed buffer if its index equals {@code toConsumeIndex}.
     *
     * @param toConsumeIndex index of the buffer the caller expects to consume next
     * @return a read-only slice of the buffer together with the current backlog, the data type of
     *     the following buffer and {@code toConsumeIndex}; empty if the head of the unconsumed
     *     queue does not carry the expected index
     */
    @Override
    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
        Optional<Tuple2<HsBufferContext, Buffer.DataType>> bufferAndNextDataType = callWithLock(() -> {
            if (!checkFirstUnConsumedBufferIndex(toConsumeIndex)) {
                return Optional.empty();
            }
            HsBufferContext bufferContext = Preconditions.checkNotNull(unConsumedBuffers.pollFirst());
            bufferContext.consumed();
            Buffer.DataType nextDataType = peekNextToConsumeDataTypeInternal(toConsumeIndex + 1);
            return Optional.of(Tuple2.of(bufferContext, nextDataType));
        });
        // The consumed-callback and the result construction both run after the locks are released.
        bufferAndNextDataType.ifPresent(
                tuple -> memoryDataManagerOperation.onBufferConsumed(tuple.f0.getBufferIndexAndChannel()));
        return bufferAndNextDataType.map(
                tuple -> new ResultSubpartition.BufferAndBacklog(
                        tuple.f0.getBuffer().readOnlySlice(), getBacklog(), tuple.f1, toConsumeIndex));
    }

    /**
     * Returns the current size of the unconsumed-buffer queue.
     *
     * <p>The size is read without taking either lock.
     * NOTE(review): presumably an intentional approximation to avoid locking overhead - confirm.
     */
    @Override
    public int getBacklog() {
        return unConsumedBuffers.size();
    }

    /** No-op in this implementation. */
    @Override
    public void releaseDataView() {
    }

    /**
     * Appends {@code record} to this subpartition, routing events and ordinary records to their
     * dedicated write paths.
     *
     * @param record data to append
     * @param dataType type of the data; decides between the event and the record path
     * @throws InterruptedException propagated from requesting buffers on the record path
     */
    public void append(ByteBuffer record, Buffer.DataType dataType) throws InterruptedException {
        if (dataType.isEvent()) {
            writeEvent(record, dataType);
        } else {
            writeRecord(record, dataType);
        }
    }

    /**
     * Collects, in queue order, the identities of all non-released buffers in {@code allBuffers}
     * that match both the given spill status and consume status.
     */
    public Deque<BufferIndexAndChannel> getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus spillStatus, HsSpillingInfoProvider.ConsumeStatus consumeStatus) {
        return callWithLock(() -> {
            Deque<BufferIndexAndChannel> targetBuffers = new ArrayDeque<>();
            for (HsBufferContext bufferContext : allBuffers) {
                if (isBufferSatisfyStatus(bufferContext, spillStatus, consumeStatus)) {
                    targetBuffers.add(bufferContext.getBufferIndexAndChannel());
                }
            }
            return targetBuffers;
        });
    }

    /**
     * Starts spilling for each requested buffer and returns those for which spilling was actually
     * started.
     *
     * @param toSpill identities of the buffers to spill; unknown indexes are skipped
     * @param spillDoneFuture future handed to each buffer context when spilling starts
     * @return the buffers (with index and channel) whose {@code startSpilling} call returned true
     */
    public List<BufferWithIdentity> spillSubpartitionBuffers(List<BufferIndexAndChannel> toSpill, CompletableFuture<Void> spillDoneFuture) {
        return callWithLock(() -> toSpill.stream()
                .map(indexAndChannel -> {
                    int bufferIndex = indexAndChannel.getBufferIndex();
                    return startSpillingBuffer(bufferIndex, spillDoneFuture)
                            .map(context -> new BufferWithIdentity(context.getBuffer(), bufferIndex, targetChannel));
                })
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList()));
    }

    /**
     * Releases every requested buffer that is still tracked. Before releasing, a buffer whose
     * spilling has started gets a callback registered that marks it as released from file once the
     * spill future completes.
     *
     * @param toRelease identities of the buffers to release; unknown indexes are ignored
     */
    public void releaseSubpartitionBuffers(List<BufferIndexAndChannel> toRelease) {
        runWithLock(() -> toRelease.forEach(indexAndChannel -> {
            int bufferIndex = indexAndChannel.getBufferIndex();
            HsBufferContext bufferContext = bufferIndexToContexts.get(bufferIndex);
            if (bufferContext != null) {
                checkAndMarkBufferReadable(bufferContext);
                releaseBuffer(bufferIndex);
            }
        }));
    }

    /**
     * Sets the metrics updated whenever a buffer is finished.
     *
     * @param outputMetrics metrics holder, must not be {@code null}
     */
    public void setOutputMetrics(HsOutputMetrics outputMetrics) {
        this.outputMetrics = Preconditions.checkNotNull(outputMetrics);
    }

    /**
     * Stores an event in its own buffer: any partially written record buffer is finished first,
     * then the event bytes are wrapped (not copied) into a new {@link NetworkBuffer}.
     */
    private void writeEvent(ByteBuffer event, Buffer.DataType dataType) {
        Preconditions.checkArgument(dataType.isEvent());
        finishCurrentWritingBufferIfNotEmpty();
        // Wraps the whole backing array of the ByteBuffer; position/limit are not consulted and
        // array() requires an accessible (heap, non read-only) buffer.
        // NOTE(review): assumes callers always pass such a buffer - confirm.
        MemorySegment data = MemorySegmentFactory.wrap(event.array());
        NetworkBuffer buffer = new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, dataType, data.size());
        HsBufferContext bufferContext = new HsBufferContext(buffer, finishedBufferIndex, targetChannel);
        addFinishedBuffer(bufferContext);
        memoryDataManagerOperation.onBufferFinished();
    }

    /** Writes a non-event record after making sure enough buffer capacity has been requested. */
    private void writeRecord(ByteBuffer record, Buffer.DataType dataType) throws InterruptedException {
        Preconditions.checkArgument(!dataType.isEvent());
        ensureCapacityForRecord(record);
        writeRecord(record);
    }

    /**
     * Requests buffers from the pool until the unfinished buffers can hold all remaining bytes of
     * {@code record}.
     *
     * <p>Available capacity is the writable bytes of the head builder plus {@code bufferSize} for
     * every further queued builder, or zero when no builder is queued.
     */
    private void ensureCapacityForRecord(ByteBuffer record) throws InterruptedException {
        int numRecordBytes = record.remaining();
        BufferBuilder currentWritingBuffer = unfinishedBuffers.peek();
        int availableBytes = currentWritingBuffer == null
                ? 0
                : currentWritingBuffer.getWritableBytes() + bufferSize * (unfinishedBuffers.size() - 1);
        while (availableBytes < numRecordBytes) {
            unfinishedBuffers.add(memoryDataManagerOperation.requestBufferFromPool());
            availableBytes += bufferSize;
        }
    }

    /**
     * Copies {@code record} into the queued builders, finishing each builder as soon as it is
     * full. Expects {@link #ensureCapacityForRecord} to have been called beforehand.
     */
    private void writeRecord(ByteBuffer record) {
        while (record.hasRemaining()) {
            BufferBuilder currentWritingBuffer = Preconditions.checkNotNull(unfinishedBuffers.peek(), "Expect enough capacity for the record.");
            currentWritingBuffer.append(record);
            if (currentWritingBuffer.isFull()) {
                finishCurrentWritingBuffer();
            }
        }
    }

    /** Finishes the head builder unless there is none or nothing has been written into it. */
    private void finishCurrentWritingBufferIfNotEmpty() {
        BufferBuilder currentWritingBuffer = unfinishedBuffers.peek();
        // A builder whose writable bytes still equal the full buffer size is treated as empty.
        if (currentWritingBuffer == null || currentWritingBuffer.getWritableBytes() == bufferSize) {
            return;
        }
        finishCurrentWritingBuffer();
    }

    /**
     * Removes the head builder (if any), builds a {@link Buffer} from it, optionally compresses
     * it, and registers the result as a finished buffer.
     */
    private void finishCurrentWritingBuffer() {
        BufferBuilder currentWritingBuffer = unfinishedBuffers.poll();
        if (currentWritingBuffer == null) {
            return;
        }
        currentWritingBuffer.finish();
        BufferConsumer bufferConsumer = currentWritingBuffer.createBufferConsumerFromBeginning();
        Buffer buffer = bufferConsumer.build();
        // Builder and consumer are closed right after the buffer has been built from them.
        currentWritingBuffer.close();
        bufferConsumer.close();
        HsBufferContext bufferContext = new HsBufferContext(compressBuffersIfPossible(buffer), finishedBufferIndex, targetChannel);
        addFinishedBuffer(bufferContext);
        memoryDataManagerOperation.onBufferFinished();
    }

    /** Returns the compressed buffer when compression applies, otherwise the buffer unchanged. */
    private Buffer compressBuffersIfPossible(Buffer buffer) {
        if (!canBeCompressed(buffer)) {
            return buffer;
        }
        return Preconditions.checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
    }

    /** Compression requires a configured compressor and a non-empty data (non-event) buffer. */
    private boolean canBeCompressed(Buffer buffer) {
        return bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0;
    }

    /**
     * Registers a finished buffer in all tracking structures, updates the output metrics and
     * notifies that data is available when, after adding, at most one unconsumed buffer remains.
     */
    private void addFinishedBuffer(HsBufferContext bufferContext) {
        finishedBufferIndex++;
        boolean needNotify = callWithLock(() -> {
            allBuffers.add(bufferContext);
            unConsumedBuffers.add(bufferContext);
            bufferIndexToContexts.put(bufferContext.getBufferIndexAndChannel().getBufferIndex(), bufferContext);
            trimHeadingReleasedBuffers(unConsumedBuffers);
            updateStatistics(bufferContext.getBuffer());
            return unConsumedBuffers.size() <= 1;
        });
        // The notification is issued after the locks have been released.
        if (needNotify) {
            memoryDataManagerOperation.onDataAvailable(targetChannel);
        }
    }

    /** Lock-held variant of {@link #peekNextToConsumeDataType}. */
    @GuardedBy("subpartitionLock")
    private Buffer.DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) {
        if (!checkFirstUnConsumedBufferIndex(nextToConsumeIndex)) {
            return Buffer.DataType.NONE;
        }
        return Preconditions.checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType();
    }

    /**
     * Drops released buffers from the head of the unconsumed queue, then reports whether the new
     * head exists and carries {@code expectedBufferIndex}.
     */
    @GuardedBy("subpartitionLock")
    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
        trimHeadingReleasedBuffers(unConsumedBuffers);
        HsBufferContext first = unConsumedBuffers.peekFirst();
        return first != null && first.getBufferIndexAndChannel().getBufferIndex() == expectedBufferIndex;
    }

    /** Removes buffers from the head of {@code bufferQueue} for as long as they are released. */
    @GuardedBy("subpartitionLock")
    private void trimHeadingReleasedBuffers(Deque<HsBufferContext> bufferQueue) {
        while (!bufferQueue.isEmpty() && bufferQueue.peekFirst().isReleased()) {
            bufferQueue.removeFirst();
        }
    }

    /**
     * Removes the buffer from the index map, releases it and trims released buffers from the head
     * of {@code allBuffers}. Does nothing if the index is unknown.
     */
    @GuardedBy("subpartitionLock")
    private void releaseBuffer(int bufferIndex) {
        HsBufferContext bufferContext = bufferIndexToContexts.remove(bufferIndex);
        if (bufferContext == null) {
            return;
        }
        bufferContext.release();
        trimHeadingReleasedBuffers(allBuffers);
    }

    /**
     * Asks the buffer with the given index to start spilling.
     *
     * @return the buffer context if it exists and {@code startSpilling} returned true, otherwise
     *     empty
     */
    @GuardedBy("subpartitionLock")
    private Optional<HsBufferContext> startSpillingBuffer(int bufferIndex, CompletableFuture<Void> spillFuture) {
        HsBufferContext bufferContext = bufferIndexToContexts.get(bufferIndex);
        if (bufferContext == null || !bufferContext.startSpilling(spillFuture)) {
            return Optional.empty();
        }
        return Optional.of(bufferContext);
    }

    /**
     * For a non-released buffer whose spilling has started, registers a callback on its spilled
     * future that marks the buffer as released from file.
     *
     * @throws IllegalStateException if such a buffer has no spilled future
     */
    @GuardedBy("subpartitionLock")
    private void checkAndMarkBufferReadable(HsBufferContext bufferContext) {
        if (!isBufferSatisfyStatus(bufferContext, HsSpillingInfoProvider.SpillStatus.SPILL, HsSpillingInfoProvider.ConsumeStatus.ALL)) {
            return;
        }
        bufferContext.getSpilledFuture()
                .orElseThrow(() -> new IllegalStateException("Buffer in spill status should already set spilled future."))
                .thenRun(() -> {
                    BufferIndexAndChannel bufferIndexAndChannel = bufferContext.getBufferIndexAndChannel();
                    memoryDataManagerOperation.markBufferReleasedFromFile(bufferIndexAndChannel.getChannel(), bufferIndexAndChannel.getBufferIndex());
                });
    }

    /**
     * Tests a buffer against a spill status and a consume status. Released buffers never match;
     * status values other than the ones handled below impose no restriction.
     */
    @GuardedBy("subpartitionLock")
    private boolean isBufferSatisfyStatus(HsBufferContext bufferContext, HsSpillingInfoProvider.SpillStatus spillStatus, HsSpillingInfoProvider.ConsumeStatus consumeStatus) {
        if (bufferContext.isReleased()) {
            return false;
        }
        boolean match = true;
        switch (spillStatus) {
            case NOT_SPILL:
                match = !bufferContext.isSpillStarted();
                break;
            case SPILL:
                match = bufferContext.isSpillStarted();
                break;
            default:
                break;
        }
        switch (consumeStatus) {
            case NOT_CONSUMED:
                match &= !bufferContext.isConsumed();
                break;
            case CONSUMED:
                match &= bufferContext.isConsumed();
                break;
            default:
                break;
        }
        return match;
    }

    /** Increments the buffers-out counter and adds the buffer's readable bytes to bytes-out. */
    private void updateStatistics(Buffer buffer) {
        HsOutputMetrics metrics = Preconditions.checkNotNull(outputMetrics);
        metrics.getNumBuffersOut().inc();
        metrics.getNumBytesOut().inc((long) buffer.readableBytes());
    }

    /**
     * Runs {@code runnable} while holding {@code resultPartitionLock} and the
     * {@code subpartitionLock} monitor, in that order.
     */
    private <E extends Exception> void runWithLock(ThrowingRunnable<E> runnable) throws E {
        // Acquire before entering try: if lock() itself fails, finally must not call unlock()
        // on a lock this thread does not hold.
        resultPartitionLock.lock();
        try {
            synchronized (subpartitionLock) {
                runnable.run();
            }
        } finally {
            resultPartitionLock.unlock();
        }
    }

    /**
     * Evaluates {@code callable} while holding {@code resultPartitionLock} and the
     * {@code subpartitionLock} monitor, in that order, and returns its result.
     */
    private <R, E extends Exception> R callWithLock(SupplierWithException<R, E> callable) throws E {
        // Acquire before entering try: if lock() itself fails, finally must not call unlock()
        // on a lock this thread does not hold.
        resultPartitionLock.lock();
        try {
            synchronized (subpartitionLock) {
                return callable.get();
            }
        } finally {
            resultPartitionLock.unlock();
        }
    }
}

