Fallback sync architecture for disconnected POS systems

Pharmacy point-of-sale (POS) systems operate in zero-failure environments where network instability directly threatens controlled substance logging accuracy. When connectivity degrades, inventory decr

Pharmacy point-of-sale (POS) systems operate in zero-failure environments where network instability directly threatens controlled substance logging accuracy. When connectivity degrades, inventory decrements, prescription fulfillment records, and DEA Schedule II–V transaction logs must persist locally without violating 21 CFR Part 1304 recordkeeping mandates or breaching HIPAA Security Rule requirements for data integrity. A resilient fallback architecture decouples transaction capture from upstream synchronization, enforcing strict queue boundaries, cryptographic hashing, and deterministic reconciliation. Within the broader Core Architecture & DEA Compliance Frameworks, offline routing must guarantee zero data loss, prevent duplicate dispensing, and maintain immutable audit trails until network restoration triggers automated batch reconciliation.

Diagnostic Thresholds & Disconnect Detection

Relying on HTTP status codes alone is insufficient for pharmacy environments where intermittent packet loss, DNS resolution failures, or TLS renegotiation delays can corrupt partial payloads. Implement a layered diagnostic approach that aligns with HIPAA §164.312(b) audit control requirements:

  • TCP keepalive probes: 15-second intervals with exponential backoff (max 3 retries) to detect transport-layer degradation before application timeouts.
  • Application-layer heartbeat: Lightweight GET /api/v1/inventory/ping validation against the central pharmacy management system (PMS).
  • Local queue depth monitoring: Configurable overflow thresholds that trigger circuit-breaker behavior when storage utilization exceeds 80%.

Trigger fallback mode when the oldest pending transaction exceeds 120 seconds or when Schedule II queue depth approaches the local storage limit. This prevents uncontrolled queue growth that could breach pharmacy security framework boundaries during prolonged outages. Use the following diagnostic query to evaluate sync readiness and isolate high-risk controlled substance backlogs:

sql
SELECT 
  queue_depth,
  MAX(created_at) AS oldest_pending,
  COUNT(CASE WHEN schedule = 'II' THEN 1 END) AS c2_pending,
  SUM(CASE WHEN sync_status = 'PENDING' THEN 1 ELSE 0 END) AS total_pending
FROM offline_transaction_buffer
WHERE sync_status = 'PENDING'
GROUP BY queue_depth;

Fallback Routing Logic & Queue Architecture

The fallback engine must route transactions through a deterministic, FIFO queue with cryptographic integrity checks. Each offline record receives a monotonic sequence ID, a SHA-256 hash of the canonical payload, and a strict timestamp aligned to UTC. The routing layer evaluates payload classification, applies schedule-specific validation rules, and isolates Schedule II transactions for expedited reconciliation upon reconnection. This design aligns with established Fallback Routing for Offline Sync protocols, ensuring that controlled substance logs never bypass DEA-mandated audit boundaries.

Critical routing considerations include:

  • NDC-11 vs NDC-10 normalization: Strip leading zeros or pad to 11 digits using FDA standard algorithms before hashing to prevent inventory drift during reconciliation.
  • Schedule-specific routing: Schedule II payloads bypass standard batching and are flagged for dual-verification upon sync. Schedules III–V follow standard FIFO ordering.
  • Audit boundary definition: Local storage must be logically isolated from general POS transaction logs. Access is restricted to the sync service account with explicit role-based controls (HIPAA §164.312(a)(1)).

Python Implementation: Offline Buffer & DEA-Compliant Sync Engine

Production-grade fallback systems require explicit state management, idempotency keys, and retry logic with jitter. The following implementation demonstrates a secure, audit-ready offline buffer that enforces cryptographic integrity, deterministic ordering, and regulatory compliance boundaries.

python
import hashlib
import hmac
import json
import logging
import sqlite3
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Dict, Optional, List
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import requests

# Configure HIPAA-compliant audit logger (no PHI/PII in logs)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    handlers=[logging.FileHandler("/var/log/pharmacy/sync_audit.log")]
)
logger = logging.getLogger("deafallback.sync_engine")

@dataclass
class TransactionPayload:
    ndc: str
    quantity: int
    schedule: str
    prescription_id: str
    pharmacist_npi: str
    dispensed_at_utc: str
    idempotency_key: str

