/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.internals.AbstractFetch;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchCollector;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

public class Fetcher<K, V>
extends AbstractFetch {
    private final Logger log;
    private final ConsumerNetworkClient client;
    private final FetchCollector<K, V> fetchCollector;

    public Fetcher(LogContext logContext, ConsumerNetworkClient client, ConsumerMetadata metadata, SubscriptionState subscriptions, FetchConfig fetchConfig, Deserializers<K, V> deserializers, FetchMetricsManager metricsManager, Time time, ApiVersions apiVersions) {
        super(logContext, metadata, subscriptions, fetchConfig, new FetchBuffer(logContext), metricsManager, time, apiVersions);
        this.log = logContext.logger(Fetcher.class);
        this.client = client;
        this.fetchCollector = new FetchCollector<K, V>(logContext, metadata, subscriptions, fetchConfig, deserializers, metricsManager, time);
    }

    @Override
    protected boolean isUnavailable(Node node) {
        return this.client.isUnavailable(node);
    }

    @Override
    protected void maybeThrowAuthFailure(Node node) {
        this.client.maybeThrowAuthFailure(node);
    }

    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
        this.fetchBuffer.retainAll(new HashSet<TopicPartition>(assignedPartitions));
    }

    public synchronized int sendFetches() {
        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = this.prepareFetchRequests();
        this.sendFetchesInternal(fetchRequests, (fetchTarget, data, clientResponse) -> {
            Fetcher fetcher = this;
            synchronized (fetcher) {
                this.handleFetchSuccess(fetchTarget, data, (ClientResponse)clientResponse);
            }
        }, (fetchTarget, data, error) -> {
            Fetcher fetcher = this;
            synchronized (fetcher) {
                this.handleFetchFailure(fetchTarget, data, (Throwable)error);
            }
        });
        return fetchRequests.size();
    }

    protected void maybeCloseFetchSessions(Timer timer) {
        List<RequestFuture<ClientResponse>> requestFutures = this.sendFetchesInternal(this.prepareCloseFetchSessionRequests(), this::handleCloseFetchSessionSuccess, this::handleCloseFetchSessionFailure);
        while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
            this.client.poll(timer, null, true);
            timer.update();
        }
        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
            this.log.debug("All requests couldn't be sent in the specific timeout period {}ms. This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for KafkaConsumer.close(Duration timeout)", (Object)timer.timeoutMs());
        }
    }

    public Fetch<K, V> collectFetch() {
        return this.fetchCollector.collectFetch(this.fetchBuffer);
    }

    @Override
    protected synchronized void closeInternal(Timer timer) {
        this.client.disableWakeups();
        this.maybeCloseFetchSessions(timer);
        super.closeInternal(timer);
    }

    private List<RequestFuture<ClientResponse>> sendFetchesInternal(Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests, final AbstractFetch.ResponseHandler<ClientResponse> successHandler, final AbstractFetch.ResponseHandler<Throwable> errorHandler) {
        ArrayList<RequestFuture<ClientResponse>> requestFutures = new ArrayList<RequestFuture<ClientResponse>>();
        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequests.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            FetchRequest.Builder request = this.createFetchRequest(fetchTarget, data);
            RequestFuture<ClientResponse> responseFuture = this.client.send(fetchTarget, request);
            responseFuture.addListener(new RequestFutureListener<ClientResponse>(){

                @Override
                public void onSuccess(ClientResponse resp) {
                    successHandler.handle(fetchTarget, data, resp);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    errorHandler.handle(fetchTarget, data, e);
                }
            });
            requestFutures.add(responseFuture);
        }
        return requestFutures;
    }
}

