fix(whatsapp): fix conversation duplication on race condition (#193)

* fix(whatsapp): update message source ID handling and improve Redis key formatting

* fix(whatsapp): prevent updating contact_inbox on identifier conflict

* fix(whatsapp): refactor conversation routing tests to use shared examples for better clarity and maintainability
This commit is contained in:
Gabriel Jablonski 2026-01-24 18:13:35 -03:00 committed by GitHub
parent b29637c93a
commit ec8366aabd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 422 additions and 37 deletions

View File

@ -54,7 +54,12 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul
source_id = extract_from_jid(type: 'lid')
identifier = "#{source_id}@lid"
update_existing_contact_inbox(phone, source_id, identifier) if phone
Whatsapp::ContactInboxConsolidationService.new(
inbox: inbox,
phone: phone,
lid: source_id,
identifier: identifier
).perform
contact_inbox = ::ContactInboxWithContactBuilder.new(
source_id: source_id,
@ -68,26 +73,6 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul
update_contact_info(phone, source_id, identifier)
end
def update_existing_contact_inbox(phone, source_id, identifier)
# NOTE: This is useful when we create a new contact manually, so we don't have information about contact LID;
# With this, when we receive a message from that contact, we can link it properly.
existing_contact_inbox = inbox.contact_inboxes.find_by(source_id: phone)
return unless existing_contact_inbox
return if inbox.contact_inboxes.exists?(source_id: source_id)
existing_contact = existing_contact_inbox.contact
conflicting_identifier = inbox.account.contacts.find_by(identifier: identifier)
conflicting_phone = inbox.account.contacts.find_by(phone_number: "+#{phone}")
return if conflicting_identifier && conflicting_identifier.id != existing_contact.id
return if conflicting_phone && conflicting_phone.id != existing_contact.id
ActiveRecord::Base.transaction do
existing_contact_inbox.update!(source_id: source_id)
existing_contact.update!(identifier: identifier, phone_number: "+#{phone}")
end
end
def update_contact_info(phone, source_id, identifier)
update_params = {}
update_params[:phone_number] = "+#{phone}" if phone

View File

@ -0,0 +1,97 @@
# Handles consolidation of duplicate contact_inboxes for WhatsApp channels.
# This is needed because:
# 1. When a conversation is first created via UI, a contact_inbox is created with source_id = phone
# 2. When the contact responds, the contact_inbox is updated to source_id = LID
# 3. If the conversation is deleted/resolved and a new one is created, a new contact_inbox
# with source_id = phone is created (since the existing one has LID)
# 4. This service consolidates these duplicates when a message arrives
class Whatsapp::ContactInboxConsolidationService
def initialize(inbox:, phone:, lid:, identifier:)
@inbox = inbox
@phone = phone
@lid = lid
@identifier = identifier
end
def perform # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
return unless @phone.present? && @lid.present?
# If phone and lid are the same, no consolidation needed
return if @phone == @lid
phone_contact_inbox = find_phone_contact_inbox
lid_contact_inbox = find_lid_contact_inbox
if phone_contact_inbox && lid_contact_inbox && should_consolidate?(phone_contact_inbox, lid_contact_inbox)
consolidate_contact_inboxes(phone_contact_inbox, lid_contact_inbox)
elsif phone_contact_inbox && lid_contact_inbox.nil?
migrate_phone_to_lid(phone_contact_inbox)
elsif phone_contact_inbox.nil?
# No phone-based contact_inbox exists, try to find contact by phone and update their contact_inbox
update_existing_contact_inbox_by_phone
end
end
private
def find_phone_contact_inbox
@inbox.contact_inboxes.find_by(source_id: @phone)
end
def find_lid_contact_inbox
@inbox.contact_inboxes.find_by(source_id: @lid)
end
def should_consolidate?(phone_contact_inbox, lid_contact_inbox)
phone_contact_inbox.contact_id == lid_contact_inbox.contact_id
end
def consolidate_contact_inboxes(phone_contact_inbox, lid_contact_inbox)
ActiveRecord::Base.transaction do
phone_contact_inbox.conversations.find_each do |conversation|
conversation.update!(contact_inbox_id: lid_contact_inbox.id)
end
phone_contact_inbox.destroy!
end
end
def migrate_phone_to_lid(phone_contact_inbox)
existing_contact = phone_contact_inbox.contact
return if identifier_conflict?(existing_contact)
return if phone_conflict?(existing_contact)
ActiveRecord::Base.transaction do
phone_contact_inbox.update!(source_id: @lid)
existing_contact.update!(identifier: @identifier, phone_number: "+#{@phone}")
end
end
# Find contact by phone number and update their contact_inbox source_id to LID
# This handles the case where contact_inbox has a different source_id (e.g., old format)
def update_existing_contact_inbox_by_phone
existing_contact = @inbox.account.contacts.find_by(phone_number: "+#{@phone}")
return unless existing_contact
existing_contact_inbox = existing_contact.contact_inboxes.find_by(inbox_id: @inbox.id)
return unless existing_contact_inbox
# Don't update if we'd create a duplicate contact_inbox or identifier conflict
return if find_lid_contact_inbox
return if identifier_conflict?(existing_contact)
ActiveRecord::Base.transaction do
existing_contact.update!(identifier: @identifier)
existing_contact_inbox.update!(source_id: @lid)
end
end
def identifier_conflict?(existing_contact)
conflicting = @inbox.account.contacts.find_by(identifier: @identifier)
conflicting.present? && conflicting.id != existing_contact.id
end
def phone_conflict?(existing_contact)
conflicting = @inbox.account.contacts.find_by(phone_number: "+#{@phone}")
conflicting.present? && conflicting.id != existing_contact.id
end
end

View File

@ -76,7 +76,7 @@ module Whatsapp::ZapiHandlers::ReceivedCallback # rubocop:disable Metrics/Module
@raw_message[:senderName] || @raw_message[:chatName] || @raw_message[:phone]
end
def set_contact
def set_contact # rubocop:disable Metrics/MethodLength
push_name = contact_name
source_id = @raw_message[:chatLid].to_s.gsub(/[^\d]/, '')
identifier = @raw_message[:chatLid]
@ -85,7 +85,12 @@ module Whatsapp::ZapiHandlers::ReceivedCallback # rubocop:disable Metrics/Module
unless @raw_message[:phone].ends_with?('@lid')
contact_attributes[:phone_number] = "+#{@raw_message[:phone]}"
update_existing_contact_inbox(@raw_message[:phone], source_id, identifier)
Whatsapp::ContactInboxConsolidationService.new(
inbox: inbox,
phone: @raw_message[:phone],
lid: source_id,
identifier: identifier
).perform
end
contact_inbox = ::ContactInboxWithContactBuilder.new(
@ -102,20 +107,6 @@ module Whatsapp::ZapiHandlers::ReceivedCallback # rubocop:disable Metrics/Module
try_update_contact_avatar
end
def update_existing_contact_inbox(phone, source_id, identifier)
# NOTE: This is useful when we create a new contact manually, so we don't have information about contact LID;
# With this, when we receive a message from that contact, we can link it properly.
existing_contact = inbox.account.contacts.find_by(phone_number: "+#{phone}")
return unless existing_contact
existing_contact_inbox = existing_contact.contact_inboxes.find_by(inbox_id: inbox.id)
ActiveRecord::Base.transaction do
existing_contact.update!(identifier: identifier)
existing_contact_inbox&.update!(source_id: source_id)
end
end
def update_contact_phone_number
return if @contact.phone_number.present?
return if @raw_message[:phone].ends_with?('@lid')

View File

@ -449,4 +449,75 @@ describe Whatsapp::BaileysHandlers::MessagesUpsert do
end
end
end
describe 'conversation duplication after deletion or resolution' do
let(:phone) { '5511912345678' }
let(:lid) { '12345678' }
def build_raw_message(id:, text:)
{
key: { id: id, remoteJid: "#{lid}@lid", remoteJidAlt: "#{phone}@s.whatsapp.net", fromMe: false, addressingMode: 'lid' },
pushName: 'John Doe',
messageTimestamp: timestamp,
message: { conversation: text }
}
end
def build_params(raw_message)
{ webhookVerifyToken: webhook_verify_token, event: 'messages.upsert', data: { type: 'notify', messages: [raw_message] } }
end
shared_examples 'routes messages to the new conversation' do |first_msg_id:, second_msg_id:|
it 'routes incoming messages to the new conversation, not a third one' do
# Step 1: Create contact and first contact_inbox with phone as source_id
contact = create(:contact, account: inbox.account, phone_number: "+#{phone}", identifier: nil)
first_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone)
first_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: first_contact_inbox)
# Step 2: Contact responds - this updates contact_inbox source_id from phone to LID
Whatsapp::IncomingMessageBaileysService.new(
inbox: inbox,
params: build_params(build_raw_message(id: first_msg_id, text: 'First response'))
).perform
# Verify message landed in first conversation and source_id migrated
expect(first_conversation.messages.count).to eq(1)
expect(first_contact_inbox.reload.source_id).to eq(lid)
# Step 3: Either delete or resolve the first conversation
close_first_conversation.call(first_conversation)
# Step 4: Create a new conversation (simulating UI creating a new contact_inbox)
second_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone)
second_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: second_contact_inbox, status: :open)
expect(inbox.contact_inboxes.where(contact: contact).count).to eq(2)
# Step 5: Contact responds again - should NOT create a third conversation
expect do
Whatsapp::IncomingMessageBaileysService.new(
inbox: inbox,
params: build_params(build_raw_message(id: second_msg_id, text: 'Second response'))
).perform
end.not_to change(Conversation, :count)
# The message should arrive in the second conversation
expect(second_conversation.reload.messages.last.content).to eq('Second response')
# The duplicate contact_inboxes should be consolidated
expect(inbox.contact_inboxes.where(contact: contact).count).to eq(1)
end
end
context 'when a conversation is deleted and a new one is created for the same contact' do
let(:close_first_conversation) { ->(conv) { conv.destroy! } }
it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_001', second_msg_id: 'msg_002'
end
context 'when a conversation is resolved and a new one is created for the same contact' do
let(:close_first_conversation) { ->(conv) { conv.update!(status: :resolved) } }
it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_003', second_msg_id: 'msg_004'
end
end
end