class OfflineTransactionBuffer:
    """SQLite-backed FIFO queue with cryptographic hashing and WAL mode for crash resilience."""
    
    def __init__(self, db_path: str, encryption_key: Optional[bytes] = None):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, timeout=30)
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self.conn.execute("PRAGMA synchronous=NORMAL;")
        self._init_schema()
        
    def _init_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS offline_queue (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sequence_id TEXT UNIQUE NOT NULL,
                idempotency_key TEXT UNIQUE NOT NULL,
                payload_hash TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                schedule TEXT NOT NULL,
                sync_status TEXT DEFAULT 'PENDING',
                created_at_utc TEXT NOT NULL,
                retry_count INTEGER DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS idx_sync_status ON offline_queue(sync_status);
            CREATE INDEX IF NOT EXISTS idx_schedule ON offline_queue(schedule);
        """)
        self.conn.commit()

    def enqueue(self, payload: TransactionPayload) -> str:
        """Persist transaction with SHA-256 hash and monotonic sequence ID."""
        canonical = json.dumps(asdict(payload), sort_keys=True, separators=(",", ":"))
        payload_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        sequence_id = f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}"
        
        self.conn.execute(
            """INSERT INTO offline_queue 
               (sequence_id, idempotency_key, payload_hash, payload_json, schedule, created_at_utc)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (sequence_id, payload.idempotency_key, payload_hash, canonical, payload.schedule, payload.dispensed_at_utc)
        )
        self.conn.commit()
        logger.info(f"Enqueued {payload.schedule} transaction: {sequence_id}")
        return sequence_id

    def fetch_pending(self, limit: int = 50) -> List[Dict]:
        """Retrieve FIFO ordered pending records for sync."""
        cursor = self.conn.execute(
            """SELECT id, sequence_id, payload_json, schedule, retry_count 
               FROM offline_queue 
               WHERE sync_status = 'PENDING' 
               ORDER BY id ASC LIMIT ?""",
            (limit,)
        )
        return [dict(zip(["id", "sequence_id", "payload_json", "schedule", "retry_count"], row)) for row in cursor.fetchall()]

    def mark_synced(self, record_ids: List[int]):
        """Update status to SYNCED after successful upstream acknowledgment."""
        placeholders = ",".join("?" * len(record_ids))
        self.conn.execute(f"UPDATE offline_queue SET sync_status = 'SYNCED' WHERE id IN ({placeholders})", record_ids)
        self.conn.commit()

class SyncEngine:
    """Handles deterministic reconciliation, retry with jitter, and DEA audit trail generation."""
    
    def __init__(self, api_base_url: str, auth_token: str, buffer: OfflineTransactionBuffer):
        self.api_base = api_base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {auth_token}", "Content-Type": "application/json"})
        
        # Exponential backoff with jitter for network resilience
        retry_strategy = Retry(
            total=5,
            backoff_factor=1.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST"]
        )
        self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))
        self.buffer = buffer

    def _verify_payload_integrity(self, payload_json: str, expected_hash: str) -> bool:
        """Validate SHA-256 integrity before transmission (21 CFR Part 11.10 compliance)."""
        actual_hash = hashlib.sha256(payload_json.encode("utf-8")).hexdigest()
        return hmac.compare_digest(actual_hash, expected_hash)

    def sync_batch(self) -> Dict[str, int]:
        """Execute deterministic reconciliation with schedule-aware routing."""
        pending = self.buffer.fetch_pending()
        if not pending:
            return {"synced": 0, "failed": 0}
            
        synced_ids = []
        failed_ids = []
        
        for record in pending:
            try:
                payload_data = json.loads(record["payload_json"])
                # Schedule II requires explicit audit boundary logging
                if record["schedule"] == "II":
                    logger.info(f"DEA C2 reconciliation initiated: {record['sequence_id']}")
                    
                response = self.session.post(
                    f"{self.api_base}/api/v1/transactions/sync",
                    json=payload_data,
                    headers={"X-Idempotency-Key": payload_data["idempotency_key"]},
                    timeout=15
                )
                response.raise_for_status()
                synced_ids.append(record["id"])
                
            except requests.exceptions.RequestException as e:
                logger.warning(f"Sync failed for {record['sequence_id']}: {str(e)}")
                failed_ids.append(record["id"])
                
        self.buffer.mark_synced(synced_ids)
        logger.info(f"Batch reconciliation complete. Synced: {len(synced_ids)}, Failed: {len(failed_ids)}")
        return {"synced": len(synced_ids), "failed": len(failed_ids)}

Deterministic Reconciliation & Audit Boundary Enforcement

Upon network restoration, the sync engine executes batch reconciliation with strict ordering guarantees. Duplicate dispensing is prevented through idempotency key validation at the upstream PMS layer, while local cryptographic hashes ensure payload integrity aligns with FDA 21 CFR Part 11 electronic record requirements. Schedule II transactions are routed through a dedicated reconciliation pipeline that enforces pharmacist signature verification and generates a separate DEA audit manifest.

Key enforcement patterns:

  • Monotonic sequence validation: Upstream systems reject out-of-order payloads, preserving FIFO integrity.
  • Hash mismatch quarantine: Records failing SHA-256 verification are quarantined in a QUARANTINED state with full cryptographic audit trails.
  • Immutable append-only logging: All sync attempts, retries, and final acknowledgments are written to a WORM (Write-Once-Read-Many) compliant log partition, satisfying DEA 1304.04 retention mandates.

Automated Compliance Reporting & Scheduled Delivery

Pharmacy compliance officers require continuous visibility into offline sync performance, controlled substance reconciliation latency, and audit boundary adherence. Automated report generation should produce cryptographically signed PDF/HTML artifacts containing:

  • Schedule II–V transaction reconciliation metrics
  • NDC normalization validation rates
  • Queue depth trends and disconnect event correlation
  • Idempotency conflict resolution logs

Reports are generated using deterministic rendering pipelines, signed with an internal PKI certificate, and delivered via secure SMTP or SFTP to designated compliance endpoints. Delivery schedules align with state board inspection cycles and DEA biennial audit windows. The entire workflow operates within the defined pharmacy security framework architecture, ensuring that all generated artifacts remain tamper-evident and accessible for regulatory review.

By implementing this fallback sync architecture, pharmacy IT teams guarantee continuous operational continuity during network degradation while maintaining strict adherence to DEA, FDA, and HIPAA regulatory boundaries. The combination of cryptographic payload validation, deterministic queue routing, and automated audit reporting transforms offline POS resilience from a reactive contingency into a proactive compliance control.