Email Fetching System - מערכת שאיבה אוטומטית עם Celery Beat
מערכת שאיבת האימיילים מאפשרת קבלה אוטומטית של חשבוניות ומסמכים מתיבות דואר אלקטרוני. המערכת פועלת ברקע באמצעות Celery + Celery Beat ומעבדת קבצים מצורפים באופן אוטומטי.
שאיבה אוטומטית
פרוטוקולים נתמכים
סוגי קבצים
עיבוד מיידי
מעבד משימות ברקע
מתזמן משימות
from celery import Celery from celery.schedules import crontab from datetime import timedelta # Create Celery instance celery = Celery( 'techlabs', broker='redis://techlabs-redis:6379/0', backend='redis://techlabs-redis:6379/0' ) # Configuration celery.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Jerusalem', enable_utc=True, # Task settings task_track_started=True, task_time_limit=30 * 60, # 30 minutes task_soft_time_limit=25 * 60, # 25 minutes task_acks_late=True, worker_prefetch_multiplier=1, # Beat schedule - see below beat_schedule={...} )
# Start Celery Worker celery -A app.celery_app worker --loglevel=info --concurrency=4 # Start Celery Beat (scheduler) celery -A app.celery_app beat --loglevel=info # Combined (development only) celery -A app.celery_app worker --beat --loglevel=info
המערכת מריצה משימות ברקע באופן אוטומטי לפי לוח זמנים:
| משימה | תדירות | תיאור | קובץ |
|---|---|---|---|
fetch_all_accounts_task |
כל 5 דקות | שאיבת אימיילים מכל החשבונות הפעילים | tasks_email_integration.py |
poll_all_mailboxes |
כל 5 דקות | סריקת תיבות דואר (IMAP) | tasks/email_tasks.py |
process_pending_ocr_jobs |
כל 2 דקות | עיבוד משימות OCR ממתינות | tasks/ocr_tasks.py |
auto_categorize_orphans |
כל 10 דקות | קטגוריזציה אוטומטית של מסמכים | tasks/categorization_tasks.py |
cleanup_old_attachments |
יומי 01:00 | מחיקת קבצים ישנים (30+ ימים) | tasks_email_integration.py |
send_fetch_summary_email |
יומי 08:00 | שליחת דוח סיכום יומי | tasks_email_integration.py |
cleanup_old_notifications |
שבועי (ראשון) | ניקוי התראות ישנות | tasks/email_tasks.py |
beat_schedule = {
# Email fetching - every 5 minutes
'email-integration-fetch-all-accounts': {
'task': 'app.tasks_email_integration.fetch_all_accounts_task',
'schedule': timedelta(minutes=5),
'options': {
'expires': 240 # 4 minutes
}
},
# OCR processing - every 2 minutes
'process-pending-ocr-jobs': {
'task': 'app.tasks.ocr_tasks.process_pending_ocr_jobs',
'schedule': crontab(minute='*/2')
},
# Cleanup - daily at 1:00 AM
'email-integration-cleanup-attachments': {
'task': 'app.tasks_email_integration.cleanup_old_attachments_task',
'schedule': crontab(hour=1, minute=0),
'kwargs': { 'days_old': 30 }
}
}
חשבונות אימייל נשמרים בטבלת email_accounts ומנוהלים דרך ה-API או ממשק המשתמש.
class EmailAccount(db.Model): __tablename__ = 'email_accounts' id = db.Column(db.Integer, primary_key=True) email = db.Column(db.String(255), nullable=False) provider = db.Column(db.String(50)) # 'gmail', 'imap', 'outlook' # IMAP Configuration imap_server = db.Column(db.String(255)) imap_port = db.Column(db.Integer, default=993) imap_username = db.Column(db.String(255)) imap_password_encrypted = db.Column(db.Text) # AES-256 encrypted # Gmail OAuth gmail_token_encrypted = db.Column(db.Text) gmail_refresh_token_encrypted = db.Column(db.Text) # Auto-fetch settings auto_fetch = db.Column(db.Boolean, default=True) fetch_interval = db.Column(db.Integer, default=300) # seconds last_fetch = db.Column(db.DateTime) is_active = db.Column(db.Boolean, default=True) # Relationships company_id = db.Column(db.Integer, db.ForeignKey('scanning_companies.id')) user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
ENCRYPTION_KEY
להתחברות לשרתי IMAP סטנדרטיים:
| Server: | imap.gmail.com |
| Port: | 993 |
| SSL: | Yes |
| Server: | outlook.office365.com |
| Port: | 993 |
| SSL: | Yes |
import imaplib import email def fetch_imap_messages(account): # Decrypt password password = decrypt(account.imap_password_encrypted) # Connect imap = imaplib.IMAP4_SSL( account.imap_server, account.imap_port ) imap.login(account.imap_username, password) # Select INBOX imap.select('INBOX') # Search for unread messages _, message_numbers = imap.search(None, 'UNSEEN') for num in message_numbers[0].split(): _, msg_data = imap.fetch(num, '(RFC822)') message = email.message_from_bytes(msg_data[0][1]) # Process attachments for part in message.walk(): if part.get_content_disposition() == 'attachment': save_attachment(part) imap.close() imap.logout()
התחברות ל-Gmail באמצעות OAuth 2.0 מאפשרת גישה בטוחה יותר ללא צורך בסיסמה.
צור פרויקט חדש ב-console.cloud.google.com
APIs & Services → Enable Gmail API
Credentials → Create Credentials → OAuth 2.0 Client ID
https://labs.levor.io/api/email-integration/gmail/callback
GMAIL_CLIENT_ID=your_client_id.apps.googleusercontent.com GMAIL_CLIENT_SECRET=your_client_secret GMAIL_REDIRECT_URI=https://labs.levor.io/api/email-integration/gmail/callback
from google.oauth2.credentials import Credentials from googleapiclient.discovery import build def fetch_gmail_messages(account): # Create credentials from stored tokens creds = Credentials( token=decrypt(account.gmail_token_encrypted), refresh_token=decrypt(account.gmail_refresh_token_encrypted), token_uri='https://oauth2.googleapis.com/token', client_id=app.config['GMAIL_CLIENT_ID'], client_secret=app.config['GMAIL_CLIENT_SECRET'] ) # Build Gmail service service = build('gmail', 'v1', credentials=creds) # Fetch messages with attachments results = service.users().messages().list( userId='me', labelIds=['INBOX'], q='has:attachment', maxResults=50 ).execute() for msg_meta in results.get('messages', []): msg = service.users().messages().get( userId='me', id=msg_meta['id'], format='full' ).execute() process_gmail_message(account.id, msg, service)
fetch_all_accounts_task כל 5 דקותprocess_email_taskScannedDocumentservices: # Redis - Message Broker redis: image: redis:7-alpine container_name: techlabs-redis restart: unless-stopped networks: - techlabs_network # Celery Worker celery-worker: build: ./sites/techlabs container_name: techlabs-prod-celery-worker command: celery -A app.celery_app worker --loglevel=info --concurrency=4 restart: unless-stopped depends_on: - redis - db environment: - DATABASE_URL=postgresql://... - CELERY_BROKER_URL=redis://techlabs-redis:6379/0 - CELERY_RESULT_BACKEND=redis://techlabs-redis:6379/0 - GOOGLE_AI_API_KEY=${GOOGLE_AI_API_KEY} networks: - techlabs_network volumes: - ./sites/techlabs:/app # Celery Beat (Scheduler) celery-beat: build: ./sites/techlabs container_name: techlabs-prod-celery-beat command: celery -A app.celery_app beat --loglevel=info restart: unless-stopped depends_on: - redis environment: - CELERY_BROKER_URL=redis://techlabs-redis:6379/0 networks: - techlabs_network volumes: - ./sites/techlabs:/app
# View container status docker ps | grep celery # View worker logs docker logs -f techlabs-prod-celery-worker # View beat logs docker logs -f techlabs-prod-celery-beat # Restart workers docker restart techlabs-prod-celery-worker techlabs-prod-celery-beat # Execute task manually docker exec techlabs-prod-web python -c " from app.tasks_email_integration import fetch_all_accounts_task result = fetch_all_accounts_task.apply() print(result.get()) "
# Real-time worker logs docker logs -f techlabs-prod-celery-worker --tail=100 # Search for specific task docker logs techlabs-prod-celery-worker 2>&1 | grep "fetch_all_accounts" # View errors only docker logs techlabs-prod-celery-worker 2>&1 | grep -i error
# Install Flower pip install flower # Run Flower celery -A app.celery_app flower --port=5555 # Access at: http://localhost:5555
flower.labs.levor.io
בדיקות:
docker ps | grep beatdocker exec techlabs-redis redis-cli pingdocker logs techlabs-prod-celery-beatפתרון:
docker restart techlabs-prod-celery-beat
סיבות אפשריות:
is_active=False)בדיקה:
docker exec techlabs-prod-db psql -U techlab_user -d techlab_db -c " SELECT id, email, provider, is_active, last_fetch FROM email_accounts WHERE is_active = true;"
פתרונות:
--concurrency=8docker exec techlabs-prod-celery-worker celery -A app.celery_app inspect active
docker exec techlabs-redis redis-cli FLUSHALL
docker restart techlabs-prod-celery-worker
שגיאות נפוצות:
invalid_grant - Token פג תוקף, יש לחבר מחדשaccess_denied - המשתמש לא אישר את ההרשאותredirect_uri_mismatch - URI לא תואם ל-GCP Consoleפתרון: