Skip to content

KAFKA-20292 [2/N]: Handle membership changes during offloaded streams assignments#21727

Open
squah-confluent wants to merge 1 commit intoapache:trunkfrom
confluentinc:squah-kip-1263-handle-assignment-offload-membership-changes-streams
Open

KAFKA-20292 [2/N]: Handle membership changes during offloaded streams assignments#21727
squah-confluent wants to merge 1 commit intoapache:trunkfrom
confluentinc:squah-kip-1263-handle-assignment-offload-membership-changes-streams

Conversation

@squah-confluent
Copy link
Copy Markdown
Contributor

When a member leaves a group, its target assignment is directly removed.
When a static member is replaced, its target assignment is moved to the
new member. These target assignment updates can happen while an
offloaded assignment is being computed.

When an offloaded assignment has finished, we must not emit records for
members that have left the group and we must apply any static member
replacements that occurred to the emitted records, to match the
behavior when assignments are not offloaded. That is, we act as if the
member removals and static member replacements are reordered after the
assignment.

We break up the assignment process into two parts: computing the target
assignment map and generating the records. Only the former can run on
the executor. To generate the records without removed members and with
static member replacements applied we need to use the current state of
the group.

… assignments

When a member leaves a group, its target assignment is directly removed.
When a static member is replaced, its target assignment is moved to the
new member. These target assignment updates can happen while an
offloaded assignment is being computed.

When an offloaded assignment has finished, we must not emit records for
members that have left the group and we must apply any static member
replacements that occurred to the emitted records, to match the
behavior when assignments are not offloaded. That is, we act as if the
member removals and static member replacements are reordered after the
assignment.

We break up the assignment process into two parts: computing the target
assignment map and generating the records. Only the former can run on
the executor. To generate the records without removed members and with
static member replacements applied we need to use the current state of
the group.
@github-actions github-actions bot added triage PRs from the community group-coordinator labels Mar 12, 2026
@github-actions
Copy link
Copy Markdown

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Copy link
Copy Markdown
Contributor

@chickenchickenlove chickenchickenlove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squah-confluent
Thanks for the patch!
I left some comments. When you get a chance, please take a look!

Optional<Set<String>> currentMemberIds,
Optional<Map<String, String>> currentStaticMembers
) {
Set<String> memberIds = new HashSet<>(members.keySet());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be missing something, but could we guarantee that members.keySet() still reflects the membership at the time the assignment was computed?

Since members comes from StreamsGroup#members() and appears to be a view rather than a snapshot, it seems possible that a member joining after assignment computation could be included when buildRecords() later evaluates new HashSet<>(members.keySet()).

This is likely not an issue today because buildTargetAssignment() and buildRecords() are called synchronously back-to-back, but I was wondering whether this might be worth considering if assignment computation is later offloaded or otherwise made asynchronous.

What do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a very valid point. We also do not want any of the maps and sets given to the assignor to be concurrently modified while computing the assignment, once that is done on a different thread.

I'm planning on having the caller clone all the collections when offloading is enabled, like so

        TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder assignmentResultBuilder =
            new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(logContext, group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
                .withTime(time)
                .withMembers(offloadAssignor ? new HashMap<>(group.members()) : group.members())
                .withStaticMembers(offloadAssignor ? new HashMap<>(group.staticMembers()) : group.staticMembers())
                .withSubscriptionType(subscriptionType)
                .withTargetAssignment(offloadAssignor ? new HashMap<>(group.targetAssignment()) : group.targetAssignment())
                .withInvertedTargetAssignment(invertedTargetAssignment)
                .withMetadataImage(metadataImage)
                .withResolvedRegularExpressions(offloadAssignor ? new HashMap<>(group.resolvedRegularExpressions()) : group.resolvedRegularExpressions())
                .addOrUpdateMember(updatedMember.memberId(), updatedMember);

The target assignment also has to be cloned, since it will be concurrently modified when a member leaves the group or a static member is replaced.

Additionally the caller will have to discard the offloaded assignment entirely if the group is deleted or empty, since we immediately update the assignment epoch to latest when the last member leaves.

If you can think of a nicer way to structure the code, let me know!

// Build map of replacement member ids for static members that have churned.
Map<String, String> staticMemberIdRemapping = new HashMap<>();
if (currentStaticMembers.isPresent()) {
for (Map.Entry<String, String> entry : staticMembers.entrySet()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

staticMembers.entrySet() can also be affected from the same reason.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants