* fix(whatsapp): add idempotent message sending to prevent duplicates on timeout retry

When sending media messages via Baileys, Net::ReadTimeout causes Sidekiq to retry the job, potentially sending the same message multiple times. This adds a chatwootMessageId parameter to the Baileys API request, enabling server-side deduplication via Redis. Also increases HTTP timeout to 120s and channel lock to 130s to reduce false timeouts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address review feedback

- Use error.class.name assertions for parallel/reloading safety
- Assert reconnect endpoint was not called on 409 (stronger assertion)

* fix: address review feedback (round 2)

- Only release channel lock in ensure if it was actually acquired (prevents clearing another worker's lock on timeout)
- Assert chatwootMessageId in reproduction spec body matcher

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
53 lines
1.8 KiB
Ruby
module BaileysHelper
  CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY = 'BAILEYS::CHANNEL_LOCK_ON_OUTGOING_MESSAGE::%<channel_id>s'.freeze
  CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 130.seconds

  def baileys_extract_message_timestamp(timestamp)
    # NOTE: Timestamp might be in this format {"low"=>1748003165, "high"=>0, "unsigned"=>true}
    if timestamp.is_a?(Hash) && timestamp.key?('low')
      low = timestamp['low'].to_i
      high = timestamp.fetch('high', 0).to_i
      return (high << 32) | low
    end

    # NOTE: Timestamp might be a string or a number
    timestamp.to_i
  end

  def with_baileys_channel_lock_on_outgoing_message(channel_id, timeout: CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT)
    raise ArgumentError, 'A block is required for with_baileys_channel_lock_on_outgoing_message' unless block_given?

    start_time = Time.now.to_i
    lock_acquired = false

    # NOTE: On timeout, we log a warning and proceed with the block execution.
    # The re-check inside the contact lock handles potential duplicates.
    while (Time.now.to_i - start_time) < timeout
      if baileys_lock_channel_on_outgoing_message(channel_id, timeout)
        lock_acquired = true
        break
      end

      sleep(0.1)
    end

    Rails.logger.warn "Baileys channel lock timeout for channel #{channel_id} after #{timeout}s - proceeding anyway" unless lock_acquired

    yield
  ensure
    baileys_clear_channel_lock_on_outgoing_message(channel_id) if lock_acquired
  end

  private

  def baileys_lock_channel_on_outgoing_message(channel_id, timeout)
    key = format(CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id)
    Redis::Alfred.set(key, 1, nx: true, ex: timeout)
  end

  def baileys_clear_channel_lock_on_outgoing_message(channel_id)
    key = format(CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY, channel_id: channel_id)
    Redis::Alfred.delete(key)
  end
end
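
As an illustration of how `baileys_extract_message_timestamp` combines the `low` and `high` words of a 64-bit timestamp, the following is a minimal standalone sketch. The method name `extract_message_timestamp` is hypothetical (a copy of the helper's logic outside the module so it runs in plain Ruby without Rails), and the sample inputs are made up for the example. It assumes `low` arrives as a non-negative value; a negative signed `low` would need masking before the bitwise OR.

```ruby
# Hypothetical standalone copy of the helper's logic, for illustration only.
def extract_message_timestamp(timestamp)
  # Hash form: two 32-bit words, e.g. {"low"=>1748003165, "high"=>0, "unsigned"=>true}
  if timestamp.is_a?(Hash) && timestamp.key?('low')
    low = timestamp['low'].to_i
    high = timestamp.fetch('high', 0).to_i
    # Shift the high word into the upper 32 bits and merge with the low word.
    return (high << 32) | low
  end

  # String or numeric form: plain integer conversion.
  timestamp.to_i
end

puts extract_message_timestamp({ 'low' => 1_748_003_165, 'high' => 0, 'unsigned' => true }) # => 1748003165
puts extract_message_timestamp({ 'low' => 5, 'high' => 1 })                                 # => 4294967301
puts extract_message_timestamp('1748003165')                                                # => 1748003165
```

When `high` is zero, the result is simply `low`, which is the case for current Unix timestamps in seconds.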