fix(baileys): race condition send_message/incoming message (#51)
* fix: race condition send_message/incoming message * fix: handle update race condition as well
This commit is contained in:
parent
c3e0e70490
commit
e188262877
@ -1,5 +1,8 @@
|
||||
module BaileysHelper
|
||||
def extract_baileys_message_timestamp(timestamp)
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY = 'BAILEYS::CHANNEL_LOCK_ON_OUTGOING_MESSAGE::%<channel_id>s'.freeze
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 15.seconds
|
||||
|
||||
def baileys_extract_message_timestamp(timestamp)
|
||||
# NOTE: Timestamp might be in this format {"low"=>1748003165, "high"=>0, "unsigned"=>true}
|
||||
if timestamp.is_a?(Hash) && timestamp.key?('low')
|
||||
low = timestamp['low'].to_i
|
||||
@ -10,4 +13,33 @@ module BaileysHelper
|
||||
# NOTE: Timestamp might be a string or a number
|
||||
timestamp.to_i
|
||||
end
|
||||
|
||||
def with_baileys_channel_lock_on_outgoing_message(channel_id, timeout: CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT)
|
||||
raise ArgumentError, 'A block is required for with_baileys_channel_lock_on_outgoing_message' unless block_given?
|
||||
|
||||
start_time = Time.now.to_i
|
||||
|
||||
# NOTE: On timeout, we ignore the lock and proceed with the block execution
|
||||
while (Time.now.to_i - start_time) < timeout
|
||||
break if baileys_lock_channel_on_outgoing_message(channel_id, timeout)
|
||||
|
||||
sleep(0.1)
|
||||
end
|
||||
|
||||
yield
|
||||
ensure
|
||||
baileys_clear_channel_lock_on_outgoing_message(channel_id)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def baileys_lock_channel_on_outgoing_message(channel_id, timeout)
|
||||
key = format(CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id)
|
||||
Redis::Alfred.set(key, 1, nx: true, ex: timeout)
|
||||
end
|
||||
|
||||
def baileys_clear_channel_lock_on_outgoing_message(channel_id)
|
||||
key = format(CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id)
|
||||
Redis::Alfred.delete(key)
|
||||
end
|
||||
end
|
||||
|
||||
@ -45,20 +45,25 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer
|
||||
@contact_inbox = nil
|
||||
@contact = nil
|
||||
@raw_message = message
|
||||
handle_message
|
||||
|
||||
next handle_message if incoming?
|
||||
|
||||
# NOTE: Shared lock with Whatsapp::SendOnWhatsappService
|
||||
# Avoids race conditions when sending messages.
|
||||
with_baileys_channel_lock_on_outgoing_message(inbox.channel.id) { handle_message }
|
||||
end
|
||||
end
|
||||
|
||||
def handle_message
|
||||
return if jid_type != 'user'
|
||||
return if find_message_by_source_id(message_id) || message_under_process?
|
||||
return if message_type == 'protocol'
|
||||
return if find_message_by_source_id(raw_message_id) || message_under_process?
|
||||
|
||||
cache_message_source_id_in_redis
|
||||
set_contact
|
||||
|
||||
unless @contact
|
||||
Rails.logger.warn "Contact not found for message: #{message_id}"
|
||||
Rails.logger.warn "Contact not found for message: #{raw_message_id}"
|
||||
return
|
||||
end
|
||||
|
||||
@ -160,7 +165,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer
|
||||
content: message_content,
|
||||
account_id: @inbox.account_id,
|
||||
inbox_id: @inbox.id,
|
||||
source_id: message_id,
|
||||
source_id: raw_message_id,
|
||||
sender: incoming? ? @contact : @inbox.account.account_users.first.user,
|
||||
sender_type: incoming? ? 'Contact' : 'User',
|
||||
message_type: incoming? ? :incoming : :outgoing,
|
||||
@ -173,7 +178,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer
|
||||
end
|
||||
|
||||
def message_content_attributes
|
||||
content_attributes = { external_created_at: extract_baileys_message_timestamp(@raw_message[:messageTimestamp]) }
|
||||
content_attributes = { external_created_at: baileys_extract_message_timestamp(@raw_message[:messageTimestamp]) }
|
||||
if message_type == 'reaction'
|
||||
content_attributes[:in_reply_to_external_id] = @raw_message.dig(:message, :reactionMessage, :key, :id)
|
||||
content_attributes[:is_reaction] = true
|
||||
@ -217,7 +222,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer
|
||||
return filename if filename.present?
|
||||
|
||||
ext = ".#{message_mimetype.split(';').first.split('/').last}" if message_mimetype.present?
|
||||
"#{file_content_type}_#{message_id}_#{Time.current.strftime('%Y%m%d')}#{ext}"
|
||||
"#{file_content_type}_#{raw_message_id}_#{Time.current.strftime('%Y%m%d')}#{ext}"
|
||||
end
|
||||
|
||||
def message_content
|
||||
@ -233,7 +238,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer
|
||||
end
|
||||
end
|
||||
|
||||
def message_id
|
||||
def raw_message_id
|
||||
@raw_message[:key][:id]
|
||||
end
|
||||
|
||||
@ -253,17 +258,17 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer
|
||||
end
|
||||
|
||||
def message_under_process?
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: message_id)
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: raw_message_id)
|
||||
Redis::Alfred.get(key)
|
||||
end
|
||||
|
||||
def cache_message_source_id_in_redis
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: message_id)
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: raw_message_id)
|
||||
::Redis::Alfred.setex(key, true)
|
||||
end
|
||||
|
||||
def clear_message_source_id_from_redis
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: message_id)
|
||||
key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: raw_message_id)
|
||||
::Redis::Alfred.delete(key)
|
||||
end
|
||||
|
||||
@ -272,12 +277,17 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer
|
||||
updates.each do |update|
|
||||
@message = nil
|
||||
@raw_message = update
|
||||
handle_update
|
||||
|
||||
next handle_update if incoming?
|
||||
|
||||
# NOTE: Shared lock with Whatsapp::SendOnWhatsappService
|
||||
# Avoids race conditions when sending messages.
|
||||
with_baileys_channel_lock_on_outgoing_message(inbox.channel.id) { handle_update }
|
||||
end
|
||||
end
|
||||
|
||||
def handle_update
|
||||
raise MessageNotFoundError unless find_message_by_source_id(message_id)
|
||||
raise MessageNotFoundError unless find_message_by_source_id(raw_message_id)
|
||||
|
||||
update_status if @raw_message.dig(:update, :status).present?
|
||||
handle_edited_content if @raw_message.dig(:update, :message).present?
|
||||
|
||||
@ -230,7 +230,7 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer
|
||||
timestamp = response.parsed_response.dig('data', 'messageTimestamp')
|
||||
return unless timestamp
|
||||
|
||||
external_created_at = extract_baileys_message_timestamp(timestamp)
|
||||
external_created_at = baileys_extract_message_timestamp(timestamp)
|
||||
@message.update!(external_created_at: external_created_at)
|
||||
end
|
||||
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
class Whatsapp::SendOnWhatsappService < Base::SendOnChannelService
|
||||
include BaileysHelper
|
||||
|
||||
private
|
||||
|
||||
def channel_class
|
||||
@ -9,6 +11,8 @@ class Whatsapp::SendOnWhatsappService < Base::SendOnChannelService
|
||||
should_send_template_message = template_params.present? || !message.conversation.can_reply?
|
||||
if should_send_template_message
|
||||
send_template_message
|
||||
elsif channel.provider == 'baileys'
|
||||
send_baileys_session_message
|
||||
else
|
||||
send_session_message
|
||||
end
|
||||
@ -108,6 +112,10 @@ class Whatsapp::SendOnWhatsappService < Base::SendOnChannelService
|
||||
template['components'].find { |obj| obj['type'] == 'BODY' && obj.key?('text') }
|
||||
end
|
||||
|
||||
def send_baileys_session_message
|
||||
with_baileys_channel_lock_on_outgoing_message(channel.id) { send_session_message }
|
||||
end
|
||||
|
||||
def send_session_message
|
||||
message_id = channel.send_message(message.conversation.contact_inbox.source_id, message)
|
||||
message.update!(source_id: message_id) if message_id.present?
|
||||
|
||||
@ -1,20 +1,100 @@
|
||||
require 'rails_helper'
|
||||
|
||||
RSpec.describe BaileysHelper do
|
||||
let(:timestamp) { 1_748_003_165 }
|
||||
let(:timestamp_hash) { { 'low' => timestamp, 'high' => 123, 'unsigned' => true } }
|
||||
describe '#baileys_extract_message_timestamp' do
|
||||
let(:timestamp_low) { 1_748_003_165 }
|
||||
let(:timestamp_hash) { { 'low' => timestamp_low, 'high' => 1, 'unsigned' => true } }
|
||||
|
||||
it 'extracts the timestamp from a string' do
|
||||
expect(extract_baileys_message_timestamp(timestamp.to_s)).to eq(timestamp)
|
||||
it 'extracts the timestamp from a string' do
|
||||
expect(baileys_extract_message_timestamp(timestamp_low.to_s)).to eq(timestamp_low)
|
||||
end
|
||||
|
||||
it 'extracts the timestamp from an int' do
|
||||
expect(baileys_extract_message_timestamp(timestamp_low)).to eq(timestamp_low)
|
||||
end
|
||||
|
||||
it 'extracts the timestamp from a hash' do
|
||||
expect(baileys_extract_message_timestamp(timestamp_hash)).to eq(6_042_970_461)
|
||||
end
|
||||
end
|
||||
|
||||
it 'extracts the timestamp from an int' do
|
||||
expect(extract_baileys_message_timestamp(timestamp)).to eq(timestamp)
|
||||
end
|
||||
describe '#with_baileys_channel_lock_on_outgoing_message' do
|
||||
let(:channel_id) { 1 }
|
||||
let(:lock_key) { format(BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id) }
|
||||
let(:timeout) { BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT }
|
||||
|
||||
it 'extracts the timestamp from a hash' do
|
||||
expected_timestamp = timestamp + (timestamp_hash['high'] << 32)
|
||||
before do
|
||||
allow(Redis::Alfred).to receive(:set).and_return(true)
|
||||
allow(Redis::Alfred).to receive(:delete)
|
||||
end
|
||||
|
||||
expect(extract_baileys_message_timestamp(timestamp_hash)).to eq(expected_timestamp)
|
||||
context 'when a block is given' do
|
||||
it 'yields to the block' do
|
||||
expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control
|
||||
end
|
||||
|
||||
it 'attempts to acquire the lock' do
|
||||
with_baileys_channel_lock_on_outgoing_message(channel_id) { nil }
|
||||
|
||||
expect(Redis::Alfred).to have_received(:set).with(lock_key, 1, nx: true, ex: timeout)
|
||||
end
|
||||
|
||||
it 'clears the lock after the block executes' do
|
||||
with_baileys_channel_lock_on_outgoing_message(channel_id) { nil }
|
||||
|
||||
expect(Redis::Alfred).to have_received(:delete).with(lock_key)
|
||||
end
|
||||
|
||||
it 'clears the lock even if the block raises an error' do
|
||||
expect do
|
||||
with_baileys_channel_lock_on_outgoing_message(channel_id) { raise 'test error' }
|
||||
end.to raise_error('test error')
|
||||
|
||||
expect(Redis::Alfred).to have_received(:delete).with(lock_key)
|
||||
end
|
||||
|
||||
context 'when lock is acquired immediately' do
|
||||
it 'executes the block and clears the lock' do
|
||||
expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control
|
||||
expect(Redis::Alfred).to have_received(:set).with(lock_key, 1, nx: true, ex: timeout)
|
||||
expect(Redis::Alfred).to have_received(:delete).with(lock_key)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when lock is not acquired immediately but within timeout' do
|
||||
it 'retries acquiring the lock, executes the block, and clears the lock' do
|
||||
allow(Redis::Alfred).to receive(:set).and_return(false, true)
|
||||
allow(self).to receive(:sleep)
|
||||
|
||||
expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control
|
||||
expect(Redis::Alfred).to have_received(:set).with(lock_key, 1, nx: true, ex: timeout).twice
|
||||
expect(self).to have_received(:sleep).with(0.1).once
|
||||
expect(Redis::Alfred).to have_received(:delete).once.with(lock_key)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when lock is not acquired within timeout' do
|
||||
it 'still executes the block and clears the lock' do
|
||||
freeze_time
|
||||
allow(Redis::Alfred).to receive(:set).and_return(false)
|
||||
allow(self).to receive(:sleep) { travel_to 1.second.from_now }
|
||||
|
||||
expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control
|
||||
expect(Redis::Alfred).to have_received(:set)
|
||||
.with(lock_key, 1, nx: true, ex: timeout)
|
||||
.exactly(BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT.to_i)
|
||||
expect(Redis::Alfred).to have_received(:delete).once.with(lock_key)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'when no block is given' do
|
||||
it 'raises an ArgumentError' do
|
||||
expect do
|
||||
with_baileys_channel_lock_on_outgoing_message(channel_id)
|
||||
end.to raise_error(ArgumentError,
|
||||
'A block is required for with_baileys_channel_lock_on_outgoing_message')
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -150,5 +150,34 @@ describe Whatsapp::SendOnWhatsappService do
|
||||
expect(message.reload.source_id).to eq('123456789')
|
||||
end
|
||||
end
|
||||
|
||||
context 'when provider is baileys' do
|
||||
let(:whatsapp_channel) { create(:channel_whatsapp, provider: 'baileys', validate_provider_config: true) }
|
||||
let(:contact_inbox) { create(:contact_inbox, inbox: whatsapp_channel.inbox, source_id: '123456789') }
|
||||
let(:conversation) { create(:conversation, contact_inbox: contact_inbox, inbox: whatsapp_channel.inbox) }
|
||||
|
||||
before do
|
||||
stub_request(:get, 'https://baileys.api/status/auth')
|
||||
.with(
|
||||
headers: {
|
||||
'Accept' => '*/*',
|
||||
'Accept-Encoding' => 'gzip;q=1.0,deflate;q=0.6,identity;q=0.3',
|
||||
'Content-Type' => 'application/json',
|
||||
'User-Agent' => 'Ruby',
|
||||
'X-Api-Key' => 'test_key'
|
||||
}
|
||||
)
|
||||
.to_return(status: 200, body: '', headers: {})
|
||||
end
|
||||
|
||||
it 'calls channel.send_message if channel is not locked on outgoing message' do
|
||||
message = create(:message, message_type: :outgoing, content: 'test', conversation: conversation)
|
||||
allow(whatsapp_channel).to receive(:send_message).with(conversation.contact_inbox.source_id, message).and_return('123456789')
|
||||
|
||||
described_class.new(message: message).perform
|
||||
|
||||
expect(message.reload.source_id).to eq('123456789')
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Loading…
Reference in New Issue
Block a user