feat: add IMAP historical email import rake task (#234)
* feat: add IMAP historical email import rake task

  Adds `imap:import` rake task to bulk-import emails from IMAP into a Chatwoot inbox with parallel workers, progress tracking, and proper backdating of message/conversation timestamps.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address PR review feedback for IMAP import task

  - Use IMAP UIDs instead of sequence numbers for stable cross-session refs
  - Respect channel.imap_enable_ssl instead of hardcoding ssl: true
  - Add ensure block to guarantee IMAP logout on worker errors
  - Add open_timeout to prevent hanging connections

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
477e0ad445
commit
9a157a260a
225
lib/tasks/imap_import.rake
Normal file
225
lib/tasks/imap_import.rake
Normal file
@ -0,0 +1,225 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require 'net/imap'
|
||||
require 'concurrent'
|
||||
|
||||
# Opens an IMAP connection for +channel+, authenticates and selects +folder+.
#
# Google and Microsoft channels authenticate with XOAUTH2, using an access token
# obtained from the matching RefreshOauthTokenService. Every other channel uses
# PLAIN with the channel's stored IMAP password.
#
# @param channel [Object] email channel exposing the imap_* settings read below
# @param folder [String] mailbox name to select
# @return [Net::IMAP] an authenticated connection with +folder+ selected
# @raise [StandardError] whatever authentication or folder selection raised; the
#   connection is closed before the error is re-raised so the socket is not leaked
def connect_imap(channel, folder)
  imap = Net::IMAP.new(channel.imap_address, port: channel.imap_port, ssl: channel.imap_enable_ssl, open_timeout: 30)

  begin
    # google? is checked before microsoft?, matching the original precedence.
    token_service =
      if channel.google?
        Google::RefreshOauthTokenService
      elsif channel.microsoft?
        Microsoft::RefreshOauthTokenService
      end

    if token_service
      token = token_service.new(channel: channel).access_token
      imap.authenticate('XOAUTH2', channel.imap_login, token)
    else
      imap.authenticate('PLAIN', channel.imap_login, channel.imap_password)
    end

    imap.select(folder)
  rescue StandardError
    # The caller never receives the connection on failure, so close it here.
    imap.disconnect
    raise
  end

  imap
end
|
||||
|
||||
# Renders a duration as a compact human-readable string.
#
#   format_duration(42)    # => "42s"
#   format_duration(125)   # => "2m5s"
#   format_duration(3725)  # => "1h2m"
#
# Fractional seconds are truncated. Negative durations are clamped to zero: the
# callers derive durations from wall-clock differences, which can go backwards.
#
# @param seconds [Numeric, #to_i] duration in seconds
# @return [String] "<s>s", "<m>m<s>s" or "<h>h<m>m" depending on magnitude
def format_duration(seconds)
  total = [seconds.to_i, 0].max
  hours, remainder = total.divmod(3600)
  minutes, secs = remainder.divmod(60)

  if hours.positive?
    "#{hours}h#{minutes}m"
  elsif minutes.positive?
    "#{minutes}m#{secs}s"
  else
    "#{secs}s"
  end
end
|
||||
|
||||
# Looks up the inbox for +inbox_id+ and checks that it can be imported into.
#
# Terminates the process through Kernel#abort with an error message when the
# inbox does not exist, its channel is not a Channel::Email, or IMAP is not
# enabled on the channel.
#
# @param inbox_id [String, Integer] primary key of the inbox
# @return [Array] two-element array of [inbox, channel]
def validate_inbox(inbox_id)
  inbox = Inbox.find_by(id: inbox_id)
  abort "ERROR: Inbox #{inbox_id} not found." if inbox.nil?

  channel = inbox.channel
  problem =
    if !channel.is_a?(Channel::Email)
      "ERROR: Inbox #{inbox_id} is not an email channel."
    elsif !channel.imap_enabled?
      "ERROR: IMAP is not enabled for inbox #{inbox_id}."
    end
  abort problem if problem

  [inbox, channel]
end
|
||||
|
||||
# Splits +uids+ into the ones that still need importing and a count of the rest.
#
# Only the Message-ID header of each message is fetched (with BODY.PEEK), 100
# UIDs per round trip. A message is counted as skipped when its Message-ID is
# blank, is already stored as a message source_id in the channel's inbox, or was
# already seen earlier in this scan.
#
# Existing source_ids are looked up once per batch instead of once per message,
# and repeated Message-IDs are queued only once so that two parallel workers
# cannot import the same email at the same time.
#
# @param imap [Net::IMAP] connection with the target folder selected
# @param channel [Object] channel whose inbox messages are checked for duplicates
# @param uids [Array<Integer>] candidate IMAP UIDs
# @return [Array] two-element array of [new_uids, skipped_count]
def scan_new_email_uids(imap, channel, uids)
  header_key = 'BODY[HEADER.FIELDS (MESSAGE-ID)]'
  inbox_messages = channel.inbox.messages
  queued_ids = {}
  new_uids = []
  skipped = 0

  uids.each_slice(100) do |batch|
    headers = imap.uid_fetch(batch, 'BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)]')
    next if headers.blank?

    ids_by_uid = headers.map do |data|
      [data.attr['UID'], Mail.read_from_string(data.attr[header_key]).message_id]
    end

    # One query per batch: which of these Message-IDs does the inbox already hold?
    candidate_ids = ids_by_uid.map(&:last).reject(&:blank?)
    known_ids = inbox_messages.where(source_id: candidate_ids).pluck(:source_id)

    ids_by_uid.each do |uid, msg_id|
      if msg_id.blank? || known_ids.include?(msg_id) || queued_ids.key?(msg_id)
        skipped += 1
      else
        queued_ids[msg_id] = true
        new_uids << uid
      end
    end

    print "\r Scanning headers... #{new_uids.size} new, #{skipped} skipped (#{new_uids.size + skipped}/#{uids.size}) "
  end

  puts ''
  [new_uids, skipped]
end
|
||||
|
||||
# Fetches one message by UID, hands it to Imap::ImapMailbox for processing and,
# when the email carries a Date header, backdates the resulting message.
#
# NOTE(review): the full message is fetched with 'RFC822'; per the IMAP spec that
# is equivalent to BODY[] and may set the \Seen flag on the server. Confirm that
# marking imported mail as read is acceptable.
#
# @param imap [Net::IMAP] connection with the source folder selected
# @param channel [Object] channel the email is imported into
# @param uid [Integer] IMAP UID of the message
# @return [Boolean] false when the server returned no data or an empty body for
#   the UID, true once the email has been handed to the mailbox
def import_single_email(imap, channel, uid)
  fetched = imap.uid_fetch(uid, 'RFC822')
  raw_source = fetched.first.attr['RFC822'] if fetched.present?
  return false if raw_source.blank?

  email = Mail.read_from_string(raw_source)
  Imap::ImapMailbox.new.process(email, channel)

  # Align the stored message's created_at with the email's original date.
  backdate_message(channel, email) unless email.date.blank?
  true
end
|
||||
|
||||
# Sets created_at of the message imported from +inbound_mail+ to the email's date.
#
# The message is located in the channel's inbox by source_id == Message-ID.
# Nothing happens when the email has no Message-ID or no date: looking up
# source_id nil would match an unrelated message that simply has no source_id
# and silently rewrite its timestamp.
#
# @param channel [Object] channel whose inbox holds the imported message
# @param inbound_mail [Object] parsed email responding to message_id and date
# @return [Boolean, nil] result of update_columns, or nil when nothing was updated
def backdate_message(channel, inbound_mail)
  source_id = inbound_mail.message_id
  original_date = inbound_mail.date
  return if source_id.blank? || original_date.blank?

  message = channel.inbox.messages.find_by(source_id: source_id)
  return unless message

  # update_columns writes directly, skipping validations and callbacks.
  message.update_columns(created_at: original_date.to_datetime) # rubocop:disable Rails/SkipsModelValidations
end
|
||||
|
||||
# Realigns conversation timestamps with the messages they contain.
# Runs after the whole import so that after_create_commit callbacks fired while
# messages are being created cannot overwrite the values again.
#
# For each conversation with at least one message, created_at becomes the
# earliest message created_at; last_activity_at and agent_last_seen_at become
# the latest one.
#
# NOTE(review): this walks every conversation of the inbox, not only those
# created by the import — confirm that resetting agent_last_seen_at and
# last_activity_at on pre-existing conversations is intended.
#
# @param inbox [Object] inbox whose conversations are updated
# @return [nil]
def backdate_conversations(inbox)
  puts 'Backdating conversation timestamps...'

  updated = inbox.conversations.find_each.count do |conversation|
    thread_messages = conversation.messages
    first_at = thread_messages.minimum(:created_at)
    last_at = thread_messages.maximum(:created_at)
    next false if first_at.nil?

    # Direct column write: validations and callbacks are skipped on purpose.
    conversation.update_columns(created_at: first_at, last_activity_at: last_at, agent_last_seen_at: last_at) # rubocop:disable Rails/SkipsModelValidations
    true
  end

  puts " Updated #{updated} conversations."
end
|
||||
|
||||
# Adds +by+ to one progress counter and redraws the progress line, holding the
# shared mutex so concurrent workers do not interleave updates.
def bump_import_progress(progress, counter, by: 1)
  progress[:mutex].synchronize do
    progress[counter] += by
    print_import_progress(progress)
  end
end

# Imports one batch of UIDs over a dedicated IMAP connection; meant to run in
# its own thread.
#
# Each UID ends up in exactly one progress counter:
# * :imported - import_single_email returned true
# * :skipped  - import_single_email returned false (no data for the UID)
# * :errors   - the import raised, or the worker failed before reaching the UID
#
# Errors never propagate to the caller; they are reported with warn.
#
# @param channel [Object] channel the emails are imported into
# @param folder [String] IMAP folder to read from
# @param uid_batch [Array<Integer>] UIDs assigned to this worker
# @param progress [Hash] shared counters (:imported, :skipped, :errors) and :mutex
# @return [void]
def import_worker(channel, folder, uid_batch, progress) # rubocop:disable Metrics/MethodLength
  handled = 0

  ActiveRecord::Base.connection_pool.with_connection do
    imap = connect_imap(channel, folder)
    begin
      uid_batch.each do |uid|
        begin
          imported = import_single_email(imap, channel, uid)
          bump_import_progress(progress, imported ? :imported : :skipped)
        rescue StandardError => e
          bump_import_progress(progress, :errors)
          warn "\n [ERROR] uid #{uid}: #{e.message}"
        end
        handled += 1
      end
    ensure
      # Close the socket even when LOGOUT itself fails on a dead connection.
      begin
        imap.logout
      ensure
        imap.disconnect
      end
    end
  end
rescue StandardError => e
  warn "\n [WORKER ERROR] #{e.message}"

  # A worker-level failure (e.g. the connection could not be opened) leaves the
  # rest of the batch untouched; count it so the final summary adds up.
  unprocessed = uid_batch.size - handled
  bump_import_progress(progress, :errors, by: unprocessed) if unprocessed.positive?
end
|
||||
|
||||
# Redraws the single-line progress indicator on stdout (carriage return, no
# newline), showing percentage, counters, elapsed time and an ETA.
#
# The ETA extrapolates the average time per processed item so far. A zero total
# is reported as 0.0% instead of dividing by zero, and the ETA never goes
# negative.
#
# @param progress [Hash] :imported, :skipped, :errors, :total counters and the
#   :started_at time
# @return [nil]
def print_import_progress(progress)
  processed = progress[:imported] + progress[:skipped] + progress[:errors]
  total = progress[:total]
  pct = total.positive? ? ((processed.to_f / total) * 100).round(1) : 0.0
  elapsed = Time.zone.now - progress[:started_at]
  seconds_per_item = processed.positive? ? (elapsed / processed) : 0
  remaining = [total - processed, 0].max
  eta = seconds_per_item.positive? ? (remaining * seconds_per_item) : 0

  print "\r [#{pct}%] #{processed}/#{total} | imported: #{progress[:imported]} | " \
        "skipped: #{progress[:skipped]} | errors: #{progress[:errors]} | " \
        "elapsed: #{format_duration(elapsed)} | ETA: #{format_duration(eta)} "
end
|
||||
|
||||
namespace :imap do # rubocop:disable Metrics/BlockLength
  # Usage: rails imap:import[<inbox_id>,<days>,<folder>,<workers>]
  #
  # Runs in three phases:
  # 1. scan Message-ID headers over one connection to find emails not yet imported
  # 2. import the new emails in parallel, one IMAP connection per worker thread
  # 3. backdate conversation timestamps once all messages exist
  desc 'Import all historical emails from IMAP into a Chatwoot inbox'
  task :import, %i[inbox_id days folder workers] => :environment do |_task, args| # rubocop:disable Metrics/BlockLength
    inbox_id = args[:inbox_id]
    days = (args[:days] || 7).to_i
    folder = args[:folder] || 'INBOX'
    # At least one worker: 0 (or unparsable input, which to_i turns into 0) would
    # make the slice-size computation below raise.
    workers = [(args[:workers] || 4).to_i, 1].max

    if inbox_id.blank?
      puts 'Usage: rails imap:import[<inbox_id>,<days>,<folder>,<workers>]'
      puts ' days: how far back to look (default: 7)'
      puts ' folder: IMAP folder (default: INBOX)'
      puts ' workers: parallel connections (default: 4)'
      puts ''
      puts 'Tip: set RAILS_MAX_THREADS above worker count to avoid DB pool exhaustion:'
      puts ' RAILS_MAX_THREADS=10 bundle exec rails imap:import[81,3650,INBOX,8]'
      next
    end

    inbox, channel = validate_inbox(inbox_id)

    puts "Inbox: #{inbox.name} (ID: #{inbox.id})"
    puts "Email: #{channel.email}"
    puts "Server: #{channel.imap_address}:#{channel.imap_port}"
    puts "Folder: #{folder}"
    puts "Lookback: #{days} days"
    puts "Workers: #{workers}"
    puts '-' * 60

    # Phase 1: scan headers with a single connection to find new emails
    imap = connect_imap(channel, folder)
    begin
      since_date = (Time.zone.today - days).strftime('%d-%b-%Y')

      puts "Searching emails since #{since_date}..."
      uids = imap.uid_search(['SINCE', since_date])
      puts "Found #{uids.length} emails in #{folder}."

      new_uids, skipped = scan_new_email_uids(imap, channel, uids)
    ensure
      # Release the scan connection even when the search or scan fails.
      imap.logout
    end

    if new_uids.empty?
      puts 'Nothing new to import.'
      next
    end

    puts "Importing #{new_uids.size} emails with #{workers} parallel workers..."
    progress = {
      imported: 0, skipped: skipped, errors: 0,
      total: new_uids.size + skipped, mutex: Mutex.new, started_at: Time.zone.now
    }

    # Phase 2: split work across N threads, each with its own IMAP connection
    chunks = new_uids.each_slice((new_uids.size.to_f / workers).ceil).to_a
    threads = chunks.map do |chunk|
      Thread.new { import_worker(channel, folder, chunk, progress) }
    end
    threads.each(&:join)

    # Phase 3: fix conversation timestamps after all callbacks have settled
    backdate_conversations(inbox)

    elapsed = Time.zone.now - progress[:started_at]

    puts ''
    puts '=' * 60
    puts 'Import complete!'
    puts " Imported: #{progress[:imported]}"
    puts " Skipped: #{progress[:skipped]} (already present)"
    puts " Errors: #{progress[:errors]}"
    puts " Time: #{format_duration(elapsed)}"
    puts '=' * 60
  end
end
|
||||
Loading…
Reference in New Issue
Block a user