diff --git a/app/helpers/baileys_helper.rb b/app/helpers/baileys_helper.rb index ea5591212..009b048d8 100644 --- a/app/helpers/baileys_helper.rb +++ b/app/helpers/baileys_helper.rb @@ -1,5 +1,8 @@ module BaileysHelper - def extract_baileys_message_timestamp(timestamp) + CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY = 'BAILEYS::CHANNEL_LOCK_ON_OUTGOING_MESSAGE::%s'.freeze + CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 15.seconds + + def baileys_extract_message_timestamp(timestamp) # NOTE: Timestamp might be in this format {"low"=>1748003165, "high"=>0, "unsigned"=>true} if timestamp.is_a?(Hash) && timestamp.key?('low') low = timestamp['low'].to_i @@ -10,4 +13,33 @@ module BaileysHelper # NOTE: Timestamp might be a string or a number timestamp.to_i end + + def with_baileys_channel_lock_on_outgoing_message(channel_id, timeout: CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT) + raise ArgumentError, 'A block is required for with_baileys_channel_lock_on_outgoing_message' unless block_given? + + start_time = Time.now.to_i + + # NOTE: On timeout, we ignore the lock and proceed with the block execution + while (Time.now.to_i - start_time) < timeout + break if baileys_lock_channel_on_outgoing_message(channel_id, timeout) + + sleep(0.1) + end + + yield + ensure + baileys_clear_channel_lock_on_outgoing_message(channel_id) + end + + private + + def baileys_lock_channel_on_outgoing_message(channel_id, timeout) + key = format(CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id) + Redis::Alfred.set(key, 1, nx: true, ex: timeout) + end + + def baileys_clear_channel_lock_on_outgoing_message(channel_id) + key = format(CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id) + Redis::Alfred.delete(key) + end end diff --git a/app/services/whatsapp/incoming_message_baileys_service.rb b/app/services/whatsapp/incoming_message_baileys_service.rb index 59d134576..a56a0d6f4 100644 --- a/app/services/whatsapp/incoming_message_baileys_service.rb +++ b/app/services/whatsapp/incoming_message_baileys_service.rb @@ -45,20 +45,25 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer @contact_inbox = nil @contact = nil @raw_message = message - handle_message + + next handle_message if incoming? + + # NOTE: Shared lock with Whatsapp::SendOnWhatsappService + # Avoids race conditions when sending messages. + with_baileys_channel_lock_on_outgoing_message(inbox.channel.id) { handle_message } end end def handle_message return if jid_type != 'user' - return if find_message_by_source_id(message_id) || message_under_process? return if message_type == 'protocol' + return if find_message_by_source_id(raw_message_id) || message_under_process? cache_message_source_id_in_redis set_contact unless @contact - Rails.logger.warn "Contact not found for message: #{message_id}" + Rails.logger.warn "Contact not found for message: #{raw_message_id}" return end @@ -160,7 +165,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer content: message_content, account_id: @inbox.account_id, inbox_id: @inbox.id, - source_id: message_id, + source_id: raw_message_id, sender: incoming? ? @contact : @inbox.account.account_users.first.user, sender_type: incoming? ? 'Contact' : 'User', message_type: incoming? ? :incoming : :outgoing, @@ -173,7 +178,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer end def message_content_attributes - content_attributes = { external_created_at: extract_baileys_message_timestamp(@raw_message[:messageTimestamp]) } + content_attributes = { external_created_at: baileys_extract_message_timestamp(@raw_message[:messageTimestamp]) } if message_type == 'reaction' content_attributes[:in_reply_to_external_id] = @raw_message.dig(:message, :reactionMessage, :key, :id) content_attributes[:is_reaction] = true @@ -217,7 +222,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer return filename if filename.present? ext = ".#{message_mimetype.split(';').first.split('/').last}" if message_mimetype.present? - "#{file_content_type}_#{message_id}_#{Time.current.strftime('%Y%m%d')}#{ext}" + "#{file_content_type}_#{raw_message_id}_#{Time.current.strftime('%Y%m%d')}#{ext}" end def message_content @@ -233,7 +238,7 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer end end - def message_id + def raw_message_id @raw_message[:key][:id] end @@ -253,17 +258,17 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer end def message_under_process? - key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: message_id) + key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: raw_message_id) Redis::Alfred.get(key) end def cache_message_source_id_in_redis - key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: message_id) + key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: raw_message_id) ::Redis::Alfred.setex(key, true) end def clear_message_source_id_from_redis - key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: message_id) + key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: raw_message_id) ::Redis::Alfred.delete(key) end @@ -272,12 +277,17 @@ class Whatsapp::IncomingMessageBaileysService < Whatsapp::IncomingMessageBaseSer updates.each do |update| @message = nil @raw_message = update - handle_update + + next handle_update if incoming? + + # NOTE: Shared lock with Whatsapp::SendOnWhatsappService + # Avoids race conditions when sending messages. + with_baileys_channel_lock_on_outgoing_message(inbox.channel.id) { handle_update } end end def handle_update - raise MessageNotFoundError unless find_message_by_source_id(message_id) + raise MessageNotFoundError unless find_message_by_source_id(raw_message_id) update_status if @raw_message.dig(:update, :status).present? handle_edited_content if @raw_message.dig(:update, :message).present? diff --git a/app/services/whatsapp/providers/whatsapp_baileys_service.rb b/app/services/whatsapp/providers/whatsapp_baileys_service.rb index e55a005af..e4631c37a 100644 --- a/app/services/whatsapp/providers/whatsapp_baileys_service.rb +++ b/app/services/whatsapp/providers/whatsapp_baileys_service.rb @@ -230,7 +230,7 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer timestamp = response.parsed_response.dig('data', 'messageTimestamp') return unless timestamp - external_created_at = extract_baileys_message_timestamp(timestamp) + external_created_at = baileys_extract_message_timestamp(timestamp) @message.update!(external_created_at: external_created_at) end diff --git a/app/services/whatsapp/send_on_whatsapp_service.rb b/app/services/whatsapp/send_on_whatsapp_service.rb index 186c8b2ae..294ce438f 100644 --- a/app/services/whatsapp/send_on_whatsapp_service.rb +++ b/app/services/whatsapp/send_on_whatsapp_service.rb @@ -1,4 +1,6 @@ class Whatsapp::SendOnWhatsappService < Base::SendOnChannelService + include BaileysHelper + private def channel_class @@ -9,6 +11,8 @@ class Whatsapp::SendOnWhatsappService < Base::SendOnChannelService should_send_template_message = template_params.present? || !message.conversation.can_reply? if should_send_template_message send_template_message + elsif channel.provider == 'baileys' + send_baileys_session_message else send_session_message end @@ -108,6 +112,10 @@ class Whatsapp::SendOnWhatsappService < Base::SendOnChannelService template['components'].find { |obj| obj['type'] == 'BODY' && obj.key?('text') } end + def send_baileys_session_message + with_baileys_channel_lock_on_outgoing_message(channel.id) { send_session_message } + end + def send_session_message message_id = channel.send_message(message.conversation.contact_inbox.source_id, message) message.update!(source_id: message_id) if message_id.present? diff --git a/spec/helpers/baileys_helper_spec.rb b/spec/helpers/baileys_helper_spec.rb index eb9a1b43d..36cce07ef 100644 --- a/spec/helpers/baileys_helper_spec.rb +++ b/spec/helpers/baileys_helper_spec.rb @@ -1,20 +1,100 @@ require 'rails_helper' RSpec.describe BaileysHelper do - let(:timestamp) { 1_748_003_165 } - let(:timestamp_hash) { { 'low' => timestamp, 'high' => 123, 'unsigned' => true } } + describe '#baileys_extract_message_timestamp' do + let(:timestamp_low) { 1_748_003_165 } + let(:timestamp_hash) { { 'low' => timestamp_low, 'high' => 1, 'unsigned' => true } } - it 'extracts the timestamp from a string' do - expect(extract_baileys_message_timestamp(timestamp.to_s)).to eq(timestamp) + it 'extracts the timestamp from a string' do + expect(baileys_extract_message_timestamp(timestamp_low.to_s)).to eq(timestamp_low) + end + + it 'extracts the timestamp from an int' do + expect(baileys_extract_message_timestamp(timestamp_low)).to eq(timestamp_low) + end + + it 'extracts the timestamp from a hash' do + expect(baileys_extract_message_timestamp(timestamp_hash)).to eq(6_042_970_461) + end end - it 'extracts the timestamp from an int' do - expect(extract_baileys_message_timestamp(timestamp)).to eq(timestamp) - end + describe '#with_baileys_channel_lock_on_outgoing_message' do + let(:channel_id) { 1 } + let(:lock_key) { format(BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id) } + let(:timeout) { BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT } - it 'extracts the timestamp from a hash' do - expected_timestamp = timestamp + (timestamp_hash['high'] << 32) + before do + allow(Redis::Alfred).to receive(:set).and_return(true) + allow(Redis::Alfred).to receive(:delete) + end - expect(extract_baileys_message_timestamp(timestamp_hash)).to eq(expected_timestamp) + context 'when a block is given' do + it 'yields to the block' do + expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control + end + + it 'attempts to acquire the lock' do + with_baileys_channel_lock_on_outgoing_message(channel_id) { nil } + + expect(Redis::Alfred).to have_received(:set).with(lock_key, 1, nx: true, ex: timeout) + end + + it 'clears the lock after the block executes' do + with_baileys_channel_lock_on_outgoing_message(channel_id) { nil } + + expect(Redis::Alfred).to have_received(:delete).with(lock_key) + end + + it 'clears the lock even if the block raises an error' do + expect do + with_baileys_channel_lock_on_outgoing_message(channel_id) { raise 'test error' } + end.to raise_error('test error') + + expect(Redis::Alfred).to have_received(:delete).with(lock_key) + end + + context 'when lock is acquired immediately' do + it 'executes the block and clears the lock' do + expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control + expect(Redis::Alfred).to have_received(:set).with(lock_key, 1, nx: true, ex: timeout) + expect(Redis::Alfred).to have_received(:delete).with(lock_key) + end + end + + context 'when lock is not acquired immediately but within timeout' do + it 'retries acquiring the lock, executes the block, and clears the lock' do + allow(Redis::Alfred).to receive(:set).and_return(false, true) + allow(self).to receive(:sleep) + + expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control + expect(Redis::Alfred).to have_received(:set).with(lock_key, 1, nx: true, ex: timeout).twice + expect(self).to have_received(:sleep).with(0.1).once + expect(Redis::Alfred).to have_received(:delete).once.with(lock_key) + end + end + + context 'when lock is not acquired within timeout' do + it 'still executes the block and clears the lock' do + freeze_time + allow(Redis::Alfred).to receive(:set).and_return(false) + allow(self).to receive(:sleep) { travel_to 1.second.from_now } + + expect { |b| with_baileys_channel_lock_on_outgoing_message(channel_id, &b) }.to yield_control + expect(Redis::Alfred).to have_received(:set) + .with(lock_key, 1, nx: true, ex: timeout) + .exactly(BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT.to_i) + expect(Redis::Alfred).to have_received(:delete).once.with(lock_key) + end + end + end + + context 'when no block is given' do + it 'raises an ArgumentError' do + expect do + with_baileys_channel_lock_on_outgoing_message(channel_id) + end.to raise_error(ArgumentError, + 'A block is required for with_baileys_channel_lock_on_outgoing_message') + end + end end end diff --git a/spec/services/whatsapp/send_on_whatsapp_service_spec.rb b/spec/services/whatsapp/send_on_whatsapp_service_spec.rb index a0e3e893a..fafc03737 100644 --- a/spec/services/whatsapp/send_on_whatsapp_service_spec.rb +++ b/spec/services/whatsapp/send_on_whatsapp_service_spec.rb @@ -150,5 +150,34 @@ describe Whatsapp::SendOnWhatsappService do expect(message.reload.source_id).to eq('123456789') end end + + context 'when provider is baileys' do + let(:whatsapp_channel) { create(:channel_whatsapp, provider: 'baileys', validate_provider_config: true) } + let(:contact_inbox) { create(:contact_inbox, inbox: whatsapp_channel.inbox, source_id: '123456789') } + let(:conversation) { create(:conversation, contact_inbox: contact_inbox, inbox: whatsapp_channel.inbox) } + + before do + stub_request(:get, 'https://baileys.api/status/auth') + .with( + headers: { + 'Accept' => '*/*', + 'Accept-Encoding' => 'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', + 'Content-Type' => 'application/json', + 'User-Agent' => 'Ruby', + 'X-Api-Key' => 'test_key' + } + ) + .to_return(status: 200, body: '', headers: {}) + end + + it 'calls channel.send_message if channel is not locked on outgoing message' do + message = create(:message, message_type: :outgoing, content: 'test', conversation: conversation) + allow(whatsapp_channel).to receive(:send_message).with(conversation.contact_inbox.source_id, message).and_return('123456789') + + described_class.new(message: message).perform + + expect(message.reload.source_id).to eq('123456789') + end + end end end