fix: whatsapp race condition (#190)
* fix: whatsapp race condition
* fix: prevent race conditions in WhatsApp message handling
* fix: improve WhatsApp contact lock handling to prevent race conditions
This commit is contained in:
parent
502f7a470f
commit
b199c2c786
@ -20,26 +20,33 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul
|
||||
end
|
||||
end
|
||||
|
||||
def handle_message
|
||||
def handle_message # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
|
||||
@lock_acquired = false
|
||||
|
||||
return unless %w[lid user].include?(jid_type)
|
||||
return unless extract_from_jid(type: 'lid')
|
||||
return if ignore_message?
|
||||
return if find_message_by_source_id(raw_message_id)
|
||||
|
||||
return unless acquire_message_processing_lock
|
||||
@lock_acquired = acquire_message_processing_lock
|
||||
return unless @lock_acquired
|
||||
|
||||
set_contact
|
||||
# Lock by contact phone to prevent race conditions when multiple messages
|
||||
# from the same contact arrive simultaneously (e.g., WhatsApp albums).
|
||||
contact_phone = extract_from_jid(type: 'pn') || extract_from_jid(type: 'lid')
|
||||
with_contact_lock(contact_phone) do
|
||||
set_contact
|
||||
|
||||
unless @contact
|
||||
clear_message_source_id_from_redis
|
||||
unless @contact
|
||||
Rails.logger.warn "Contact not found for message: #{raw_message_id}"
|
||||
return
|
||||
end
|
||||
|
||||
Rails.logger.warn "Contact not found for message: #{raw_message_id}"
|
||||
return
|
||||
set_conversation
|
||||
handle_create_message
|
||||
end
|
||||
|
||||
set_conversation
|
||||
handle_create_message
|
||||
clear_message_source_id_from_redis
|
||||
ensure
|
||||
clear_message_source_id_from_redis if @lock_acquired
|
||||
end
|
||||
|
||||
def set_contact
|
||||
|
||||
@ -19,6 +19,8 @@ class Whatsapp::IncomingMessageBaseService
|
||||
private
|
||||
|
||||
def process_messages
|
||||
@lock_acquired = false
|
||||
|
||||
# We don't support reactions and ephemeral messages yet, so we need to skip processing the message
|
||||
# if the webhook event is a reaction or an ephemeral message or an unsupported message.
|
||||
return if unprocessable_message_type?(message_type)
|
||||
@ -28,16 +30,25 @@ class Whatsapp::IncomingMessageBaseService
|
||||
# there are no duplicate messages created.
|
||||
return if find_message_by_source_id(@processed_params[:messages].first[:id])
|
||||
|
||||
return unless acquire_message_processing_lock
|
||||
@lock_acquired = acquire_message_processing_lock
|
||||
return unless @lock_acquired
|
||||
|
||||
set_contact
|
||||
return unless @contact
|
||||
# Lock by contact phone to prevent race conditions when multiple messages
|
||||
# from the same contact arrive simultaneously (e.g., WhatsApp albums).
|
||||
contact_phone = @processed_params[:messages].first[:from]
|
||||
with_contact_lock(contact_phone) do
|
||||
set_contact
|
||||
return unless @contact
|
||||
|
||||
ActiveRecord::Base.transaction do
|
||||
set_conversation
|
||||
create_messages
|
||||
clear_message_source_id_from_redis
|
||||
ActiveRecord::Base.transaction do
|
||||
set_conversation
|
||||
create_messages
|
||||
end
|
||||
end
|
||||
ensure
|
||||
# Clear lock AFTER transaction commits to prevent race conditions where another request
|
||||
# acquires the lock before this transaction is visible to other connections
|
||||
clear_message_source_id_from_redis if @lock_acquired
|
||||
end
|
||||
|
||||
def process_statuses
|
||||
|
||||
@ -85,4 +85,31 @@ module Whatsapp::IncomingMessageServiceHelpers
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: "#{inbox.id}_#{@processed_params[:messages].first[:id]}")
|
||||
::Redis::Alfred.delete(key)
|
||||
end
|
||||
|
||||
# Runs the given block while holding a short-lived, per-contact Redis lock.
#
# Lock by contact phone to prevent race conditions when multiple messages
# from the same contact arrive simultaneously (e.g., WhatsApp albums).
# Without this, each message could create its own conversation.
#
# @param phone [Object] contact identifier interpolated into the lock key;
#   when blank, the block runs immediately without any locking
# @param timeout [Numeric] how long (in seconds) to keep retrying to acquire
#   the lock; the same value is passed as the key's expiry (`ex:`)
# @yield the work to perform while the lock is held
# @return [Object] whatever the block returns
# @raise [ArgumentError] if no block is given
# @raise [Timeout::Error] if the lock is not acquired within `timeout`
def with_contact_lock(phone, timeout: 5.seconds)
  raise ArgumentError, 'A block is required for with_contact_lock' unless block_given?
  return yield if phone.blank?

  key = "WHATSAPP::CONTACT_LOCK::#{inbox.id}_#{phone}"
  # Monotonic clock: sub-second resolution and immune to wall-clock adjustments,
  # unlike Time.now.to_i which truncates to whole seconds.
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  wait_started_at = monotonic_now.call
  lock_taken_at = nil

  while (monotonic_now.call - wait_started_at) < timeout
    # Sample the clock BEFORE the SET so our view of the lock's age is never
    # younger than the key's real age.
    attempt_at = monotonic_now.call
    if Redis::Alfred.set(key, 1, nx: true, ex: timeout)
      lock_taken_at = attempt_at
      break
    end

    sleep(0.1)
  end

  raise Timeout::Error, "Timeout acquiring contact lock for #{phone}" unless lock_taken_at

  yield
