Fallback Routing for Offline Sync

Pharmacy inventory systems operate in a zero-tolerance environment for unlogged dispensing events. Network degradation, ISP outages, and POS hardware failures must not interrupt perpetual inventory tracking or break the chain of custody for DEA-controlled substances. The fallback routing workflow detailed here establishes a deterministic, locally buffered synchronization path that preserves compliance with DEA 21 CFR 1304 recordkeeping requirements, FDA 21 CFR 211 batch traceability, and HIPAA Security Rule encryption standards during connectivity loss. This operational pattern integrates directly into the broader Core Architecture & DEA Compliance Frameworks and ensures that offline transaction routing never compromises audit integrity or diversion detection thresholds.

1. Network State Detection & Threshold Trigger

The pharmacy management system (PMS) must continuously monitor upstream connectivity without introducing latency to dispensing workflows. A lightweight, asynchronous health probe targets the primary EDI ingestion endpoint using TCP keep-alives and HTTPS HEAD requests. When consecutive probe failures exceed a configurable threshold (default: 3 failures within 15 seconds), the routing state machine transitions to OFFLINE_ROUTING. All outbound inventory events are immediately intercepted by a local middleware layer before reaching the network stack.

python
import asyncio
import time
from enum import Enum

import aiohttp

class SyncState(Enum):
    ONLINE = "ONLINE"
    OFFLINE_ROUTING = "OFFLINE_ROUTING"
    RECONCILING = "RECONCILING"

class NetworkMonitor:
    def __init__(self, endpoint: str, threshold: int = 3, window_sec: int = 15):
        self.endpoint = endpoint
        self.threshold = threshold
        self.window_sec = window_sec
        self.state = SyncState.ONLINE
        self._failure_count = 0
        self._last_failure_ts = 0.0

    async def probe(self) -> None:
        """Send one HEAD request and update the failure counter."""
        timeout = aiohttp.ClientTimeout(total=3.0)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.head(self.endpoint) as resp:
                    if resp.status == 200:
                        self._reset_failures()
                        return
        except (aiohttp.ClientError, asyncio.TimeoutError):
            pass
        # Reached on a transport error and on any non-200 response, so an
        # endpoint that answers with 5xx also counts as a failed probe.
        self._record_failure()

    def _record_failure(self) -> None:
        # Monotonic time is immune to wall-clock adjustments.
        now = time.monotonic()
        # Restart the count if the previous failure is older than the window.
        if now - self._last_failure_ts > self.window_sec:
            self._failure_count = 0
        self._failure_count += 1
        self._last_failure_ts = now
        if self._failure_count >= self.threshold:
            self.state = SyncState.OFFLINE_ROUTING

    def _reset_failures(self) -> None:
        self._failure_count = 0
        self.state = SyncState.ONLINE
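
The counter above restarts only when two consecutive failures are more than `window_sec` apart, so failures at 0, 10, and 20 seconds would still trip the threshold even though they span 20 seconds. A stricter reading of "3 failures within 15 seconds" is a sliding window. The following is a minimal sketch of that alternative, not part of the monitor above; `SlidingFailureWindow` and its method names are hypothetical, and the caller is assumed to pass timestamps from `time.monotonic()`.

```python
from collections import deque
from typing import Deque


class SlidingFailureWindow:
    """Counts probe failures inside a rolling time window (hypothetical helper)."""

    def __init__(self, threshold: int = 3, window_sec: float = 15.0) -> None:
        self.threshold = threshold
        self.window_sec = window_sec
        self._failures: Deque[float] = deque()

    def record_failure(self, now: float) -> bool:
        """Record a failure at time `now`; return True if the threshold is met."""
        self._failures.append(now)
        # Drop failures that have aged out of the window.
        while self._failures and now - self._failures[0] > self.window_sec:
            self._failures.popleft()
        return len(self._failures) >= self.threshold

    def record_success(self) -> None:
        """A successful probe clears the failure history."""
        self._failures.clear()
```

Passing the timestamp in explicitly keeps the class deterministic and easy to unit test.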

2. Local Buffer Initialization & Cryptographic Enforcement

When the state transitions to OFFLINE_ROUTING, a secure, encrypted local buffer is instantiated on the pharmacy workstation or edge server. HIPAA §164.312(a)(2)(iv) is an addressable implementation specification that calls for a mechanism to encrypt and decrypt ePHI; this architecture treats encryption at rest as mandatory. To satisfy this requirement while maintaining high-throughput write performance, the architecture employs SQLite with Write-Ahead Logging (WAL) and application-level AES-256-GCM payload encryption. The buffer schema enforces strict column typing, and the write path is transactional and insert-only so that buffered records are not modified or deleted retroactively.

python
import sqlite3
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

class SecureBuffer:
    def __init__(self, db_path: str, master_key: bytes):
        self.db_path = db_path
        self.aesgcm = AESGCM(master_key)
        self._init_db()

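    def _init_db(self) -> None:
        # dirname is "" for a bare filename, and os.makedirs("") would raise.
        parent = os.path.dirname(self.db_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("PRAGMA journal_mode=WAL;")
            # STRICT tables (SQLite 3.37+) accept only INT, INTEGER, REAL,
            # TEXT, BLOB, and ANY, so the timestamp is stored as TEXT.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS offline_txns (
                    seq_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    encrypted_payload BLOB NOT NULL,
                    nonce BLOB NOT NULL,
                    hash_chain_prev TEXT NOT NULL,
                    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
                ) STRICT;
            """)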

    def insert_encrypted(self, payload: bytes, prev_hash: str) -> int:
        nonce = os.urandom(12)
        ciphertext = self.aesgcm.encrypt(nonce, payload, None)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "INSERT INTO offline_txns (encrypted_payload, nonce, hash_chain_prev) VALUES (?, ?, ?)",
                (ciphertext, nonce, prev_hash)
            )
            return cursor.lastrowid
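
The buffer class above only ever issues INSERT statements, but nothing in the schema itself stops another code path from updating or deleting rows. One way to push the append-only rule into the database is a pair of triggers. This is a sketch under assumptions: the trigger names and the `install_append_only_triggers` helper are hypothetical, and a deployment that purges the buffer after reconciliation would need a controlled way around the DELETE trigger (for example, archiving and replacing the database file).

```python
import sqlite3

# Hypothetical trigger names; the table name matches the buffer schema.
APPEND_ONLY_TRIGGERS = """
CREATE TRIGGER IF NOT EXISTS offline_txns_no_update
BEFORE UPDATE ON offline_txns
BEGIN
    SELECT RAISE(ABORT, 'offline_txns is append-only');
END;
CREATE TRIGGER IF NOT EXISTS offline_txns_no_delete
BEFORE DELETE ON offline_txns
BEGIN
    SELECT RAISE(ABORT, 'offline_txns is append-only');
END;
"""


def install_append_only_triggers(conn: sqlite3.Connection) -> None:
    """Reject UPDATE and DELETE on offline_txns at the database level."""
    conn.executescript(APPEND_ONLY_TRIGGERS)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE offline_txns ("
        "seq_id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "encrypted_payload BLOB NOT NULL)"
    )
    install_append_only_triggers(conn)
    conn.execute("INSERT INTO offline_txns (encrypted_payload) VALUES (?)", (b"x",))
    try:
        conn.execute("DELETE FROM offline_txns")
    except sqlite3.DatabaseError as exc:
        print("blocked:", exc)
```

Triggers guard against accidental modification by application code; they do not stop someone with file-level access, which is what the hash chain in the routing step is for.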

3. Transaction Interception & Schema Validation

Every dispensing, receiving, or inventory adjustment event is serialized and validated against the pharmacy’s master drug file before entering the local buffer. NDC formatting must be normalized to the standard 11-digit representation to prevent reconciliation mismatches during sync. Adherence to NDC-11 vs NDC-10 Parsing Standards ensures consistent hyphenation and zero-padding logic across legacy and modern POS systems. DEA schedule classification is resolved concurrently, and lot/expiration metadata is verified against the manufacturer’s certificate of analysis. Invalid payloads are quarantined to a REJECTED_QUEUE with explicit error codes; valid payloads proceed to the transaction log.

python
import json
import re
from pydantic import BaseModel, field_validator
from typing import Optional

class InventoryEvent(BaseModel):
    ndc: str
    schedule: Optional[str] = None
    lot_number: str
    expiration_date: str
    quantity_dispensed: int
    timestamp: str

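    @field_validator("ndc")
    @classmethod
    def normalize_ndc(cls, v: str) -> str:
        # NDC-11 is laid out 5-4-2 (labeler-product-package).
        parts = v.strip().split("-")
        if len(parts) == 3 and all(p.isdigit() for p in parts):
            # Hyphens show which segment of a 10-digit NDC (4-4-2, 5-3-2,
            # or 5-4-1) is short, so that segment gets the leading zero.
            if tuple(map(len, parts)) in {(4, 4, 2), (5, 3, 2), (5, 4, 1), (5, 4, 2)}:
                return f"{parts[0].zfill(5)}-{parts[1].zfill(4)}-{parts[2].zfill(2)}"
            raise ValueError("Unrecognized NDC segment lengths")
        digits = re.sub(r"\D", "", v)
        if len(digits) == 11:
            return f"{digits[:5]}-{digits[5:9]}-{digits[9:]}"
        # Ten digits without hyphens cannot be padded safely: the position
        # of the missing zero depends on the original segment layout.
        raise ValueError("NDC must be 11 digits, or 10 digits with hyphens")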

    def to_payload(self) -> bytes:
        return json.dumps(self.model_dump(mode="json")).encode("utf-8")
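
The model above raises on invalid input but does not show the quarantine step described in the text. The sketch below illustrates one way to route events into an accepted list or a REJECTED_QUEUE with an explicit error code. It uses only the standard library so it can run on its own; the error codes, `RejectedEvent`, `InterceptionQueues`, and `intercept` are hypothetical names, and the two checks stand in for full validation against the master drug file.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

# Hypothetical error codes; the source does not define a code list.
ERR_NDC_FORMAT = "E_NDC_FORMAT"
ERR_LOT_MISSING = "E_LOT_MISSING"


@dataclass
class RejectedEvent:
    raw_event: dict
    error_code: str
    detail: str


@dataclass
class InterceptionQueues:
    accepted: List[dict] = field(default_factory=list)
    rejected: List[RejectedEvent] = field(default_factory=list)  # REJECTED_QUEUE


def check_event(raw: dict) -> Optional[Tuple[str, str]]:
    """Return (error_code, detail) for the first problem found, else None."""
    digits = "".join(ch for ch in str(raw.get("ndc", "")) if ch.isdigit())
    if len(digits) not in (10, 11):
        return ERR_NDC_FORMAT, "NDC must contain 10 or 11 digits"
    if not str(raw.get("lot_number", "")).strip():
        return ERR_LOT_MISSING, "lot_number is required"
    return None


def intercept(raw: dict, queues: InterceptionQueues) -> bool:
    """Route one event; return True if it was accepted."""
    problem = check_event(raw)
    if problem is None:
        queues.accepted.append(raw)
        return True
    code, detail = problem
    queues.rejected.append(RejectedEvent(raw, code, detail))
    return False
```

Keeping the raw event alongside the error code lets a pharmacist or technician correct and resubmit the entry without losing the original record.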

4. Fallback Routing Execution & Priority Weighting

Validated transactions are written to the local buffer with a monotonically increasing sequence ID and a cryptographic hash chain pointer. The routing daemon applies deterministic priority weighting: Schedule II events are flagged for immediate reconciliation upon restoration, while Schedule III-V and OTC events follow standard FIFO ordering. This tiered approach aligns with Fallback sync architecture for disconnected POS systems and ensures high-risk substances never experience sync latency beyond operational thresholds. Schedule classification logic references the authoritative DEA Schedule II-V Classification Mapping to guarantee accurate diversion risk scoring.

python
import hashlib
from dataclasses import dataclass
from typing import List

# Defined elsewhere on this page (see the surrounding blocks):
# - InventoryEvent
# - SecureBuffer

@dataclass
class PriorityTransaction:
    seq_id: int
    payload_hash: str
    schedule: str
    raw_payload: bytes

class FallbackRouter:
    def __init__(self, buffer: SecureBuffer):
        self.buffer = buffer
        self.prev_hash = "0" * 64  # Genesis hash
        self.priority_queue: List[PriorityTransaction] = []

    def route_transaction(self, event: InventoryEvent) -> None:
        payload = event.to_payload()
        current_hash = hashlib.sha256(payload).hexdigest()
        chain_hash = hashlib.sha256(f"{self.prev_hash}{current_hash}".encode()).hexdigest()
        
        seq_id = self.buffer.insert_encrypted(payload, self.prev_hash)
        self.prev_hash = chain_hash
        
        self.priority_queue.append(PriorityTransaction(
            seq_id=seq_id,
            payload_hash=current_hash,
            schedule=event.schedule or "OTC",
            raw_payload=payload
        ))
        self._sort_by_priority()

    def _sort_by_priority(self) -> None:
        # Schedule II first, then III-V, then OTC
        schedule_order = {"II": 0, "III": 1, "IV": 2, "V": 3, "OTC": 4}
        self.priority_queue.sort(key=lambda x: schedule_order.get(x.schedule, 5))
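
The hash chain is only useful if something checks it. The sketch below shows how an auditor might walk the buffered rows in seq_id order and find the first link that no longer matches, using the same linking rule as the router above (SHA-256 over the previous chain hash concatenated with the payload hash). `link_hash` and `first_broken_link` are hypothetical helpers, and rows are assumed to be already decrypted `(seq_id, payload, hash_chain_prev)` tuples.

```python
import hashlib
from typing import Iterable, Optional, Tuple

GENESIS_HASH = "0" * 64  # Same genesis value the router starts from


def link_hash(prev_hash: str, payload: bytes) -> str:
    """Chain hash for one record: SHA-256(prev_hash + SHA-256(payload))."""
    payload_hash = hashlib.sha256(payload).hexdigest()
    return hashlib.sha256(f"{prev_hash}{payload_hash}".encode()).hexdigest()


def first_broken_link(
    rows: Iterable[Tuple[int, bytes, str]],
    head_hash: Optional[str] = None,
) -> Optional[int]:
    """Return the seq_id where verification fails, or None if the chain is intact.

    Rows must be in ascending seq_id order. An empty input is treated as intact.
    """
    expected = GENESIS_HASH
    last_seq: Optional[int] = None
    for seq_id, payload, stored_prev in rows:
        if stored_prev != expected:
            return seq_id
        expected = link_hash(expected, payload)
        last_seq = seq_id
    # The final payload has no successor row to expose tampering, so it can
    # only be checked against a head hash kept outside the buffer.
    if head_hash is not None and expected != head_hash:
        return last_seq
    return None
```

A changed payload in row k surfaces as a mismatch at row k+1, because each row stores the chain hash of everything before it. That is why the last row needs the separately stored head hash.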

5. Connectivity Restoration & Idempotent Reconciliation

Upon successful health check restoration, the routing daemon initiates a batched, idempotent upload sequence. Each transaction is transmitted with its original timestamp, cryptographic nonce, and hash chain pointer, together with an idempotency key so the server can deduplicate retries. Delivery is at-least-once, and server-side deduplication makes processing effectively exactly-once. The reconciliation engine validates server-side acknowledgments against the local sequence IDs. If a transmission fails mid-batch, the daemon retries only the transactions the server has not yet acknowledged, without duplicating records. This resilience pattern is critical for environments with intermittent backhaul, as detailed in Designing offline sync fallbacks for rural pharmacies.

python
import httpx
from typing import Any, Dict, Set

# Defined elsewhere on this page (see the surrounding blocks):
# - FallbackRouter

class ReconciliationDaemon:
    def __init__(self, router: FallbackRouter, sync_endpoint: str):
        self.router = router
        self.sync_endpoint = sync_endpoint
        # seq_ids the server has acknowledged. The queue is ordered by
        # priority, not by seq_id, so a single high-water mark is not enough.
        self.confirmed_seq_ids: Set[int] = set()

    async def reconcile(self) -> Dict[str, Any]:
        if not self.router.priority_queue:
            return {"status": "idle"}

        # Snapshot the queue in priority order (Schedule II first).
        batch = list(self.router.priority_queue)
        results: Dict[str, Any] = {"uploaded": 0, "failed": []}

        async with httpx.AsyncClient(timeout=10.0) as client:
            for txn in batch:
                headers = {
                    "X-Idempotency-Key": txn.payload_hash,
                    "X-Sequence-ID": str(txn.seq_id),
                    "Content-Type": "application/json"
                }
                try:
                    resp = await client.post(
                        self.sync_endpoint,
                        content=txn.raw_payload,
                        headers=headers
                    )
                except httpx.RequestError:
                    results["failed"].append(txn.seq_id)
                    break  # Halt on network failure to preserve order
                if resp.status_code in (200, 201, 202, 409):
                    # 409 means the server already processed this key.
                    self.confirmed_seq_ids.add(txn.seq_id)
                    results["uploaded"] += 1
                else:
                    results["failed"].append(txn.seq_id)

        # Keep only unconfirmed transactions for the next pass.
        self.router.priority_queue = [
            t for t in self.router.priority_queue
            if t.seq_id not in self.confirmed_seq_ids
        ]
        return results
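
The daemon treats a 409 response as "already processed", which assumes the sync endpoint deduplicates on the idempotency key. The server side is not shown in this document, so the following is only an illustrative sketch of that assumed contract; `IdempotentReceiver` and its in-memory store are hypothetical, and a real service would persist the keys.

```python
from typing import Dict


class IdempotentReceiver:
    """In-memory stand-in for the sync endpoint's deduplication logic."""

    def __init__(self) -> None:
        # Maps idempotency key -> seq_id of the first accepted delivery.
        self._seen: Dict[str, int] = {}

    def handle(self, idempotency_key: str, seq_id: int) -> int:
        """Return an HTTP-style status code for one delivery attempt."""
        if idempotency_key in self._seen:
            return 409  # Duplicate delivery: already recorded, do not re-apply
        self._seen[idempotency_key] = seq_id
        return 201  # First delivery: record the transaction
```

Because the key is the payload hash, two events with byte-identical payloads would be treated as one. Including a unique field such as the sequence ID in the key is one way to avoid that collision.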

Compliance Mapping & Audit Readiness

This fallback routing architecture is engineered to satisfy explicit regulatory controls without requiring manual intervention during connectivity loss:

| Regulation | Requirement | Implementation Control |
| --- | --- | --- |
| DEA 21 CFR 1304.04 | Complete, accurate, and contemporaneous records of controlled substances | Append-only SQLite buffer with cryptographic hash chaining prevents retroactive alteration. Sequence IDs guarantee chronological ordering. |
| HIPAA §164.312(a)(2)(iv) | Encryption of ePHI at rest | AES-256-GCM payload encryption with unique 12-byte nonces per transaction. Key material derived via HKDF and stored in OS-protected keystores. |
| FDA 21 CFR 211.188 | Batch production and control records | Lot/expiration validation at interception. Immutable payload serialization ensures traceability from manufacturer receipt to patient dispensing. |
| NIST SP 800-111 | Storage security for sensitive data | Hardware-backed TPM/SE integration recommended for master key storage. Buffer auto-purges after successful reconciliation per retention policies. |

Audit readiness is maintained through automated log rotation, cryptographic integrity verification, and deterministic reconciliation reporting. The routing layer generates tamper-evident manifests that can be exported for Automated PDF & HTML Report Generation workflows. All offline events retain their original timestamps, ensuring that diversion detection algorithms operate on accurate temporal data regardless of sync latency.

For cryptographic implementation standards, reference the official Python Cryptography Documentation and NIST SP 800-111, Guide to Storage Encryption Technologies for End User Devices. Regulatory text for controlled substance recordkeeping is codified in 21 CFR Part 1304.