View File

@ -0,0 +1,170 @@
require 'rails_helper'
describe Whatsapp::ContactInboxConsolidationService do
let!(:whatsapp_channel) do
create(:channel_whatsapp, provider: 'baileys', validate_provider_config: false)
end
let(:inbox) { whatsapp_channel.inbox }
let(:phone) { '5511912345678' }
let(:lid) { '12345678' }
let(:identifier) { "#{lid}@lid" }
describe '#perform' do
context 'when phone is blank' do
it 'does nothing' do
service = described_class.new(inbox: inbox, phone: nil, lid: lid, identifier: identifier)
expect { service.perform }.not_to change(ContactInbox, :count)
end
end
context 'when lid is blank' do
it 'does nothing' do
service = described_class.new(inbox: inbox, phone: phone, lid: nil, identifier: identifier)
expect { service.perform }.not_to change(ContactInbox, :count)
end
end
context 'when phone and lid are the same' do
it 'does nothing' do
contact = create(:contact, account: inbox.account, phone_number: "+#{phone}")
create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone)
service = described_class.new(inbox: inbox, phone: phone, lid: phone, identifier: identifier)
expect { service.perform }.not_to change(ContactInbox, :count)
end
end
context 'when no phone-based contact_inbox exists' do
it 'does nothing' do
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
expect { service.perform }.not_to change(ContactInbox, :count)
end
end
context 'when only phone-based contact_inbox exists' do
let!(:contact) { create(:contact, account: inbox.account, phone_number: "+#{phone}") }
let!(:phone_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) }
it 'migrates the contact_inbox from phone to lid' do
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(phone_contact_inbox.reload.source_id).to eq(lid)
expect(contact.reload.identifier).to eq(identifier)
expect(contact.phone_number).to eq("+#{phone}")
end
context 'when there is an identifier conflict with a different contact' do
let!(:conflicting_contact) { create(:contact, account: inbox.account, identifier: identifier) } # rubocop:disable RSpec/LetSetup
it 'does not migrate' do
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(phone_contact_inbox.reload.source_id).to eq(phone)
end
end
context 'when there is a phone conflict with a different contact' do
it 'does not migrate when another contact already has this phone number' do
# Create contact without phone, then create conflicting contact with the phone
contact.update!(phone_number: nil)
create(:contact, account: inbox.account, phone_number: "+#{phone}")
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(phone_contact_inbox.reload.source_id).to eq(phone)
end
end
end
context 'when both phone and lid contact_inboxes exist for the same contact' do
let!(:contact) { create(:contact, account: inbox.account, phone_number: "+#{phone}", identifier: identifier) }
let!(:lid_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: lid) }
let!(:phone_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) }
it 'consolidates by moving conversations and deleting phone-based contact_inbox' do
conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: phone_contact_inbox)
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(conversation.reload.contact_inbox_id).to eq(lid_contact_inbox.id)
expect(ContactInbox.exists?(phone_contact_inbox.id)).to be(false)
expect(inbox.contact_inboxes.where(contact: contact).count).to eq(1)
end
it 'handles multiple conversations on the phone-based contact_inbox' do
conversation1 = create(:conversation, inbox: inbox, contact: contact, contact_inbox: phone_contact_inbox)
conversation2 = create(:conversation, inbox: inbox, contact: contact, contact_inbox: phone_contact_inbox)
conversation3 = create(:conversation, inbox: inbox, contact: contact, contact_inbox: lid_contact_inbox)
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(conversation1.reload.contact_inbox_id).to eq(lid_contact_inbox.id)
expect(conversation2.reload.contact_inbox_id).to eq(lid_contact_inbox.id)
expect(conversation3.reload.contact_inbox_id).to eq(lid_contact_inbox.id)
expect(ContactInbox.exists?(phone_contact_inbox.id)).to be(false)
end
end
context 'when phone and lid contact_inboxes belong to different contacts' do
let!(:contact1) { create(:contact, account: inbox.account, phone_number: "+#{phone}") }
let!(:contact2) { create(:contact, account: inbox.account, identifier: identifier) }
let!(:phone_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact1, source_id: phone) }
let!(:lid_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact2, source_id: lid) }
it 'does not consolidate different contacts' do
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(ContactInbox.exists?(phone_contact_inbox.id)).to be(true)
expect(ContactInbox.exists?(lid_contact_inbox.id)).to be(true)
expect(phone_contact_inbox.reload.source_id).to eq(phone)
end
end
context 'when contact exists by phone but has contact_inbox with different source_id' do
let!(:contact) { create(:contact, account: inbox.account, phone_number: "+#{phone}") }
let!(:old_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: '999999999') }
it 'updates the existing contact_inbox source_id to lid' do
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
expect { service.perform }.not_to change(ContactInbox, :count)
expect(old_contact_inbox.reload.source_id).to eq(lid)
expect(contact.reload.identifier).to eq(identifier)
end
context 'when a lid contact_inbox already exists' do
let!(:lid_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: lid) } # rubocop:disable RSpec/LetSetup
it 'does not update to avoid duplicate' do
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(old_contact_inbox.reload.source_id).to eq('999999999')
end
end
context 'when another contact already has the same identifier' do
let!(:conflicting_contact) { create(:contact, account: inbox.account, identifier: identifier) } # rubocop:disable RSpec/LetSetup
it 'does not update to avoid identifier conflict' do
service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier)
service.perform
expect(old_contact_inbox.reload.source_id).to eq('999999999')
expect(contact.reload.identifier).not_to eq(identifier)
end
end
end
end
end