ensure
  # Release only while our own expiry cannot have elapsed. Once `timeout`
  # seconds have passed, the key has expired and may already belong to another
  # worker; deleting it then would release THEIR lock. (Best effort: this
  # narrows, but does not fully close, the window between check and delete.)
  Redis::Alfred.delete(key) if lock_taken_at && (monotonic_now.call - lock_taken_at) < timeout
end
|
||||
end
|
||||
|
||||
@ -463,6 +463,11 @@ describe Whatsapp::IncomingMessageBaileysService do
|
||||
end
|
||||
|
||||
it 'caches and clears message source id in Redis' do
|
||||
# Allow all Redis::Alfred calls (contact lock uses different keys)
|
||||
allow(Redis::Alfred).to receive(:set).and_call_original
|
||||
allow(Redis::Alfred).to receive(:delete).and_call_original
|
||||
|
||||
# Stub message lock specifically
|
||||
allow(Redis::Alfred).to receive(:set)
|
||||
.with(format_message_source_key('msg_123'), true, nx: true, ex: 1.day)
|
||||
.and_return(true)
|
||||
@ -470,8 +475,29 @@ describe Whatsapp::IncomingMessageBaileysService do
|
||||
|
||||
described_class.new(inbox: inbox, params: params).perform
|
||||
|
||||
expect(Redis::Alfred).to have_received(:set)
|
||||
expect(Redis::Alfred).to have_received(:delete)
|
||||
expect(Redis::Alfred).to have_received(:set).with(format_message_source_key('msg_123'), true, nx: true, ex: 1.day)
|
||||
expect(Redis::Alfred).to have_received(:delete).with(format_message_source_key('msg_123'))
|
||||
end
|
||||
|
||||
it 'clears lock even when an exception occurs after acquiring it' do
|
||||
# Bug: no ensure block meant exceptions left lock stuck forever
|
||||
# Fix: use ensure block to always clear lock when acquired
|
||||
# Allow all Redis::Alfred calls (contact lock uses different keys)
|
||||
allow(Redis::Alfred).to receive(:set).and_call_original
|
||||
allow(Redis::Alfred).to receive(:delete).and_call_original
|
||||
|
||||
# Stub message lock specifically
|
||||
allow(Redis::Alfred).to receive(:set)
|
||||
.with(format_message_source_key('msg_123'), true, nx: true, ex: 1.day)
|
||||
.and_return(true)
|
||||
allow(Redis::Alfred).to receive(:delete).with(format_message_source_key('msg_123'))
|
||||
|
||||
service = described_class.new(inbox: inbox, params: params)
|
||||
allow(service).to receive(:handle_create_message).and_raise(StandardError, 'simulated error')
|
||||
|
||||
expect { service.perform }.to raise_error(StandardError, 'simulated error')
|
||||
|
||||
expect(Redis::Alfred).to have_received(:delete).with(format_message_source_key('msg_123'))
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@ -144,49 +144,64 @@ describe Whatsapp::IncomingMessageService do
|
||||
expect(whatsapp_channel.inbox.messages.count).to eq(1)
|
||||
end
|
||||
|
||||
it 'prevents duplicate when both requests pass the message_under_process? check before cache is set' do
|
||||
# This test explicitly simulates the race condition timing:
|
||||
# 1. Request A calls message_under_process? -> returns nil (no lock)
|
||||
# 2. Request B calls message_under_process? -> returns nil (no lock yet!)
|
||||
# 3. Request A calls cache_message_source_id_in_redis
|
||||
# 4. Request B calls cache_message_source_id_in_redis
|
||||
# 5. Both requests pass find_message_by_source_id (message not yet committed)
|
||||
# 6. Both create messages -> DUPLICATE!
|
||||
#
|
||||
# With atomic lock (SETNX), only one can succeed.
|
||||
|
||||
# Key is scoped by inbox.id to prevent cross-inbox lock collisions
|
||||
it 'only allows one of two concurrent requests to process the same message' do
|
||||
# Simulates atomic SETNX: first caller gets the lock, second is rejected
|
||||
message_source_key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: "#{whatsapp_channel.inbox.id}_#{params[:messages].first[:id]}")
|
||||
lock_acquired = false
|
||||
|
||||
# Simulate atomic SETNX: only the first call returns true
|
||||
# Allow all Redis::Alfred calls (contact lock uses different keys)
|
||||
allow(Redis::Alfred).to receive(:set).and_call_original
|
||||
allow(Redis::Alfred).to receive(:delete).and_call_original
|
||||
|
||||
# Stub message lock specifically
|
||||
allow(Redis::Alfred).to receive(:set).with(message_source_key, true, nx: true, ex: 1.day) do
|
||||
if lock_acquired
|
||||
false # Second caller fails to acquire lock
|
||||
else
|
||||
lock_acquired = true
|
||||
true # First caller acquires lock
|
||||
end
|
||||
result = !lock_acquired
|
||||
lock_acquired = true
|
||||
result
|
||||
end
|
||||
|
||||
allow(Redis::Alfred).to receive(:delete).with(message_source_key)
|
||||
|
||||
# Run two services "simultaneously" (both pass the check before either sets the lock)
|
||||
service1 = described_class.new(inbox: whatsapp_channel.inbox, params: params)
|
||||
service2 = described_class.new(inbox: whatsapp_channel.inbox, params: params)
|
||||
|
||||
# Mock find_message_by_source_id on specific instances (simulating uncommitted transaction)
|
||||
# Both bypass find_message_by_source_id (simulating race before DB commit)
|
||||
allow(service1).to receive(:find_message_by_source_id).and_return(nil)
|
||||
allow(service2).to receive(:find_message_by_source_id).and_return(nil)
|
||||
|
||||
service1.perform
|
||||
service2.perform
|
||||
|
||||
# With atomic lock, only ONE should create a message
|
||||
expect(whatsapp_channel.inbox.messages.count).to eq(1)
|
||||
expect(whatsapp_channel.inbox.conversations.count).to eq(1)
|
||||
end
|
||||
|
||||
it 'clears lock outside transaction to prevent race conditions' do
|
||||
# Bug: lock was cleared INSIDE transaction before commit, allowing:
|
||||
# 1. Request A creates message, clears lock (transaction uncommitted)
|
||||
# 2. Request B acquires lock, can't see A's uncommitted message
|
||||
# 3. Both create duplicates
|
||||
#
|
||||
# Fix: clear lock in ensure block AFTER transaction commits
|
||||
message_source_key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: "#{whatsapp_channel.inbox.id}_#{params[:messages].first[:id]}")
|
||||
lock_cleared_at_depth = nil
|
||||
|
||||
# Allow all Redis::Alfred calls (contact lock uses different keys)
|
||||
allow(Redis::Alfred).to receive(:set).and_call_original
|
||||
allow(Redis::Alfred).to receive(:delete).and_call_original
|
||||
|
||||
# Stub message lock specifically
|
||||
allow(Redis::Alfred).to receive(:set).with(message_source_key, true, nx: true, ex: 1.day).and_return(true)
|
||||
allow(Redis::Alfred).to receive(:delete).with(message_source_key) do
|
||||
lock_cleared_at_depth = ActiveRecord::Base.connection.open_transactions
|
||||
end
|
||||
|
||||
described_class.new(inbox: whatsapp_channel.inbox, params: params).perform
|
||||
|
||||
# Depth 1 = only RSpec's wrapper transaction, meaning our transaction completed
|
||||
expect(lock_cleared_at_depth).to eq(1),
|
||||
"Lock cleared at depth #{lock_cleared_at_depth}, expected 1. " \
|
||||
'Lock must be cleared AFTER transaction commits to prevent duplicates.'
|
||||
end
|
||||
|
||||
it 'creates message in second inbox when same source_id exists in different inbox' do
|
||||
# Create a message with same source_id in a different inbox
|
||||
other_whatsapp_channel = create(:channel_whatsapp, sync_templates: false)
|
||||
@ -201,6 +216,21 @@ describe Whatsapp::IncomingMessageService do
|
||||
expect(whatsapp_channel.inbox.conversations.count).to eq(1)
|
||||
expect(whatsapp_channel.inbox.messages.count).to eq(1)
|
||||
end
|
||||
|
||||
it 'acquires contact-level lock to prevent album race conditions' do
|
||||
# When multiple messages from same contact arrive simultaneously (e.g., album),
|
||||
# the contact lock prevents race conditions in conversation creation.
|
||||
# This test verifies the lock is actually being acquired.
|
||||
phone_number = '2423423243'
|
||||
contact_lock_key = "WHATSAPP::CONTACT_LOCK::#{whatsapp_channel.inbox.id}_#{phone_number}"
|
||||
|
||||
allow(Redis::Alfred).to receive(:set).and_call_original
|
||||
allow(Redis::Alfred).to receive(:delete).and_call_original
|
||||
|
||||
described_class.new(inbox: whatsapp_channel.inbox, params: params).perform
|
||||
|
||||
expect(Redis::Alfred).to have_received(:set).with(contact_lock_key, 1, nx: true, ex: 5.seconds)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when unsupported message types' do
|
||||
|
||||
Loading…
Reference in New Issue
Block a user