Newer
Older
import time
import requests
from datetime import datetime, timedelta
from twilio.rest import Client
from dotenv import load_dotenv
import os
import json
from db.db_query import get_config
import redcap as redcap_module # renamed to avoid conflict with appv2.redcap
import sys
import logging
from typing import Dict, Optional, Any
# Load environment variables from .env file and configuration from appv2
#------------------------------------------------------------------
# Configuration using appv2 values
#------------------------------------------------------------------
class Config:
"""
Centralizes configuration loaded from appv2.
"""
def __init__(self):
# REDCap settings
self.redcap_api_url = appv2.redcap.REDCAP_API_URL
self.redcap_api_token = appv2.redcap.REDCAP_API_TOKEN
self.redcap_event_name = appv2.redcap.REDCAP_EVENT_NAME
self.redcap_repeat_instance = appv2.redcap.REDCAP_REPEAT_INSTANCE
# Twilio settings
self.twilio_account_sid = appv2.twilio.TWILIO_ACCOUNT_SID
self.twilio_auth_token = appv2.twilio.TWILIO_AUTH_TOKEN
self.twilio_phone_number = appv2.twilio.TWILIO_PHONE_NUMBER
# Files for persistence
self.processed_records_file = appv2.var.files.processed_records
self.existing_records_file = appv2.var.files.existing_records
# Timing & messaging
self.registration_delay = getattr(appv2.var.timing, 'registration_delay_minutes', 1)
self.polling_interval = getattr(appv2.var.timing, 'polling_interval_seconds', 60)
self.sms_message_template = getattr(
appv2.var.sms, 'message_template',
"Thank you for registering! Please complete your survey using this link: {survey_link}"
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
self.phone_sanitize_regex = getattr(appv2.var.sms, 'sanitize_regex', r'[^0-9]')
self.phone_default_country_code = getattr(appv2.var.sms, 'default_country_code', '+1')
self._validate()
def _validate(self):
"""Ensure that required configuration values are present."""
required = {
'REDCap API URL': self.redcap_api_url,
'REDCap API Token': self.redcap_api_token,
'Twilio Account SID': self.twilio_account_sid,
'Twilio Auth Token': self.twilio_auth_token,
'Twilio Phone Number': self.twilio_phone_number
}
missing = [name for name, value in required.items() if not value]
if missing:
raise ValueError("Missing required configuration: " + ", ".join(missing))
config = Config()
#------------------------------------------------------------------
# Logging configuration
#------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
#------------------------------------------------------------------
# REDCap API client
#------------------------------------------------------------------
class RedCapClient:
"""Handles interactions with the REDCap API."""
def __init__(self):
self.api_url = config.redcap_api_url
self.token = config.redcap_api_token
def _make_request(self, payload: Dict[str, Any]) -> Optional[Any]:
"""Generic helper to make a POST request to REDCap."""
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
response = requests.post(self.api_url, data=payload)
response.raise_for_status()
try:
return response.json()
except ValueError:
# For surveyLink calls the API may return plain text
return response.text
except requests.exceptions.RequestException as e:
logger.error(f"REDCap API request failed: {str(e)}")
return None
def get_records(self) -> Dict[str, Dict]:
"""Retrieve all records from REDCap as a dict keyed by record_id."""
payload = {
"token": self.token,
"content": "record",
"format": "json",
"type": "flat",
"returnFormat": "json"
}
records = self._make_request(payload)
if records and isinstance(records, list):
return {record['record_id']: record for record in records if 'record_id' in record}
return {}
def get_survey_link(self, record_id: str, instrument: str) -> Optional[str]:
"""
Retrieve a survey link for a given record and instrument.
Returns the link if valid (i.e. starts with "http://" or "https://").
"""
"content": "surveyLink",
"record": record_id,
"instrument": instrument,
"event": config.redcap_event_name,
"repeat_instance": config.redcap_repeat_instance,
payload_debug["token"] = "***TOKEN_HIDDEN***"
logger.info(f"REDCap API payload: {payload_debug}")
response = self._make_request(payload)
if isinstance(response, str):
link = response.strip()
if link and (link.startswith("http://") or link.startswith("https://")):
logger.info(f"Survey link for record {record_id} (instrument '{instrument}') generated successfully.")
logger.warning(f"Survey link for record {record_id} (instrument '{instrument}') is invalid: {link}")
logger.error(f"Error retrieving survey link for record {record_id} with instrument {instrument}")
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#------------------------------------------------------------------
# Twilio SMS client
#------------------------------------------------------------------
class TwilioClient:
"""Handles sending SMS messages using Twilio."""
def __init__(self):
self.client = Client(config.twilio_account_sid, config.twilio_auth_token)
self.phone_number = config.twilio_phone_number
def send_sms(self, to_number: str, message: str, record_id: Optional[str] = None,
survey_link: Optional[str] = None) -> bool:
"""
Send an SMS message. If record_id and survey_link are provided,
log success or failure to REDCap (using the redcap_module functions).
"""
try:
self.client.messages.create(
body=message,
from_=self.phone_number,
to=to_number
)
if record_id and survey_link:
redcap_module.log_message_sent(record_id, to_number, survey_link)
return True
except Exception as e:
logger.error(f"Error sending SMS to {to_number}: {str(e)}")
if record_id and survey_link:
redcap_module.log_message_failed(record_id, to_number, survey_link, str(e))
return False
#------------------------------------------------------------------
# Record state management (loading/saving JSON files)
#------------------------------------------------------------------
class RecordManager:
"""Handles loading and saving of processed and existing record IDs."""
def __init__(self):
self.processed_records = self._load_json_file(config.processed_records_file)
self.existing_records = self._load_json_file(config.existing_records_file)
@staticmethod
def _load_json_file(filename: str) -> Dict:
try:
if os.path.exists(filename):
with open(filename, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading {filename}: {str(e)}")
return {}
def save_processed_records(self):
self._save_json_file(config.processed_records_file, self.processed_records)
def save_existing_records(self):
self._save_json_file(config.existing_records_file, self.existing_records)
@staticmethod
def _save_json_file(filename: str, data: Dict):
try:
with open(filename, 'w') as f:
json.dump(data, f)
except Exception as e:
logger.error(f"Error saving {filename}: {str(e)}")
#------------------------------------------------------------------
# Survey Service – core workflow orchestration
#------------------------------------------------------------------
class SurveyService:
Orchestrates the survey workflow.
First Run Process (run only once):
1. Retrieve all record IDs from REDCap.
2. Save them to existing_records.json.
3. Exit the program.
Subsequent Runs Process:
1. Check each record:
- If in existing_records.json ➔ Skip.
- If new record ➔ Process.
2. Check Registration Status:
- If complete ➔ Add to processing queue.
- If incomplete ➔ Skip.
3. Check Processing Queue:
- If delay complete ➔ Send SMS, log, and add to existing_records.json.
- If delay not complete ➔ Wait.
def __init__(self):
self.redcap = RedCapClient()
self.twilio = TwilioClient()
self.records = RecordManager()
# Attempt to read survey and phone field settings from appv2.var.redcap-var safely.
redcap_config = getattr(appv2.var, 'redcap-var')
if isinstance(redcap_config, dict):
self.status_fields = redcap_config.get('registration_status_fields',
["registration_complete", "registration_status", "reg_complete", "registration"])
survey_instruments = redcap_config.get('survey_instruments', {})
self.survey_primary = survey_instruments.get('primary', "interest_form")
self.survey_fallback = survey_instruments.get('fallback',
["survey", "participant_survey", "follow_up", "followup", "questionnaire"])
self.phone_fields = redcap_config.get('phone_number_fields',
["phone_number", "phone", "mobile", "mobile_number", "cell", "cell_number", "contact_phone", "participant_phone"])
self.status_fields = getattr(redcap_config, 'registration_status_fields',
["registration_complete", "registration_status", "reg_complete", "registration"])
survey_instruments = getattr(redcap_config, 'survey_instruments', {})
self.survey_primary = getattr(survey_instruments, 'primary', "interest_form")
self.survey_fallback = getattr(survey_instruments, 'fallback',
["survey", "participant_survey", "follow_up", "followup", "questionnaire"])
self.phone_fields = getattr(redcap_config, 'phone_number_fields',
["phone_number", "phone", "mobile", "mobile_number", "cell", "cell_number", "contact_phone", "participant_phone"])
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# Fallback defaults if redcap-var is not defined
self.status_fields = ["registration_complete", "registration_status", "reg_complete", "registration"]
self.survey_primary = "interest_form"
self.survey_fallback = ["survey", "participant_survey", "follow_up", "followup", "questionnaire"]
self.phone_fields = ["phone_number", "phone", "mobile", "mobile_number", "cell", "cell_number", "contact_phone", "participant_phone"]
def handle_first_run(self):
"""
First Run Process:
- Retrieve all records from REDCap.
- Save all record IDs to existing_records.json.
- Exit the program.
"""
logger.info("First run detected: retrieving all record IDs from REDCap.")
records = self.redcap.get_records()
record_ids = {rid: True for rid in records.keys()}
self.records.existing_records = record_ids
self.records.save_existing_records()
logger.info(f"First run complete: saved {len(record_ids)} record IDs to existing_records.json. Exiting program.")
def process_records(self, force_record_id: Optional[str] = None):
"""Process new records and send pending SMS messages."""
current_records = self.redcap.get_records()
new_records = {rid: rec for rid, rec in current_records.items() if rid not in self.records.existing_records}
if new_records:
logger.info(f"Found {len(new_records)} new record(s): {list(new_records.keys())}")
self._process_new_registrations(new_records, force_record_id)
else:
logger.info("No new records found.")
self._process_pending_messages(current_records, force_record_id)
def _process_new_registrations(self, new_records: Dict[str, Dict], force_record_id: Optional[str]):
"""
For each new record, check if registration is complete (using common fields).
If so, mark the record as pending (storing the registration time).
"""
current_time = datetime.now()
for record_id, record in new_records.items():
# If forced, simply add the record to the processing queue.
if force_record_id and record_id == force_record_id:
self.records.processed_records[record_id] = {'registration_time': current_time.isoformat(), 'status': 'pending'}
logger.info(f"Force processing record {record_id}.")
registration_status = None
for field in self.status_fields:
if field in record:
registration_status = record.get(field)
logger.info(f"Record {record_id}: Found registration status in field '{field}': {registration_status}")
break
# If no registration status is found, assume it’s complete.
if not registration_status:
logger.info(f"Record {record_id}: No registration status found – assuming complete.")
self.records.processed_records[record_id] = {'registration_time': current_time.isoformat(), 'status': 'pending'}
continue
# Check if registration is complete.
if registration_status in ["2", 2] or (isinstance(registration_status, str) and registration_status.lower() in ["complete", "completed", "yes", "true"]):
if record_id not in self.records.processed_records:
self.records.processed_records[record_id] = {'registration_time': current_time.isoformat(), 'status': 'pending'}
logger.info(f"Record {record_id}: Registration complete. Marked for processing.")
logger.info(f"Record {record_id}: Already in pending queue.")
logger.info(f"Record {record_id}: Registration not complete (status: {registration_status}).")
self.records.save_processed_records()
def _process_pending_messages(self, current_records: Dict[str, Dict], force_record_id: Optional[str]):
"""
For each pending record, if the delay has completed then send an SMS.
After sending, add the record to existing_records.json and remove it from the pending queue.
"""
current_time = datetime.now()
for record_id, data in list(self.records.processed_records.items()):
reg_time = datetime.fromisoformat(data['registration_time'])
elapsed = current_time - reg_time
# Bypass delay if force_record_id is provided.
if force_record_id and record_id == force_record_id:
ready = True
logger.info(f"Force sending message for record {record_id}.")
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
ready = elapsed >= timedelta(minutes=config.registration_delay)
if not ready:
remaining = (timedelta(minutes=config.registration_delay) - elapsed).total_seconds()
logger.info(f"Record {record_id} not ready. Waiting {remaining:.1f} more seconds.")
if ready:
self._send_survey_message(record_id, current_records)
# After sending, mark record as processed.
self.records.existing_records[record_id] = True
self.records.processed_records.pop(record_id, None)
self.records.save_processed_records()
self.records.save_existing_records()
def _send_survey_message(self, record_id: str, current_records: Dict[str, Dict]):
"""Extract phone number, retrieve a survey link, and send an SMS message."""
record = current_records.get(record_id)
if not record:
logger.error(f"Record {record_id} not found during processing.")
return
phone = self._extract_phone_number(record)
if not phone:
logger.error(f"Record {record_id}: No valid phone number found.")
return
# Attempt to get the survey link using the primary instrument; if unavailable, try fallback instruments.
survey_link = self.redcap.get_survey_link(record_id, self.survey_primary)
if not survey_link:
logger.info(f"Record {record_id}: Primary survey link not found. Trying fallback instruments...")
for instrument in self.survey_fallback:
survey_link = self.redcap.get_survey_link(record_id, instrument)
if survey_link:
logger.info(f"Record {record_id}: Found survey link using instrument '{instrument}'.")
break
if not survey_link:
logger.error(f"Record {record_id}: Could not retrieve survey link.")
return
try:
message = config.sms_message_template.format(survey_link=survey_link)
except Exception:
message = f"Thank you for registering! Please complete your survey using this link: {survey_link}"
if self.twilio.send_sms(phone, message, record_id, survey_link):
logger.info(f"Record {record_id}: Message sent successfully to {phone}.")
else:
logger.error(f"Record {record_id}: Failed to send message to {phone}.")
def _extract_phone_number(self, record: Dict) -> Optional[str]:
"""Extract and format the phone number from common fields."""
phone = None
for field in self.phone_fields:
if record.get(field):
phone = record.get(field)
logger.info(f"Record {record.get('record_id')}: Found phone number in field '{field}': {phone}")
break
if phone:
phone = re.sub(config.phone_sanitize_regex, '', phone)
if not phone.startswith('+'):
phone = f"{config.phone_default_country_code}{phone}"
return phone
return None
def sync_processed_records_from_redcap(self) -> Dict:
"""
Synchronize local processed records with REDCap's message logs.
Parses the 'message_log' field for evidence of a sent message.
"""
logger.info("Synchronizing processed records with REDCap message logs...")
updated_records = {}
if not redcap_module.check_message_log_field_exists():
logger.warning("REDCap 'message_log' field does not exist. Cannot sync processed records.")
return self.records.processed_records
records = self.redcap.get_records()
logger.info(f"Found {len(records)} record(s) in REDCap.")
updated_records = self.records.processed_records.copy()
records_updated = 0
records_added = 0
for record in records.values():
record_id = record.get("record_id")
if not record_id:
continue
message_log = record.get("message_log", "")
if not message_log:
continue
if "message sent" in message_log.lower():
phone_match = re.search(r"Phone:\s*([+\d\-() ]+)", message_log)
link_match = re.search(r"Link:\s*(https?://[^\s\n]+)", message_log)
time_match = re.search(r"\[([0-9-]+ [0-9:]+)\]", message_log)
phone_number = phone_match.group(1).strip() if phone_match else "unknown"
survey_link = link_match.group(1).strip() if link_match else "unknown"
if time_match:
try:
timestamp = datetime.strptime(time_match.group(1).strip(), "%Y-%m-%d %H:%M:%S").isoformat()
except:
timestamp = datetime.now().isoformat()
else:
timestamp = datetime.now().isoformat()
if record_id in updated_records:
records_updated += 1
rec_data = updated_records[record_id]
if rec_data.get('phone_number', "unknown") == "unknown" and phone_number != "unknown":
rec_data['phone_number'] = phone_number
if rec_data.get('survey_link', "unknown") == "unknown" and survey_link != "unknown":
rec_data['survey_link'] = survey_link
if rec_data.get('sent_time', "unknown") == "unknown":
rec_data['sent_time'] = timestamp
else:
records_added += 1
updated_records[record_id] = {
'phone_number': phone_number,
'survey_link': survey_link,
'sent_time': timestamp
}
self.records.processed_records = updated_records
self.records.save_processed_records()
logger.info(f"Synchronization complete: {records_added} added, {records_updated} updated; total {len(updated_records)} processed record(s).")
return updated_records
#------------------------------------------------------------------
# Main entry point with CLI handling
#------------------------------------------------------------------
service = SurveyService()
#------------------------------------------------------------------
# Validate configurations and test connections
#------------------------------------------------------------------
print("\n==================================================")
print(" REDCap-Twilio Survey Link System")
print("==================================================\n")
print("Validating configurations...")
# REDCap configuration details
print(f" - API URL: {config.redcap_api_url[:40]}..." if config.redcap_api_url else " - API URL: NOT CONFIGURED")
print(f" - API Token: {'*' * 10}..." if config.redcap_api_token else " - API Token: NOT CONFIGURED")
print(f" - Event Name: {config.redcap_event_name if config.redcap_event_name else 'Not specified'}")
print(f" - Repeat Instance: {config.redcap_repeat_instance if config.redcap_repeat_instance else 'Not specified'}")
"content": "version",
"format": "json",
"returnFormat": "json"
}
response = requests.post(config.redcap_api_url, data=test_payload)
print(f" REDCap connection successful! Version: {response.text.strip()}")
print(f" REDCap connection FAILED. Status: {response.status_code}")
print(f" REDCap connection FAILED. Error: {str(e)}")
# Twilio configuration details
print(f" - Account SID: {config.twilio_account_sid[:10]}..." if config.twilio_account_sid else " - Account SID: NOT CONFIGURED")
print(f" - Auth Token: {'*' * 10}..." if config.twilio_auth_token else " - Auth Token: NOT CONFIGURED")
print(f" - Phone Number: {config.twilio_phone_number}" if config.twilio_phone_number else " - Phone Number: NOT CONFIGURED")
account = Client(config.twilio_account_sid, config.twilio_auth_token).api.accounts(config.twilio_account_sid).fetch()
print(f" Twilio connection successful! Account status: {account.status}")
print(f" Twilio connection FAILED. Error: {str(e)}")
#------------------------------------------------------------------
# Handle CLI arguments for force send or synchronization
#------------------------------------------------------------------
if len(sys.argv) > 1:
try:
cli_commands = appv2.var.cli.commands
force_cmd = getattr(cli_commands, 'force', 'force')
sync_cmd = getattr(cli_commands, 'sync', 'sync')
help_cmd = getattr(cli_commands, 'help', 'help')
except:
force_cmd = 'force'
sync_cmd = 'sync'
help_cmd = 'help'
if sys.argv[1].lower() == force_cmd and len(sys.argv) > 2:
logger.info(f"Force sending message to record {record_id}.")
service.process_records(force_record_id=record_id)
elif sys.argv[1].lower() == sync_cmd:
service.sync_processed_records_from_redcap()
print("\nUsage:")
print(" python app.py - Run the monitoring process")
print(f" python app.py {force_cmd} <id> - Force send a message to record <id>")
print(f" python app.py {sync_cmd} - Synchronize processed records with REDCap")
print(f" python app.py {help_cmd} - Show this help message")
return
#------------------------------------------------------------------
# FIRST RUN CHECK: If existing_records.json is empty, run first-run process and exit.
#------------------------------------------------------------------
# FIRST RUN CHECK: If existing_records.json is empty, store existing record IDs.
if not service.records.existing_records:
logger.info("First run detected. Saving existing record IDs.")
service.handle_first_run()
# Note: We do not exit here. The program continues to run.
#------------------------------------------------------------------
# Continuous monitoring loop for subsequent runs
#------------------------------------------------------------------
time.sleep(config.polling_interval)
except Exception as e:
logger.error(f"Error during sleep interval: {str(e)}")