View File

@ -470,6 +470,77 @@ describe Whatsapp::ZapiHandlers::ReceivedCallback do
service.perform
end
end
describe 'conversation duplication after deletion or resolution' do
let(:phone) { '5511912345678' }
let(:lid) { '12345678' }
def build_params(message_id:, text:)
{
type: 'ReceivedCallback',
messageId: message_id,
momment: Time.current.to_i * 1000,
fromMe: false,
phone: phone,
chatLid: "#{lid}@lid",
chatName: 'John Doe',
text: { message: text }
}
end
shared_examples 'routes messages to the new conversation' do |first_msg_id:, second_msg_id:|
it 'routes incoming messages to the new conversation, not a third one' do
# Step 1: Create contact and first contact_inbox with phone as source_id
contact = create(:contact, account: inbox.account, phone_number: "+#{phone}", identifier: nil)
first_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone)
first_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: first_contact_inbox)
# Step 2: Contact responds - this updates contact_inbox source_id from phone to LID
Whatsapp::IncomingMessageZapiService.new(
inbox: inbox,
params: build_params(message_id: first_msg_id, text: 'First response')
).perform
# Verify message landed in first conversation and source_id migrated
expect(first_conversation.messages.count).to eq(1)
expect(first_contact_inbox.reload.source_id).to eq(lid)
# Step 3: Either delete or resolve the first conversation
close_first_conversation.call(first_conversation)
# Step 4: Create a new conversation (simulating UI creating a new contact_inbox)
second_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone)
second_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: second_contact_inbox, status: :open)
expect(inbox.contact_inboxes.where(contact: contact).count).to eq(2)
# Step 5: Contact responds again - should NOT create a third conversation
expect do
Whatsapp::IncomingMessageZapiService.new(
inbox: inbox,
params: build_params(message_id: second_msg_id, text: 'Second response')
).perform
end.not_to change(Conversation, :count)
# The message should arrive in the second conversation
expect(second_conversation.reload.messages.last.content).to eq('Second response')
# The duplicate contact_inboxes should be consolidated
expect(inbox.contact_inboxes.where(contact: contact).count).to eq(1)
end
end
context 'when a conversation is deleted and a new one is created for the same contact' do
let(:close_first_conversation) { ->(conv) { conv.destroy! } }
it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_001', second_msg_id: 'msg_002'
end
context 'when a conversation is resolved and a new one is created for the same contact' do
let(:close_first_conversation) { ->(conv) { conv.update!(status: :resolved) } }
it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_003', second_msg_id: 'msg_004'
end
end
end
describe '#process_received_callback' do