/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.assignor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;

public class RangeAssignor
implements PartitionAssignor {
    public static final String RANGE_ASSIGNOR_NAME = "range";

    @Override
    public String name() {
        return RANGE_ASSIGNOR_NAME;
    }

    private Map<Uuid, List<String>> membersPerTopic(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
        HashMap<Uuid, List<String>> membersPerTopic = new HashMap<Uuid, List<String>>();
        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
        membersData.forEach((memberId, memberMetadata) -> {
            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
            for (Uuid topicId : topics) {
                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
                    throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
                }
                membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList()).add(memberId);
            }
        });
        return membersPerTopic;
    }

    @Override
    public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
        HashMap<String, MemberAssignment> newAssignment = new HashMap<String, MemberAssignment>();
        Map<Uuid, List<String>> membersPerTopic = this.membersPerTopic(assignmentSpec, subscribedTopicDescriber);
        membersPerTopic.forEach((topicId, membersForTopic) -> {
            int remaining;
            int numPartitionsForTopic = subscribedTopicDescriber.numPartitions((Uuid)topicId);
            int minRequiredQuota = numPartitionsForTopic / membersForTopic.size();
            int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size();
            HashSet<Integer> assignedStickyPartitionsForTopic = new HashSet<Integer>();
            ArrayList<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<MemberWithRemainingAssignments>();
            for (String memberId : membersForTopic) {
                Set assignedPartitionsForTopic = assignmentSpec.members().get(memberId).assignedPartitions().getOrDefault(topicId, Collections.emptySet());
                int currentAssignmentSize = assignedPartitionsForTopic.size();
                ArrayList currentAssignmentListForTopic = new ArrayList(assignedPartitionsForTopic);
                if (currentAssignmentSize > 0) {
                    int retainedPartitionsCount = Math.min(currentAssignmentSize, minRequiredQuota);
                    Collections.sort(currentAssignmentListForTopic);
                    for (int i = 0; i < retainedPartitionsCount; ++i) {
                        assignedStickyPartitionsForTopic.add((Integer)currentAssignmentListForTopic.get(i));
                        newAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<Uuid, Set<Integer>>())).targetPartitions().computeIfAbsent((Uuid)topicId, k -> new HashSet()).add((Integer)currentAssignmentListForTopic.get(i));
                    }
                }
                if ((remaining = minRequiredQuota - currentAssignmentSize) < 0 && numMembersWithExtraPartition > 0) {
                    --numMembersWithExtraPartition;
                    assignedStickyPartitionsForTopic.add((Integer)currentAssignmentListForTopic.get(minRequiredQuota));
                    newAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<Uuid, Set<Integer>>())).targetPartitions().computeIfAbsent((Uuid)topicId, k -> new HashSet()).add((Integer)currentAssignmentListForTopic.get(minRequiredQuota));
                    continue;
                }
                MemberWithRemainingAssignments newPair = new MemberWithRemainingAssignments(memberId, remaining);
                potentiallyUnfilledMembers.add(newPair);
            }
            ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<Integer>();
            for (int i = 0; i < numPartitionsForTopic; ++i) {
                if (assignedStickyPartitionsForTopic.contains(i)) continue;
                unassignedPartitionsForTopic.add(i);
            }
            int unassignedPartitionsListStartPointer = 0;
            for (MemberWithRemainingAssignments pair : potentiallyUnfilledMembers) {
                String memberId = pair.memberId;
                remaining = pair.remaining;
                if (numMembersWithExtraPartition > 0) {
                    ++remaining;
                    --numMembersWithExtraPartition;
                }
                if (remaining <= 0) continue;
                List partitionsToAssign = unassignedPartitionsForTopic.subList(unassignedPartitionsListStartPointer, unassignedPartitionsListStartPointer + remaining);
                unassignedPartitionsListStartPointer += remaining;
                newAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<Uuid, Set<Integer>>())).targetPartitions().computeIfAbsent((Uuid)topicId, k -> new HashSet()).addAll(partitionsToAssign);
            }
        });
        return new GroupAssignment(newAssignment);
    }

    private static class MemberWithRemainingAssignments {
        private final String memberId;
        private final int remaining;

        public MemberWithRemainingAssignments(String memberId, int remaining) {
            this.memberId = memberId;
            this.remaining = remaining;
        }
    }
}

