From ec8366aabd7c5774a81cff601ed433bde10cff31 Mon Sep 17 00:00:00 2001 From: Gabriel Jablonski Date: Sat, 24 Jan 2026 18:13:35 -0300 Subject: [PATCH] fix(whatsapp): fix conversation duplication on race condition (#193) * fix(whatsapp): update message source ID handling and improve Redis key formatting * fix(whatsapp): prevent updating contact_inbox on identifier conflict * fix(whatsapp): refactor conversation routing tests to use shared examples for better clarity and maintainability --- .../baileys_handlers/messages_upsert.rb | 27 +-- .../contact_inbox_consolidation_service.rb | 97 ++++++++++ .../zapi_handlers/received_callback.rb | 23 +-- .../baileys_handlers/messages_upsert_spec.rb | 71 ++++++++ ...ontact_inbox_consolidation_service_spec.rb | 170 ++++++++++++++++++ .../zapi_handlers/received_callback_spec.rb | 71 ++++++++ 6 files changed, 422 insertions(+), 37 deletions(-) create mode 100644 app/services/whatsapp/contact_inbox_consolidation_service.rb create mode 100644 spec/services/whatsapp/contact_inbox_consolidation_service_spec.rb diff --git a/app/services/whatsapp/baileys_handlers/messages_upsert.rb b/app/services/whatsapp/baileys_handlers/messages_upsert.rb index e54b4aff7..129cbb34d 100644 --- a/app/services/whatsapp/baileys_handlers/messages_upsert.rb +++ b/app/services/whatsapp/baileys_handlers/messages_upsert.rb @@ -54,7 +54,12 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul source_id = extract_from_jid(type: 'lid') identifier = "#{source_id}@lid" - update_existing_contact_inbox(phone, source_id, identifier) if phone + Whatsapp::ContactInboxConsolidationService.new( + inbox: inbox, + phone: phone, + lid: source_id, + identifier: identifier + ).perform contact_inbox = ::ContactInboxWithContactBuilder.new( source_id: source_id, @@ -68,26 +73,6 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul update_contact_info(phone, source_id, identifier) end - def update_existing_contact_inbox(phone, source_id, identifier) - # NOTE: This is useful when we create a new contact manually, so we don't have information about contact LID; - # With this, when we receive a message from that contact, we can link it properly. - existing_contact_inbox = inbox.contact_inboxes.find_by(source_id: phone) - return unless existing_contact_inbox - return if inbox.contact_inboxes.exists?(source_id: source_id) - - existing_contact = existing_contact_inbox.contact - conflicting_identifier = inbox.account.contacts.find_by(identifier: identifier) - conflicting_phone = inbox.account.contacts.find_by(phone_number: "+#{phone}") - - return if conflicting_identifier && conflicting_identifier.id != existing_contact.id - return if conflicting_phone && conflicting_phone.id != existing_contact.id - - ActiveRecord::Base.transaction do - existing_contact_inbox.update!(source_id: source_id) - existing_contact.update!(identifier: identifier, phone_number: "+#{phone}") - end - end - def update_contact_info(phone, source_id, identifier) update_params = {} update_params[:phone_number] = "+#{phone}" if phone diff --git a/app/services/whatsapp/contact_inbox_consolidation_service.rb b/app/services/whatsapp/contact_inbox_consolidation_service.rb new file mode 100644 index 000000000..188430be6 --- /dev/null +++ b/app/services/whatsapp/contact_inbox_consolidation_service.rb @@ -0,0 +1,97 @@ +# Handles consolidation of duplicate contact_inboxes for WhatsApp channels. +# This is needed because: +# 1. When a conversation is first created via UI, a contact_inbox is created with source_id = phone +# 2. When the contact responds, the contact_inbox is updated to source_id = LID +# 3. If the conversation is deleted/resolved and a new one is created, a new contact_inbox +# with source_id = phone is created (since the existing one has LID) +# 4. This service consolidates these duplicates when a message arrives +class Whatsapp::ContactInboxConsolidationService + def initialize(inbox:, phone:, lid:, identifier:) + @inbox = inbox + @phone = phone + @lid = lid + @identifier = identifier + end + + def perform # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity + return unless @phone.present? && @lid.present? + # If phone and lid are the same, no consolidation needed + return if @phone == @lid + + phone_contact_inbox = find_phone_contact_inbox + lid_contact_inbox = find_lid_contact_inbox + + if phone_contact_inbox && lid_contact_inbox && should_consolidate?(phone_contact_inbox, lid_contact_inbox) + consolidate_contact_inboxes(phone_contact_inbox, lid_contact_inbox) + elsif phone_contact_inbox && lid_contact_inbox.nil? + migrate_phone_to_lid(phone_contact_inbox) + elsif phone_contact_inbox.nil? + # No phone-based contact_inbox exists, try to find contact by phone and update their contact_inbox + update_existing_contact_inbox_by_phone + end + end + + private + + def find_phone_contact_inbox + @inbox.contact_inboxes.find_by(source_id: @phone) + end + + def find_lid_contact_inbox + @inbox.contact_inboxes.find_by(source_id: @lid) + end + + def should_consolidate?(phone_contact_inbox, lid_contact_inbox) + phone_contact_inbox.contact_id == lid_contact_inbox.contact_id + end + + def consolidate_contact_inboxes(phone_contact_inbox, lid_contact_inbox) + ActiveRecord::Base.transaction do + phone_contact_inbox.conversations.find_each do |conversation| + conversation.update!(contact_inbox_id: lid_contact_inbox.id) + end + + phone_contact_inbox.destroy! + end + end + + def migrate_phone_to_lid(phone_contact_inbox) + existing_contact = phone_contact_inbox.contact + + return if identifier_conflict?(existing_contact) + return if phone_conflict?(existing_contact) + + ActiveRecord::Base.transaction do + phone_contact_inbox.update!(source_id: @lid) + existing_contact.update!(identifier: @identifier, phone_number: "+#{@phone}") + end + end + + # Find contact by phone number and update their contact_inbox source_id to LID + # This handles the case where contact_inbox has a different source_id (e.g., old format) + def update_existing_contact_inbox_by_phone + existing_contact = @inbox.account.contacts.find_by(phone_number: "+#{@phone}") + return unless existing_contact + + existing_contact_inbox = existing_contact.contact_inboxes.find_by(inbox_id: @inbox.id) + return unless existing_contact_inbox + # Don't update if we'd create a duplicate contact_inbox or identifier conflict + return if find_lid_contact_inbox + return if identifier_conflict?(existing_contact) + + ActiveRecord::Base.transaction do + existing_contact.update!(identifier: @identifier) + existing_contact_inbox.update!(source_id: @lid) + end + end + + def identifier_conflict?(existing_contact) + conflicting = @inbox.account.contacts.find_by(identifier: @identifier) + conflicting.present? && conflicting.id != existing_contact.id + end + + def phone_conflict?(existing_contact) + conflicting = @inbox.account.contacts.find_by(phone_number: "+#{@phone}") + conflicting.present? && conflicting.id != existing_contact.id + end +end diff --git a/app/services/whatsapp/zapi_handlers/received_callback.rb b/app/services/whatsapp/zapi_handlers/received_callback.rb index 4feb1b821..8a1484591 100644 --- a/app/services/whatsapp/zapi_handlers/received_callback.rb +++ b/app/services/whatsapp/zapi_handlers/received_callback.rb @@ -76,7 +76,7 @@ module Whatsapp::ZapiHandlers::ReceivedCallback # rubocop:disable Metrics/Module @raw_message[:senderName] || @raw_message[:chatName] || @raw_message[:phone] end - def set_contact + def set_contact # rubocop:disable Metrics/MethodLength push_name = contact_name source_id = @raw_message[:chatLid].to_s.gsub(/[^\d]/, '') identifier = @raw_message[:chatLid] @@ -85,7 +85,12 @@ module Whatsapp::ZapiHandlers::ReceivedCallback # rubocop:disable Metrics/Module unless @raw_message[:phone].ends_with?('@lid') contact_attributes[:phone_number] = "+#{@raw_message[:phone]}" - update_existing_contact_inbox(@raw_message[:phone], source_id, identifier) + Whatsapp::ContactInboxConsolidationService.new( + inbox: inbox, + phone: @raw_message[:phone], + lid: source_id, + identifier: identifier + ).perform end contact_inbox = ::ContactInboxWithContactBuilder.new( @@ -102,20 +107,6 @@ module Whatsapp::ZapiHandlers::ReceivedCallback # rubocop:disable Metrics/Module try_update_contact_avatar end - def update_existing_contact_inbox(phone, source_id, identifier) - # NOTE: This is useful when we create a new contact manually, so we don't have information about contact LID; - # With this, when we receive a message from that contact, we can link it properly. - existing_contact = inbox.account.contacts.find_by(phone_number: "+#{phone}") - return unless existing_contact - - existing_contact_inbox = existing_contact.contact_inboxes.find_by(inbox_id: inbox.id) - - ActiveRecord::Base.transaction do - existing_contact.update!(identifier: identifier) - existing_contact_inbox&.update!(source_id: source_id) - end - end - def update_contact_phone_number return if @contact.phone_number.present? return if @raw_message[:phone].ends_with?('@lid') diff --git a/spec/services/whatsapp/baileys_handlers/messages_upsert_spec.rb b/spec/services/whatsapp/baileys_handlers/messages_upsert_spec.rb index 4eb9e1ede..576ac17cb 100644 --- a/spec/services/whatsapp/baileys_handlers/messages_upsert_spec.rb +++ b/spec/services/whatsapp/baileys_handlers/messages_upsert_spec.rb @@ -449,4 +449,75 @@ describe Whatsapp::BaileysHandlers::MessagesUpsert do end end end + + describe 'conversation duplication after deletion or resolution' do + let(:phone) { '5511912345678' } + let(:lid) { '12345678' } + + def build_raw_message(id:, text:) + { + key: { id: id, remoteJid: "#{lid}@lid", remoteJidAlt: "#{phone}@s.whatsapp.net", fromMe: false, addressingMode: 'lid' }, + pushName: 'John Doe', + messageTimestamp: timestamp, + message: { conversation: text } + } + end + + def build_params(raw_message) + { webhookVerifyToken: webhook_verify_token, event: 'messages.upsert', data: { type: 'notify', messages: [raw_message] } } + end + + shared_examples 'routes messages to the new conversation' do |first_msg_id:, second_msg_id:| + it 'routes incoming messages to the new conversation, not a third one' do + # Step 1: Create contact and first contact_inbox with phone as source_id + contact = create(:contact, account: inbox.account, phone_number: "+#{phone}", identifier: nil) + first_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) + first_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: first_contact_inbox) + + # Step 2: Contact responds - this updates contact_inbox source_id from phone to LID + Whatsapp::IncomingMessageBaileysService.new( + inbox: inbox, + params: build_params(build_raw_message(id: first_msg_id, text: 'First response')) + ).perform + + # Verify message landed in first conversation and source_id migrated + expect(first_conversation.messages.count).to eq(1) + expect(first_contact_inbox.reload.source_id).to eq(lid) + + # Step 3: Either delete or resolve the first conversation + close_first_conversation.call(first_conversation) + + # Step 4: Create a new conversation (simulating UI creating a new contact_inbox) + second_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) + second_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: second_contact_inbox, status: :open) + expect(inbox.contact_inboxes.where(contact: contact).count).to eq(2) + + # Step 5: Contact responds again - should NOT create a third conversation + expect do + Whatsapp::IncomingMessageBaileysService.new( + inbox: inbox, + params: build_params(build_raw_message(id: second_msg_id, text: 'Second response')) + ).perform + end.not_to change(Conversation, :count) + + # The message should arrive in the second conversation + expect(second_conversation.reload.messages.last.content).to eq('Second response') + + # The duplicate contact_inboxes should be consolidated + expect(inbox.contact_inboxes.where(contact: contact).count).to eq(1) + end + end + + context 'when a conversation is deleted and a new one is created for the same contact' do + let(:close_first_conversation) { ->(conv) { conv.destroy! } } + + it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_001', second_msg_id: 'msg_002' + end + + context 'when a conversation is resolved and a new one is created for the same contact' do + let(:close_first_conversation) { ->(conv) { conv.update!(status: :resolved) } } + + it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_003', second_msg_id: 'msg_004' + end + end end diff --git a/spec/services/whatsapp/contact_inbox_consolidation_service_spec.rb b/spec/services/whatsapp/contact_inbox_consolidation_service_spec.rb new file mode 100644 index 000000000..78cbf2681 --- /dev/null +++ b/spec/services/whatsapp/contact_inbox_consolidation_service_spec.rb @@ -0,0 +1,170 @@ +require 'rails_helper' + +describe Whatsapp::ContactInboxConsolidationService do + let!(:whatsapp_channel) do + create(:channel_whatsapp, provider: 'baileys', validate_provider_config: false) + end + let(:inbox) { whatsapp_channel.inbox } + let(:phone) { '5511912345678' } + let(:lid) { '12345678' } + let(:identifier) { "#{lid}@lid" } + + describe '#perform' do + context 'when phone is blank' do + it 'does nothing' do + service = described_class.new(inbox: inbox, phone: nil, lid: lid, identifier: identifier) + + expect { service.perform }.not_to change(ContactInbox, :count) + end + end + + context 'when lid is blank' do + it 'does nothing' do + service = described_class.new(inbox: inbox, phone: phone, lid: nil, identifier: identifier) + + expect { service.perform }.not_to change(ContactInbox, :count) + end + end + + context 'when phone and lid are the same' do + it 'does nothing' do + contact = create(:contact, account: inbox.account, phone_number: "+#{phone}") + create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) + + service = described_class.new(inbox: inbox, phone: phone, lid: phone, identifier: identifier) + + expect { service.perform }.not_to change(ContactInbox, :count) + end + end + + context 'when no phone-based contact_inbox exists' do + it 'does nothing' do + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + + expect { service.perform }.not_to change(ContactInbox, :count) + end + end + + context 'when only phone-based contact_inbox exists' do + let!(:contact) { create(:contact, account: inbox.account, phone_number: "+#{phone}") } + let!(:phone_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) } + + it 'migrates the contact_inbox from phone to lid' do + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(phone_contact_inbox.reload.source_id).to eq(lid) + expect(contact.reload.identifier).to eq(identifier) + expect(contact.phone_number).to eq("+#{phone}") + end + + context 'when there is an identifier conflict with a different contact' do + let!(:conflicting_contact) { create(:contact, account: inbox.account, identifier: identifier) } # rubocop:disable RSpec/LetSetup + + it 'does not migrate' do + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(phone_contact_inbox.reload.source_id).to eq(phone) + end + end + + context 'when there is a phone conflict with a different contact' do + it 'does not migrate when another contact already has this phone number' do + # Create contact without phone, then create conflicting contact with the phone + contact.update!(phone_number: nil) + create(:contact, account: inbox.account, phone_number: "+#{phone}") + + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(phone_contact_inbox.reload.source_id).to eq(phone) + end + end + end + + context 'when both phone and lid contact_inboxes exist for the same contact' do + let!(:contact) { create(:contact, account: inbox.account, phone_number: "+#{phone}", identifier: identifier) } + let!(:lid_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: lid) } + let!(:phone_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) } + + it 'consolidates by moving conversations and deleting phone-based contact_inbox' do + conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: phone_contact_inbox) + + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(conversation.reload.contact_inbox_id).to eq(lid_contact_inbox.id) + expect(ContactInbox.exists?(phone_contact_inbox.id)).to be(false) + expect(inbox.contact_inboxes.where(contact: contact).count).to eq(1) + end + + it 'handles multiple conversations on the phone-based contact_inbox' do + conversation1 = create(:conversation, inbox: inbox, contact: contact, contact_inbox: phone_contact_inbox) + conversation2 = create(:conversation, inbox: inbox, contact: contact, contact_inbox: phone_contact_inbox) + conversation3 = create(:conversation, inbox: inbox, contact: contact, contact_inbox: lid_contact_inbox) + + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(conversation1.reload.contact_inbox_id).to eq(lid_contact_inbox.id) + expect(conversation2.reload.contact_inbox_id).to eq(lid_contact_inbox.id) + expect(conversation3.reload.contact_inbox_id).to eq(lid_contact_inbox.id) + expect(ContactInbox.exists?(phone_contact_inbox.id)).to be(false) + end + end + + context 'when phone and lid contact_inboxes belong to different contacts' do + let!(:contact1) { create(:contact, account: inbox.account, phone_number: "+#{phone}") } + let!(:contact2) { create(:contact, account: inbox.account, identifier: identifier) } + let!(:phone_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact1, source_id: phone) } + let!(:lid_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact2, source_id: lid) } + + it 'does not consolidate different contacts' do + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(ContactInbox.exists?(phone_contact_inbox.id)).to be(true) + expect(ContactInbox.exists?(lid_contact_inbox.id)).to be(true) + expect(phone_contact_inbox.reload.source_id).to eq(phone) + end + end + + context 'when contact exists by phone but has contact_inbox with different source_id' do + let!(:contact) { create(:contact, account: inbox.account, phone_number: "+#{phone}") } + let!(:old_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: '999999999') } + + it 'updates the existing contact_inbox source_id to lid' do + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + + expect { service.perform }.not_to change(ContactInbox, :count) + + expect(old_contact_inbox.reload.source_id).to eq(lid) + expect(contact.reload.identifier).to eq(identifier) + end + + context 'when a lid contact_inbox already exists' do + let!(:lid_contact_inbox) { create(:contact_inbox, inbox: inbox, contact: contact, source_id: lid) } # rubocop:disable RSpec/LetSetup + + it 'does not update to avoid duplicate' do + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(old_contact_inbox.reload.source_id).to eq('999999999') + end + end + + context 'when another contact already has the same identifier' do + let!(:conflicting_contact) { create(:contact, account: inbox.account, identifier: identifier) } # rubocop:disable RSpec/LetSetup + + it 'does not update to avoid identifier conflict' do + service = described_class.new(inbox: inbox, phone: phone, lid: lid, identifier: identifier) + service.perform + + expect(old_contact_inbox.reload.source_id).to eq('999999999') + expect(contact.reload.identifier).not_to eq(identifier) + end + end + end + end +end diff --git a/spec/services/whatsapp/zapi_handlers/received_callback_spec.rb b/spec/services/whatsapp/zapi_handlers/received_callback_spec.rb index 1b0a99918..52c009c5a 100644 --- a/spec/services/whatsapp/zapi_handlers/received_callback_spec.rb +++ b/spec/services/whatsapp/zapi_handlers/received_callback_spec.rb @@ -470,6 +470,77 @@ describe Whatsapp::ZapiHandlers::ReceivedCallback do service.perform end end + + describe 'conversation duplication after deletion or resolution' do + let(:phone) { '5511912345678' } + let(:lid) { '12345678' } + + def build_params(message_id:, text:) + { + type: 'ReceivedCallback', + messageId: message_id, + momment: Time.current.to_i * 1000, + fromMe: false, + phone: phone, + chatLid: "#{lid}@lid", + chatName: 'John Doe', + text: { message: text } + } + end + + shared_examples 'routes messages to the new conversation' do |first_msg_id:, second_msg_id:| + it 'routes incoming messages to the new conversation, not a third one' do + # Step 1: Create contact and first contact_inbox with phone as source_id + contact = create(:contact, account: inbox.account, phone_number: "+#{phone}", identifier: nil) + first_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) + first_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: first_contact_inbox) + + # Step 2: Contact responds - this updates contact_inbox source_id from phone to LID + Whatsapp::IncomingMessageZapiService.new( + inbox: inbox, + params: build_params(message_id: first_msg_id, text: 'First response') + ).perform + + # Verify message landed in first conversation and source_id migrated + expect(first_conversation.messages.count).to eq(1) + expect(first_contact_inbox.reload.source_id).to eq(lid) + + # Step 3: Either delete or resolve the first conversation + close_first_conversation.call(first_conversation) + + # Step 4: Create a new conversation (simulating UI creating a new contact_inbox) + second_contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: phone) + second_conversation = create(:conversation, inbox: inbox, contact: contact, contact_inbox: second_contact_inbox, status: :open) + expect(inbox.contact_inboxes.where(contact: contact).count).to eq(2) + + # Step 5: Contact responds again - should NOT create a third conversation + expect do + Whatsapp::IncomingMessageZapiService.new( + inbox: inbox, + params: build_params(message_id: second_msg_id, text: 'Second response') + ).perform + end.not_to change(Conversation, :count) + + # The message should arrive in the second conversation + expect(second_conversation.reload.messages.last.content).to eq('Second response') + + # The duplicate contact_inboxes should be consolidated + expect(inbox.contact_inboxes.where(contact: contact).count).to eq(1) + end + end + + context 'when a conversation is deleted and a new one is created for the same contact' do + let(:close_first_conversation) { ->(conv) { conv.destroy! } } + + it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_001', second_msg_id: 'msg_002' + end + + context 'when a conversation is resolved and a new one is created for the same contact' do + let(:close_first_conversation) { ->(conv) { conv.update!(status: :resolved) } } + + it_behaves_like 'routes messages to the new conversation', first_msg_id: 'msg_003', second_msg_id: 'msg_004' + end + end end describe '#process_received_callback' do