diff --git a/api-docs/openapi.yaml b/api-docs/openapi.yaml index 50b2fbae..28847fdf 100644 --- a/api-docs/openapi.yaml +++ b/api-docs/openapi.yaml @@ -1,7 +1,7 @@ openapi: 3.1.0 info: title: DBackup API - version: 1.3.0 + version: 1.4.0 description: | REST API for DBackup — a self-hosted database backup automation platform with encryption, compression, and smart retention. diff --git a/package.json b/package.json index 94ec4c51..7749e446 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dbackup", - "version": "1.3.0", + "version": "1.4.0", "private": true, "scripts": { "dev": "next dev", diff --git a/public/openapi.yaml b/public/openapi.yaml index 50b2fbae..28847fdf 100644 --- a/public/openapi.yaml +++ b/public/openapi.yaml @@ -1,7 +1,7 @@ openapi: 3.1.0 info: title: DBackup API - version: 1.3.0 + version: 1.4.0 description: | REST API for DBackup — a self-hosted database backup automation platform with encryption, compression, and smart retention. diff --git a/scripts/setup-dev-debian.sh b/scripts/setup-dev-debian.sh new file mode 100755 index 00000000..284a7bd7 --- /dev/null +++ b/scripts/setup-dev-debian.sh @@ -0,0 +1,176 @@ +#!/usr/bin/env bash +# ------------------------------------------------------------------- +# DBackup — Install all database client binaries on Debian/Ubuntu +# +# Supported databases: +# MySQL / MariaDB → mysqldump, mysql +# PostgreSQL 14-17 → pg_dump, pg_restore (versioned, like Docker image) +# MongoDB → mongodump, mongorestore, mongosh +# SQLite → sqlite3 +# Redis → redis-cli +# MSSQL → (no binary needed, uses Node.js mssql driver) +# +# Usage: sudo ./scripts/setup-dev-debian.sh +# ------------------------------------------------------------------- +set -euo pipefail + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +info() { echo -e "${GREEN}[INFO]${NC} $*"; } +warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } +error() { echo -e "${RED}[ERROR]${NC} $*"; exit 1; } + +# Require root +[[ $EUID -eq 0 ]] || error "This script must be run as root (sudo)." + +# Detect architecture +ARCH=$(dpkg --print-architecture) +info "Detected architecture: $ARCH" + +# ------------------------------------------------------------------- +# 1. Common prerequisites +# ------------------------------------------------------------------- +info "Installing common prerequisites..." +apt-get update -qq +apt-get install -y -qq curl gnupg lsb-release ca-certificates apt-transport-https wget > /dev/null +CODENAME=$(lsb_release -cs) +info "Detected Debian/Ubuntu codename: $CODENAME" + +# ------------------------------------------------------------------- +# 2. MySQL / MariaDB client (mysqldump, mysql) +# ------------------------------------------------------------------- +info "Installing MySQL client tools..." +apt-get install -y -qq default-mysql-client > /dev/null +mysql --version && info "MySQL client installed ✓" || warn "MySQL client check failed" + +# ------------------------------------------------------------------- +# 3. PostgreSQL clients — versioned (14, 16, 17, 18) +# Mirrors the Docker image strategy with /opt/pgXX/bin symlinks +# ------------------------------------------------------------------- +info "Adding PostgreSQL APT repository..." +curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor -o /usr/share/keyrings/postgresql-archive-keyring.gpg +echo "deb [signed-by=/usr/share/keyrings/postgresql-archive-keyring.gpg] https://apt.postgresql.org/pub/repos/apt ${CODENAME}-pgdg main" > /etc/apt/sources.list.d/pgdg.list +apt-get update -qq + +PG_VERSIONS=(14 16 17 18) +for ver in "${PG_VERSIONS[@]}"; do + info "Installing PostgreSQL $ver client..." + apt-get install -y -qq "postgresql-client-${ver}" > /dev/null 2>&1 || { + warn "PostgreSQL $ver client not available for $CODENAME — skipping" + continue + } + + # Create /opt/pgXX/bin/ symlinks (same layout as Docker image) + mkdir -p "/opt/pg${ver}/bin" + for bin in pg_dump pg_restore psql; do + ln -sf "/usr/lib/postgresql/${ver}/bin/${bin}" "/opt/pg${ver}/bin/${bin}" + done + + "/opt/pg${ver}/bin/pg_dump" --version && info "PostgreSQL $ver client installed ✓" || warn "PostgreSQL $ver validation failed" +done + +# ------------------------------------------------------------------- +# 4. MongoDB Database Tools (mongodump, mongorestore) +# ------------------------------------------------------------------- +info "Installing MongoDB Database Tools..." +if [[ "$ARCH" == "amd64" || "$ARCH" == "arm64" ]]; then + MONGO_INSTALLED=false + + # Detect distro (debian vs ubuntu) and map to a supported codename + DISTRO_ID=$(. /etc/os-release && echo "$ID") + if [[ "$DISTRO_ID" == "ubuntu" ]]; then + # Ubuntu: use codename directly, fallback to noble + MONGO_CODENAME="$CODENAME" + MONGO_REPO_BASE="https://repo.mongodb.org/apt/ubuntu" + else + # Debian: MongoDB only supports specific versions — map to nearest supported + case "$CODENAME" in + bookworm) MONGO_CODENAME="bookworm" ;; + trixie|sid|*) MONGO_CODENAME="bookworm" ;; # Fallback to latest supported + esac + MONGO_REPO_BASE="https://repo.mongodb.org/apt/debian" + fi + + info "Using MongoDB repo for $DISTRO_ID/$MONGO_CODENAME..." + curl -fsSL https://www.mongodb.org/static/pgp/server-8.0.asc | gpg --dearmor --yes -o /usr/share/keyrings/mongodb-server-8.0.gpg + echo "deb [signed-by=/usr/share/keyrings/mongodb-server-8.0.gpg] ${MONGO_REPO_BASE} ${MONGO_CODENAME}/mongodb-org/8.0 main" > /etc/apt/sources.list.d/mongodb-org-8.0.list + apt-get update -qq + apt-get install -y -qq mongodb-database-tools > /dev/null 2>&1 && MONGO_INSTALLED=true + + # Fallback: install via mongodb-org meta-package (includes tools) + if [[ "$MONGO_INSTALLED" == false ]]; then + warn "mongodb-database-tools standalone not available — trying mongodb-org package" + apt-get install -y -qq mongodb-org-tools > /dev/null 2>&1 && MONGO_INSTALLED=true + fi + + if [[ "$MONGO_INSTALLED" == true ]]; then + mongodump --version 2>/dev/null && info "MongoDB Database Tools installed ✓" || warn "mongodump not found in PATH" + else + warn "MongoDB tools could not be installed via APT for $DISTRO_ID/$MONGO_CODENAME" + warn "Install manually: https://www.mongodb.com/try/download/database-tools" + fi + + # Install mongosh (MongoDB Shell) — required for SSH connection tests and database listing + info "Installing MongoDB Shell (mongosh)..." + apt-get install -y -qq mongodb-mongosh > /dev/null 2>&1 && { + mongosh --version 2>/dev/null && info "mongosh installed ✓" + } || { + warn "mongosh not available via APT — trying direct install" + MONGOSH_URL="https://downloads.mongodb.com/compass/mongodb-mongosh_2.5.0_${ARCH}.deb" + wget -q "$MONGOSH_URL" -O /tmp/mongosh.deb 2>/dev/null && dpkg -i /tmp/mongosh.deb > /dev/null 2>&1 && rm -f /tmp/mongosh.deb && { + mongosh --version 2>/dev/null && info "mongosh installed ✓" + } || warn "mongosh installation failed — install manually: https://www.mongodb.com/try/download/shell" + } +else + warn "MongoDB Database Tools: unsupported architecture $ARCH — skipping" +fi + +# ------------------------------------------------------------------- +# 5. SQLite3 +# ------------------------------------------------------------------- +info "Installing SQLite3..." +apt-get install -y -qq sqlite3 > /dev/null +sqlite3 --version && info "SQLite3 installed ✓" || warn "SQLite3 check failed" + +# ------------------------------------------------------------------- +# 6. Redis CLI (redis-cli) +# ------------------------------------------------------------------- +info "Installing Redis tools..." +apt-get install -y -qq redis-tools > /dev/null +redis-cli --version && info "Redis CLI installed ✓" || warn "Redis CLI check failed" + +# ------------------------------------------------------------------- +# 7. Additional tools used by DBackup (SSH, rsync, smbclient) +# ------------------------------------------------------------------- +info "Installing additional tools (SSH, rsync, smbclient)..." +apt-get install -y -qq openssh-client sshpass rsync smbclient openssl zip > /dev/null + +# ------------------------------------------------------------------- +# Summary +# ------------------------------------------------------------------- +echo "" +info "=========================================" +info " DBackup Dev Dependencies — Summary" +info "=========================================" +echo "" +for cmd in mysql mysqldump mongodump mongorestore mongosh sqlite3 redis-cli pg_dump psql rsync smbclient sshpass; do + if command -v "$cmd" &>/dev/null; then + echo -e " ${GREEN}✓${NC} $cmd ($(command -v "$cmd"))" + else + echo -e " ${RED}✗${NC} $cmd (not found)" + fi +done +echo "" +for ver in "${PG_VERSIONS[@]}"; do + if [[ -x "/opt/pg${ver}/bin/pg_dump" ]]; then + echo -e " ${GREEN}✓${NC} /opt/pg${ver}/bin/pg_dump ($("/opt/pg${ver}/bin/pg_dump" --version 2>/dev/null | head -1))" + else + echo -e " ${RED}✗${NC} /opt/pg${ver}/bin/pg_dump (not installed)" + fi +done +echo "" +info "Done. MSSQL uses the Node.js mssql driver — no binary needed." diff --git a/src/app/dashboard/history/page.tsx b/src/app/dashboard/history/page.tsx index cebc78fe..f1f5e2d3 100644 --- a/src/app/dashboard/history/page.tsx +++ b/src/app/dashboard/history/page.tsx @@ -210,6 +210,7 @@ function HistoryContent() { const metadata = selectedLog ? parseMetadata(selectedLog.metadata) : null; const progress = metadata?.progress ?? 0; const stage = metadata?.stage || (selectedLog?.type === "Restore" ? "Restoring..." : "Initializing..."); + const detail = metadata?.detail || null; return (
@@ -291,8 +292,9 @@ function HistoryContent() {
- {selectedLog?.status === "Pending" ? "Waiting in queue..." : stage} - {selectedLog?.status === "Running" && progress > 0 && {progress}%} + {selectedLog?.status === "Pending" ? "Waiting in queue..." : stage} + {detail && — {detail}} + {selectedLog?.status === "Running" && progress > 0 && !detail && {progress}%}
diff --git a/src/components/execution/log-viewer.tsx b/src/components/execution/log-viewer.tsx index c6f9d933..70701800 100644 --- a/src/components/execution/log-viewer.tsx +++ b/src/components/execution/log-viewer.tsx @@ -9,10 +9,11 @@ import { ChevronDown, ArrowDown, Loader2, - Info + Info, + Clock } from "lucide-react"; -import { cn } from "@/lib/utils"; -import { LogEntry } from "@/lib/core/logs"; +import { cn, formatDuration } from "@/lib/utils"; +import { LogEntry, BACKUP_STAGE_ORDER, RESTORE_STAGE_ORDER } from "@/lib/core/logs"; import { Accordion, AccordionContent, @@ -27,6 +28,7 @@ interface LogViewerProps { className?: string; autoScroll?: boolean; status?: string; // Overall job status + executionType?: string; // "Backup" | "Restore" } interface LogGroup { @@ -34,9 +36,11 @@ interface LogGroup { logs: LogEntry[]; status: 'pending' | 'running' | 'success' | 'failed'; startTime?: string; + endTime?: string; + durationMs?: number; } -export function LogViewer({ logs, className, autoScroll = true, status }: LogViewerProps) { +export function LogViewer({ logs, className, autoScroll = true, status, executionType }: LogViewerProps) { const scrollRef = useRef(null); const [shouldAutoScroll, setShouldAutoScroll] = useState(autoScroll); const [activeStages, setActiveStages] = useState([]); @@ -56,47 +60,80 @@ export function LogViewer({ logs, className, autoScroll = true, status }: LogVie level: "info", type: "general", message: parts.slice(1).join(": ") || rawLog, - stage: "General" // Fallback stage + stage: "General" } as LogEntry; }); }, [logs]); - // Grouping Logic + // Grouping Logic — group by stage, sort by stage order, fill pending stages const groupedLogs = useMemo(() => { - const groups: LogGroup[] = []; - let currentGroup: LogGroup | null = null; + // Detect execution type: explicit prop, or infer from stage names + const stageOrder = executionType === "Restore" + ? RESTORE_STAGE_ORDER + : BACKUP_STAGE_ORDER; + // Build a map of stage → logs + const stageMap = new Map(); parsedLogs.forEach(log => { - // Normalize stage name by removing parenthesis content (e.g. "Dumping (50%)" -> "Dumping") - // Also remove trailing ellipsis "..." and trim whitespace - const rawStage = log.stage || "General"; - const stageName = rawStage.replace(/\s*\(.*\).*$/, "").replace(/\.{3,}$/, "").trim(); - - if (!currentGroup || currentGroup.stage !== stageName) { - // Finish previous group status check - if (currentGroup) { - const hasError = currentGroup.logs.some(l => l.level === 'error'); - if (hasError) currentGroup.status = 'failed'; - else currentGroup.status = 'success'; // Assume success if moved to next stage without error - } - - // Start new group - // Start new group - currentGroup = { - stage: stageName, - logs: [], - status: 'running', // Initially running - startTime: log.timestamp - }; - groups.push(currentGroup); - } - - currentGroup.logs.push(log); - if (log.level === 'error') currentGroup.status = 'failed'; + const stage = log.stage || "General"; + if (!stageMap.has(stage)) stageMap.set(stage, []); + stageMap.get(stage)!.push(log); + }); + + // Determine which known stages appeared + const seenStages = new Set(stageMap.keys()); + const isRunning = !status || status === "Running"; + + // Find the furthest known stage reached (by stage order index) + let maxStageIdx = -1; + for (const stage of seenStages) { + const idx = stageOrder.indexOf(stage); + if (idx > maxStageIdx) maxStageIdx = idx; + } + + const groups: LogGroup[] = []; + + // Add known pipeline stages in order + for (let i = 0; i < stageOrder.length; i++) { + const stage = stageOrder[i]; + const stageLogs = stageMap.get(stage); + + if (stageLogs && stageLogs.length > 0) { + const hasError = stageLogs.some(l => l.level === "error"); + const isLast = i === maxStageIdx; + const firstTs = stageLogs[0].timestamp; + const lastTs = stageLogs[stageLogs.length - 1].timestamp; + const duration = new Date(lastTs).getTime() - new Date(firstTs).getTime(); + + groups.push({ + stage, + logs: stageLogs, + status: hasError ? "failed" : (isLast && isRunning) ? "running" : "success", + startTime: firstTs, + endTime: lastTs, + durationMs: duration >= 0 ? duration : undefined, + }); + + stageMap.delete(stage); + } else if (isRunning && i > maxStageIdx && maxStageIdx >= 0) { + // Pending future stage + groups.push({ stage, logs: [], status: "pending" }); + } + } + + // Append any non-pipeline stages (e.g. "General") at the end + for (const [stage, stageLogs] of stageMap) { + const hasError = stageLogs.some(l => l.level === "error"); + groups.push({ + stage, + logs: stageLogs, + status: hasError ? "failed" : "success", + startTime: stageLogs[0].timestamp, }); + } - return groups; - }, [parsedLogs]); + return groups; + }, [parsedLogs, status, executionType]); // Auto-expand latest running stage only if user hasn't manually collapsed/expanded things useEffect(() => { if (userInteracted) return; @@ -157,9 +194,29 @@ export function LogViewer({ logs, className, autoScroll = true, status }: LogVie className="space-y-4" > {groupedLogs.map((group, groupIdx) => { - const isRunning = group.status === 'running' && - groupIdx === groupedLogs.length - 1 && - (!status || status === 'Running'); + const isPending = group.status === "pending"; + + if (isPending) { + return ( +
+
+ + {group.stage} +
+
+ ); + } + + const StageIcon = group.status === "failed" ? AlertCircle + : group.status === "running" ? Loader2 + : CheckCircle2; + + const stageColor = group.status === "failed" ? "text-red-500 dark:text-red-400" + : group.status === "running" ? "text-blue-500 dark:text-blue-400" + : "text-green-500 dark:text-green-400"; return (
- {group.status === 'failed' ? ( - - ) : isRunning ? ( - - ) : ( - - )} - - + + + {group.stage} - - {group.logs.length} logs - +
+ {group.durationMs != null && ( + {formatDuration(group.durationMs)} + )} + {group.logs.length} {group.logs.length === 1 ? "log" : "logs"} +
@@ -259,6 +309,11 @@ function LogItem({ entry }: { entry: LogEntry }) { {entry.message} + {entry.durationMs != null && ( + + {formatDuration(entry.durationMs)} + + )} {hasDetails && ( {isOpen ? : } diff --git a/src/lib/adapters/database/mongodb/dump.ts b/src/lib/adapters/database/mongodb/dump.ts index e35c0d48..2bb8d671 100644 --- a/src/lib/adapters/database/mongodb/dump.ts +++ b/src/lib/adapters/database/mongodb/dump.ts @@ -82,13 +82,18 @@ async function dumpSingleDatabase( log(`Dumping database: ${dbName}`, 'info', 'command', `mongodump ${logArgs.join(' ')}`); const dumpProcess = spawn('mongodump', args); + const stderrBuffer: string[] = []; dumpProcess.stderr.on('data', (data) => { const msg = data.toString().trim(); - if (msg) log(msg, 'info'); + if (msg) stderrBuffer.push(msg); }); await waitForProcess(dumpProcess, 'mongodump'); + + if (stderrBuffer.length > 0) { + log(`mongodump output`, 'info', 'command', stderrBuffer.join('\n')); + } } /** @@ -132,12 +137,16 @@ async function dumpSingleDatabaseSSH( stream.pipe(writeStream); + const stderrChunks: string[] = []; stream.stderr.on('data', (data: any) => { const msg = data.toString().trim(); - if (msg) log(msg, 'info'); + if (msg) stderrChunks.push(msg); }); stream.on('exit', (code: number | null, signal?: string) => { + if (stderrChunks.length > 0) { + log(`mongodump output`, 'info', 'command', stderrChunks.join('\n')); + } if (code === 0) resolve(); else reject(new Error(`Remote mongodump exited with code ${code ?? 'null'}${signal ? ` (signal: ${signal})` : ''}`)); }); @@ -211,14 +220,20 @@ export async function dump( const dumpProcess = spawn('mongodump', args); const writeStream = createWriteStream(destinationPath); + const stderrLines: string[] = []; dumpProcess.stdout.pipe(writeStream); dumpProcess.stderr.on('data', (data) => { - log(data.toString().trim()); + const msg = data.toString().trim(); + if (msg) stderrLines.push(msg); }); await waitForProcess(dumpProcess, 'mongodump'); + + if (stderrLines.length > 0) { + log(`mongodump output`, 'info', 'command', stderrLines.join('\n')); + } } // Case 2: Multiple Databases - TAR archive with individual mongodump per DB else { diff --git a/src/lib/adapters/database/mysql/restore.ts b/src/lib/adapters/database/mysql/restore.ts index 4bef1e10..e360c7c6 100644 --- a/src/lib/adapters/database/mysql/restore.ts +++ b/src/lib/adapters/database/mysql/restore.ts @@ -18,6 +18,7 @@ import { shouldRestoreDatabase, getTargetDatabaseName, } from "../common/tar-utils"; +import { formatBytes } from "@/lib/utils"; import { SshClient, isSSHMode, @@ -134,7 +135,7 @@ async function restoreSingleFile( sourcePath: string, targetDb: string, onLog: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, - onProgress?: (percentage: number) => void + onProgress?: (percentage: number, detail?: string) => void ): Promise { if (isSSHMode(config)) { return restoreSingleFileSSH(config, sourcePath, targetDb, onLog, onProgress); @@ -188,7 +189,7 @@ async function restoreSingleFileSSH( sourcePath: string, targetDb: string, onLog: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, - onProgress?: (percentage: number) => void + onProgress?: (percentage: number, detail?: string) => void ): Promise { const stats = await fs.stat(sourcePath); const totalSize = stats.size; @@ -221,7 +222,19 @@ async function restoreSingleFileSSH( // 1. Upload SQL file to remote temp location via SFTP (guarantees data integrity) onLog(`Uploading dump to remote server via SFTP (${(totalSize / 1024 / 1024).toFixed(1)} MB)...`, 'info'); - await ssh.uploadFile(sourcePath, remoteTempFile); + const uploadStart = Date.now(); + await ssh.uploadFile(sourcePath, remoteTempFile, (transferred, total) => { + if (onProgress && total > 0) { + // Upload = 0-90% of total progress + const uploadPercent = Math.round((transferred / total) * 90); + const elapsed = (Date.now() - uploadStart) / 1000; + const speed = elapsed > 0 ? transferred / elapsed : 0; + onProgress(uploadPercent, `${formatBytes(transferred)} / ${formatBytes(total)} — ${formatBytes(speed)}/s`); + } + }); + + // Clear upload progress detail + onProgress?.(90); // Verify upload integrity try { @@ -230,19 +243,18 @@ async function restoreSingleFileSSH( if (remoteSize !== totalSize) { throw new Error(`Upload size mismatch! Local: ${totalSize}, Remote: ${remoteSize}`); } - onLog(`Upload verified: ${(remoteSize / 1024 / 1024).toFixed(1)} MB`); + onLog(`Upload verified: ${(remoteSize / 1024 / 1024).toFixed(1)} MB`, 'success'); } catch (e) { if (e instanceof Error && e.message.includes('mismatch')) throw e; // stat command failed — non-critical } - if (onProgress) onProgress(90); - // 2. Run mysql restore on the remote server from the uploaded file const restoreCmd = remoteEnv(env, `cat ${shellEscape(remoteTempFile)} | ${mysqlBin} ${args.join(" ")}` ); onLog(`Restoring to database (SSH): ${targetDb}`, 'info', 'command', `${mysqlBin} ${args.join(" ")}`); + onProgress?.(95, 'Executing restore command...'); await new Promise((resolve, reject) => { const secrets = [config.password, config.privilegedAuth?.password].filter(Boolean) as string[]; @@ -260,7 +272,7 @@ async function restoreSingleFileSSH( stream.on('exit', (code: number | null, signal?: string) => { stderr.flush(); if (code === 0) { - onProgress?.(100); + onProgress?.(100, ''); resolve(); } else { reject(new Error(`Remote mysql exited with code ${code ?? 'null'}${signal ? ` (signal: ${signal})` : ''}`)); @@ -306,7 +318,7 @@ async function restoreSingleFileSSH( } } -export async function restore(config: MySQLRestoreConfig, sourcePath: string, onLog?: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, onProgress?: (percentage: number) => void): Promise { +export async function restore(config: MySQLRestoreConfig, sourcePath: string, onLog?: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, onProgress?: (percentage: number, detail?: string) => void): Promise { const startedAt = new Date(); const logs: string[] = []; const log = (msg: string, level: LogLevel = 'info', type: LogType = 'general', details?: string) => { diff --git a/src/lib/adapters/database/sqlite/dump.ts b/src/lib/adapters/database/sqlite/dump.ts index 5568a7c6..4884a17a 100644 --- a/src/lib/adapters/database/sqlite/dump.ts +++ b/src/lib/adapters/database/sqlite/dump.ts @@ -1,4 +1,5 @@ import { DatabaseAdapter } from "@/lib/core/interfaces"; +import { LogLevel, LogType } from "@/lib/core/logs"; import { spawn } from "child_process"; import fs from "fs"; import { SshClient, shellEscape, extractSqliteSshConfig } from "@/lib/ssh"; @@ -9,9 +10,9 @@ export const dump: DatabaseAdapter["dump"] = async (config, destinationPath, onL const mode = config.mode || "local"; const logs: string[] = []; - const log = (msg: string) => { + const log = (msg: string, level: LogLevel = 'info', type: LogType = 'general', details?: string) => { logs.push(msg); - if (onLog) onLog(msg); + if (onLog) onLog(msg, level, type, details); }; try { @@ -48,12 +49,12 @@ export const dump: DatabaseAdapter["dump"] = async (config, destinationPath, onL } }; -async function dumpLocal(config: SQLiteConfig, destinationPath: string, log: (msg: string) => void, _onProgress?: (percent: number) => void): Promise { +async function dumpLocal(config: SQLiteConfig, destinationPath: string, log: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, _onProgress?: (percent: number) => void): Promise { const binaryPath = config.sqliteBinaryPath || "sqlite3"; const dbPath = config.path; const writeStream = fs.createWriteStream(destinationPath); - log(`Executing: ${binaryPath} "${dbPath}" .dump`); + log(`Dumping database: ${dbPath}`, 'info', 'command', `${binaryPath} "${dbPath}" .dump`); return new Promise((resolve, reject) => { const child = spawn(binaryPath, [dbPath, ".dump"]); @@ -61,12 +62,12 @@ async function dumpLocal(config: SQLiteConfig, destinationPath: string, log: (ms child.stdout.pipe(writeStream); child.stderr.on("data", (data) => { - log(`[SQLite Stderr]: ${data.toString()}`); + log(`SQLite stderr`, 'warning', 'general', data.toString().trim()); }); child.on("close", (code) => { if (code === 0) { - log("Dump completed successfully."); + log("Dump complete", 'success'); fs.stat(destinationPath, (err, stats) => { if (err) resolve({ success: true }); // Should not happen usually else resolve({ success: true, size: stats.size, path: destinationPath }); @@ -82,7 +83,7 @@ async function dumpLocal(config: SQLiteConfig, destinationPath: string, log: (ms }); } -async function dumpSsh(config: SQLiteConfig, destinationPath: string, log: (msg: string) => void, _onProgress?: (percent: number) => void): Promise { +async function dumpSsh(config: SQLiteConfig, destinationPath: string, log: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, _onProgress?: (percent: number) => void): Promise { const client = new SshClient(); const writeStream = fs.createWriteStream(destinationPath); const binaryPath = config.sqliteBinaryPath || "sqlite3"; @@ -91,11 +92,11 @@ async function dumpSsh(config: SQLiteConfig, destinationPath: string, log: (msg: const sshConfig = extractSqliteSshConfig(config); if (!sshConfig) throw new Error("SSH host and username are required"); await client.connect(sshConfig); - log("SSH connection established."); + log("SSH connection established"); return new Promise((resolve, reject) => { const command = `${shellEscape(binaryPath)} ${shellEscape(dbPath)} .dump`; - log(`Executing remote command: ${binaryPath} ${dbPath} .dump`); + log(`Dumping database (SSH): ${dbPath}`, 'info', 'command', `${binaryPath} ${dbPath} .dump`); client.execStream(command, (err, stream) => { if (err) { @@ -106,13 +107,13 @@ async function dumpSsh(config: SQLiteConfig, destinationPath: string, log: (msg: stream.pipe(writeStream); stream.stderr.on("data", (data: any) => { - log(`[Remote Stderr]: ${data.toString()}`); + log(`SQLite stderr`, 'warning', 'general', data.toString().trim()); }); stream.on("exit", (code: number | null, signal?: string) => { client.end(); if (code === 0) { - log("Remote dump completed successfully."); + log("Dump complete", 'success'); fs.stat(destinationPath, (err, stats) => { if (err) resolve({ success: true }); else resolve({ success: true, size: stats.size, path: destinationPath }); diff --git a/src/lib/adapters/storage/ftp.ts b/src/lib/adapters/storage/ftp.ts index e674d7e5..59bd1159 100644 --- a/src/lib/adapters/storage/ftp.ts +++ b/src/lib/adapters/storage/ftp.ts @@ -115,11 +115,13 @@ export const FTPAdapter: StorageAdapter = { if (onLog) onLog(`Downloading from FTP: ${source}`, "info", "storage"); - client.trackProgress((info) => { - if (onProgress) { - onProgress(info.bytesOverall, info.bytesOverall); - } - }); + if (onProgress) { + let total = 0; + try { total = await client.size(source); } catch { /* size not supported */ } + client.trackProgress((info) => { + onProgress(info.bytesOverall, total || info.bytesOverall); + }); + } await client.downloadTo(createWriteStream(localPath), source); diff --git a/src/lib/adapters/storage/google-drive.ts b/src/lib/adapters/storage/google-drive.ts index c264c186..0fe2582d 100644 --- a/src/lib/adapters/storage/google-drive.ts +++ b/src/lib/adapters/storage/google-drive.ts @@ -1,7 +1,7 @@ import { StorageAdapter, FileInfo } from "@/lib/core/interfaces"; import { GoogleDriveSchema } from "@/lib/adapters/definitions"; import { google, drive_v3 } from "googleapis"; -import { Readable } from "stream"; +import { Readable, Transform } from "stream"; import fs from "fs/promises"; import { createReadStream, createWriteStream } from "fs"; import path from "path"; @@ -178,8 +178,18 @@ export const GoogleDriveAdapter: StorageAdapter = { const stats = await fs.stat(localPath); const fileSize = stats.size; + // Track upload progress via stream + const fileStream = createReadStream(localPath); + let uploaded = 0; + fileStream.on('data', (chunk) => { + uploaded += chunk.length; + if (onProgress && fileSize > 0) { + onProgress(Math.min(99, Math.round((uploaded / fileSize) * 100))); + } + }); + const media = { - body: createReadStream(localPath), + body: fileStream, }; if (existing) { @@ -242,8 +252,22 @@ export const GoogleDriveAdapter: StorageAdapter = { { responseType: "stream" } ); - const writeStream = createWriteStream(localPath); - await pipeline(res.data as unknown as Readable, writeStream); + const source = res.data as unknown as Readable; + const total = Number(file.size) || 0; + + if (onProgress && total > 0) { + let processed = 0; + const tracker = new Transform({ + transform(chunk, _encoding, callback) { + processed += chunk.length; + onProgress!(processed, total); + callback(null, chunk); + } + }); + await pipeline(source, tracker, createWriteStream(localPath)); + } else { + await pipeline(source, createWriteStream(localPath)); + } if (onLog) onLog(`Google Drive download completed: ${remotePath}`, "info", "storage"); return true; diff --git a/src/lib/adapters/storage/local.ts b/src/lib/adapters/storage/local.ts index e9e14da2..a4262166 100644 --- a/src/lib/adapters/storage/local.ts +++ b/src/lib/adapters/storage/local.ts @@ -40,8 +40,6 @@ export const LocalFileSystemAdapter: StorageAdapter = { try { const destDir = path.dirname(destPath); - if (onLog) onLog(`Preparing local destination: ${destDir}`, 'info', 'general'); - await fs.mkdir(destDir, { recursive: true }); // fs.copyFile does not support progress, so we use streams diff --git a/src/lib/adapters/storage/onedrive.ts b/src/lib/adapters/storage/onedrive.ts index e83b8fdc..0e65b155 100644 --- a/src/lib/adapters/storage/onedrive.ts +++ b/src/lib/adapters/storage/onedrive.ts @@ -4,7 +4,7 @@ import { Client } from "@microsoft/microsoft-graph-client"; import fs from "fs/promises"; import { createReadStream, createWriteStream } from "fs"; import path from "path"; -import { Readable } from "stream"; +import { Readable, Transform } from "stream"; import { pipeline } from "stream/promises"; import { LogLevel, LogType } from "@/lib/core/logs"; import { logger } from "@/lib/logger"; @@ -299,8 +299,22 @@ export const OneDriveAdapter: StorageAdapter = { throw new Error(`Download failed with status ${res.status}`); } - const writeStream = createWriteStream(localPath); - await pipeline(Readable.fromWeb(res.body as any), writeStream); + const total = Number(item.size) || 0; + const source = Readable.fromWeb(res.body as any); + + if (onProgress && total > 0) { + let processed = 0; + const tracker = new Transform({ + transform(chunk, _encoding, callback) { + processed += chunk.length; + onProgress!(processed, total); + callback(null, chunk); + } + }); + await pipeline(source, tracker, createWriteStream(localPath)); + } else { + await pipeline(source, createWriteStream(localPath)); + } if (onLog) onLog(`OneDrive download completed: ${drivePath}`, "info", "storage"); return true; diff --git a/src/lib/adapters/storage/s3.ts b/src/lib/adapters/storage/s3.ts index f1557462..60ab02b0 100644 --- a/src/lib/adapters/storage/s3.ts +++ b/src/lib/adapters/storage/s3.ts @@ -4,6 +4,7 @@ import { S3Client, ListObjectsV2Command, GetObjectCommand, DeleteObjectCommand, import { Upload } from "@aws-sdk/lib-storage"; import { createReadStream, createWriteStream } from "fs"; import { pipeline } from "stream/promises"; +import { Transform } from "stream"; import path from "path"; import { LogLevel, LogType } from "@/lib/core/logs"; import { logger } from "@/lib/logger"; @@ -108,7 +109,7 @@ async function s3Download( internalConfig: S3InternalConfig, remotePath: string, localPath: string, - _onProgress?: (processed: number, total: number) => void, + onProgress?: (processed: number, total: number) => void, _onLog?: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void ): Promise { const client = S3ClientFactory.create(internalConfig); @@ -125,7 +126,21 @@ async function s3Download( if (!webStream) throw new Error("Empty response body"); - await pipeline(webStream, createWriteStream(localPath)); + const total = response.ContentLength ?? 0; + + if (onProgress && total > 0) { + let processed = 0; + const tracker = new Transform({ + transform(chunk, _encoding, callback) { + processed += chunk.length; + onProgress(processed, total); + callback(null, chunk); + } + }); + await pipeline(webStream, tracker, createWriteStream(localPath)); + } else { + await pipeline(webStream, createWriteStream(localPath)); + } return true; } catch (error) { log.error("S3 download failed", { bucket: internalConfig.bucket, targetKey }, wrapError(error)); diff --git a/src/lib/adapters/storage/sftp.ts b/src/lib/adapters/storage/sftp.ts index 56f2d1fa..96fbe49b 100644 --- a/src/lib/adapters/storage/sftp.ts +++ b/src/lib/adapters/storage/sftp.ts @@ -156,7 +156,7 @@ export const SFTPAdapter: StorageAdapter = { } }, - async download(config: SFTPConfig, remotePath: string, localPath: string): Promise { + async download(config: SFTPConfig, remotePath: string, localPath: string, onProgress?: (processed: number, total: number) => void): Promise { let sftp: Client | null = null; try { sftp = await connectSFTP(config); @@ -165,7 +165,19 @@ export const SFTPAdapter: StorageAdapter = { ? path.posix.join(config.pathPrefix, remotePath) : remotePath; - await sftp.get(source, localPath); + if (onProgress) { + const stat = await sftp.stat(source); + const total = stat.size; + let processed = 0; + await sftp.fastGet(source, localPath, { + step: (transferred) => { + processed = transferred; + onProgress(processed, total); + } + }); + } else { + await sftp.get(source, localPath); + } return true; } catch (error) { log.error("SFTP download failed", { host: config.host, remotePath }, wrapError(error)); diff --git a/src/lib/adapters/storage/webdav.ts b/src/lib/adapters/storage/webdav.ts index 57b3241e..7054bfcf 100644 --- a/src/lib/adapters/storage/webdav.ts +++ b/src/lib/adapters/storage/webdav.ts @@ -2,6 +2,7 @@ import { StorageAdapter, FileInfo } from "@/lib/core/interfaces"; import { WebDAVSchema } from "@/lib/adapters/definitions"; import { createClient, WebDAVClient, FileStat } from "webdav"; import { createWriteStream } from "fs"; +import { Transform } from "stream"; import fs from "fs/promises"; import path from "path"; import { pipeline } from "stream/promises"; @@ -71,7 +72,7 @@ export const WebDAVAdapter: StorageAdapter = { } }, - async download(config: WebDAVConfig, remotePath: string, localPath: string, _onProgress?: (processed: number, total: number) => void, onLog?: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void): Promise { + async download(config: WebDAVConfig, remotePath: string, localPath: string, onProgress?: (processed: number, total: number) => void, onLog?: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void): Promise { try { const client = getClient(config); const source = resolvePath(config, remotePath); @@ -79,8 +80,29 @@ export const WebDAVAdapter: StorageAdapter = { if (onLog) onLog(`Downloading from WebDAV: ${source}`, "info", "storage"); const readStream = client.createReadStream(source); - const writeStream = createWriteStream(localPath); - await pipeline(readStream, writeStream); + + if (onProgress) { + try { + const stat = await client.stat(source) as FileStat; + const total = stat.size; + if (total > 0) { + let processed = 0; + const tracker = new Transform({ + transform(chunk, _encoding, callback) { + processed += chunk.length; + onProgress!(processed, total); + callback(null, chunk); + } + }); + await pipeline(readStream, tracker, createWriteStream(localPath)); + return true; + } + } catch { + // stat failed — proceed without progress + } + } + + await pipeline(readStream, createWriteStream(localPath)); return true; } catch (error: unknown) { diff --git a/src/lib/core/interfaces.ts b/src/lib/core/interfaces.ts index be81ce26..0d8782e8 100644 --- a/src/lib/core/interfaces.ts +++ b/src/lib/core/interfaces.ts @@ -119,7 +119,7 @@ export interface DatabaseAdapter extends BaseAdapter { * @param onLog Optional callback for live logs * @param onProgress Optional callback for progress (0-100) */ - restore(config: AdapterConfig, sourcePath: string, onLog?: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, onProgress?: (percentage: number) => void): Promise; + restore(config: AdapterConfig, sourcePath: string, onLog?: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void, onProgress?: (percentage: number, detail?: string) => void): Promise; /** * Optional method to analyze a dump file and return contained databases diff --git a/src/lib/core/logs.ts b/src/lib/core/logs.ts index 142c5577..80391aa1 100644 --- a/src/lib/core/logs.ts +++ b/src/lib/core/logs.ts @@ -6,8 +6,91 @@ export interface LogEntry { level: LogLevel; type: LogType; message: string; - stage?: string; // High-level stage grouping (e.g. "Initialize", "Dump", "Upload") + stage?: string; // High-level stage grouping — should be a PipelineStage value details?: string; // For long output like stdout/stderr context?: Record; // For metadata durationMs?: number; } + +// --- Pipeline Stage System --- + +export const PIPELINE_STAGES = { + QUEUED: "Queued", + INITIALIZING: "Initializing", + DUMPING: "Dumping Database", + PROCESSING: "Processing", + UPLOADING: "Uploading", + VERIFYING: "Verifying", + RETENTION: "Applying Retention", + NOTIFICATIONS: "Sending Notifications", + COMPLETED: "Completed", + FAILED: "Failed", +} as const; + +export type PipelineStage = typeof PIPELINE_STAGES[keyof typeof PIPELINE_STAGES]; + +/** Restore-specific stages */ +export const RESTORE_STAGES = { + INITIALIZING: "Initializing", + DOWNLOADING: "Downloading", + DECRYPTING: "Decrypting", + DECOMPRESSING: "Decompressing", + RESTORING: "Restoring Database", + COMPLETED: "Completed", + FAILED: "Failed", + CANCELLED: "Cancelled", +} as const; + +export type RestoreStage = typeof RESTORE_STAGES[keyof typeof RESTORE_STAGES]; + +/** Ordered list of backup stages for frontend rendering */ +export const BACKUP_STAGE_ORDER: string[] = [ + PIPELINE_STAGES.QUEUED, + PIPELINE_STAGES.INITIALIZING, + PIPELINE_STAGES.DUMPING, + PIPELINE_STAGES.PROCESSING, + PIPELINE_STAGES.UPLOADING, + PIPELINE_STAGES.VERIFYING, + PIPELINE_STAGES.RETENTION, + PIPELINE_STAGES.NOTIFICATIONS, + PIPELINE_STAGES.COMPLETED, +]; + +/** Ordered list of restore stages for frontend rendering */ +export const RESTORE_STAGE_ORDER: string[] = [ + RESTORE_STAGES.INITIALIZING, + RESTORE_STAGES.DOWNLOADING, + RESTORE_STAGES.DECRYPTING, + RESTORE_STAGES.DECOMPRESSING, + RESTORE_STAGES.RESTORING, + RESTORE_STAGES.COMPLETED, +]; + +/** @deprecated Use BACKUP_STAGE_ORDER instead */ +export const STAGE_ORDER: PipelineStage[] = BACKUP_STAGE_ORDER as PipelineStage[]; + +/** Progress ranges [min, max] for each stage, forming a continuous 0→100 scale */ +export const STAGE_PROGRESS_MAP: Record = { + [PIPELINE_STAGES.QUEUED]: [0, 0], + [PIPELINE_STAGES.INITIALIZING]: [0, 5], + [PIPELINE_STAGES.DUMPING]: [5, 45], + [PIPELINE_STAGES.PROCESSING]: [45, 65], + [PIPELINE_STAGES.UPLOADING]: [65, 88], + [PIPELINE_STAGES.VERIFYING]: [88, 92], + [PIPELINE_STAGES.RETENTION]: [92, 97], + [PIPELINE_STAGES.NOTIFICATIONS]: [97, 100], + [PIPELINE_STAGES.COMPLETED]: [100, 100], + [PIPELINE_STAGES.FAILED]: [100, 100], +}; + +/** + * Calculate global progress (0–100) from a stage and its internal progress (0–100). + * Example: stageProgress("Uploading", 50) → 76 (midpoint of 65..88) + */ +export function stageProgress(stage: PipelineStage, internalPercent: number): number { + const range = STAGE_PROGRESS_MAP[stage]; + if (!range) return 0; + const [min, max] = range; + const clamped = Math.max(0, Math.min(100, internalPercent)); + return Math.round(min + (max - min) * (clamped / 100)); +} diff --git a/src/lib/runner.ts b/src/lib/runner.ts index 7de1a515..ee2e9a3b 100644 --- a/src/lib/runner.ts +++ b/src/lib/runner.ts @@ -6,10 +6,11 @@ import { stepRetention } from "@/lib/runner/steps/05-retention"; import { stepCleanup, stepFinalize } from "@/lib/runner/steps/04-completion"; import prisma from "@/lib/prisma"; import { processQueue } from "@/lib/queue-manager"; -import { LogEntry, LogLevel, LogType } from "@/lib/core/logs"; +import { LogEntry, LogLevel, LogType, PipelineStage, PIPELINE_STAGES, stageProgress } from "@/lib/core/logs"; import { logger } from "@/lib/logger"; import { wrapError } from "@/lib/errors"; import { registerExecution, unregisterExecution } from "@/lib/execution-abort"; +import { formatDuration } from "@/lib/utils"; const log = logger.child({ module: "Runner" }); @@ -73,7 +74,9 @@ export async function performExecution(executionId: string, jobId: string) { let currentProgress = 0; let currentStage = "Initializing"; + let currentDetail = ""; let lastLogUpdate = 0; + const stageStartTimes = new Map(); // Declare ctx early let ctx = { @@ -95,7 +98,10 @@ export async function performExecution(executionId: string, jobId: string) { updateProgress: async (p: number, s?: string) => { if (s) currentStage = s; currentProgress = p; - } + }, + setStage: (_stage: PipelineStage) => {}, + updateDetail: (_detail: string) => {}, + updateStageProgress: (_percent: number) => {}, } as unknown as RunnerContext; // Parse logs and normalize to LogEntry[] @@ -138,7 +144,7 @@ export async function performExecution(executionId: string, jobId: string) { where: { id: id }, data: { logs: JSON.stringify(logs), - metadata: JSON.stringify({ progress: currentProgress, stage: currentStage }) + metadata: JSON.stringify({ progress: currentProgress, stage: currentStage, detail: currentDetail }) } }); } catch (error) { @@ -176,10 +182,50 @@ export async function performExecution(executionId: string, jobId: string) { const updateProgress = (percent: number, stage?: string) => { currentProgress = percent; if (stage) currentStage = stage; + currentDetail = ""; // Clear detail on legacy updateProgress calls if (ctx) ctx.metadata = { ...ctx.metadata, progress: currentProgress, stage: currentStage }; flushLogs(executionId); }; + /** Set the active pipeline stage. Automatically logs stage transition with duration. */ + const setStage = (stage: PipelineStage) => { + // Finalize previous stage with duration + const prevStart = stageStartTimes.get(currentStage); + if (prevStart && currentStage !== stage) { + const durationMs = Date.now() - prevStart; + const entry: LogEntry = { + timestamp: new Date().toISOString(), + level: "success", + type: "general", + message: `${currentStage} completed (${formatDuration(durationMs)})`, + stage: currentStage, + durationMs, + }; + logs.push(entry); + } + + currentStage = stage; + currentDetail = ""; + stageStartTimes.set(stage, Date.now()); + currentProgress = stageProgress(stage, 0); + if (ctx) ctx.metadata = { ...ctx.metadata, progress: currentProgress, stage: currentStage, detail: currentDetail }; + flushLogs(executionId); + }; + + /** Update the live detail text without changing the stage (e.g. "125.5 MB dumped...") */ + const updateDetail = (detail: string) => { + currentDetail = detail; + if (ctx) ctx.metadata = { ...ctx.metadata, detail: currentDetail }; + flushLogs(executionId); + }; + + /** Update internal progress within the current stage (0–100) → maps to global progress */ + const updateStageProgress = (internalPercent: number) => { + currentProgress = stageProgress(currentStage as PipelineStage, internalPercent); + if (ctx) ctx.metadata = { ...ctx.metadata, progress: currentProgress }; + flushLogs(executionId); + }; + // Create Context // We cast initialExe to any because Prisma types might mismatch RunnerContext expectation slightly, // but stepInitialize usually overwrites/fixes it. @@ -188,6 +234,9 @@ export async function performExecution(executionId: string, jobId: string) { logs, log: logEntry, updateProgress, + setStage, + updateDetail, + updateStageProgress, status: "Running", startedAt: new Date(), execution: initialExe as any, @@ -207,23 +256,24 @@ export async function performExecution(executionId: string, jobId: string) { // 1. Initialize (Loads Job Data, Adapters) // This will update ctx.job and refresh ctx.execution + setStage(PIPELINE_STAGES.INITIALIZING); await stepInitialize(ctx); checkCancelled(); - updateProgress(0, "Dumping Database"); // 2. Dump + setStage(PIPELINE_STAGES.DUMPING); await stepExecuteDump(ctx); checkCancelled(); - // 3. Upload (Stage will be set inside stepUpload to correctly distinguish processing/uploading) + // 3. Upload (stepUpload sets PROCESSING / UPLOADING / VERIFYING stages internally) await stepUpload(ctx); checkCancelled(); - updateProgress(90, "Applying Retention Policy"); // 4. Retention + setStage(PIPELINE_STAGES.RETENTION); await stepRetention(ctx); - updateProgress(100, "Completed"); + setStage(PIPELINE_STAGES.COMPLETED); // Upload step may have set status to "Partial" — preserve it if (ctx.status === "Running") { ctx.status = "Success"; diff --git a/src/lib/runner/steps/02-dump.ts b/src/lib/runner/steps/02-dump.ts index 1ec3b139..d34a3129 100644 --- a/src/lib/runner/steps/02-dump.ts +++ b/src/lib/runner/steps/02-dump.ts @@ -7,6 +7,7 @@ import { isMultiDbTar, readTarManifest } from "@/lib/adapters/database/common/ta import { logger } from "@/lib/logger"; import { wrapError } from "@/lib/errors"; import { getBackupFileExtension } from "@/lib/backup-extensions"; +import { formatBytes } from "@/lib/utils"; const log = logger.child({ step: "02-dump" }); @@ -182,14 +183,17 @@ export async function stepExecuteDump(ctx: RunnerContext) { }; // Start monitoring file size for progress updates + const dumpStart = Date.now(); const watcher = setInterval(async () => { // Check if file exists and get size try { - // Note: tempFile might change if adapter appends extension, but initially it starts here const stats = await fs.stat(tempFile).catch(() => null); if (stats && stats.size > 0) { + const elapsed = (Date.now() - dumpStart) / 1000; + const speed = elapsed > 0 ? Math.round(stats.size / elapsed) : 0; const sizeMB = (stats.size / 1024 / 1024).toFixed(2); - ctx.updateProgress(0, `Dumping Database (${sizeMB} MB...)`); + const speedStr = formatBytes(speed); + ctx.updateDetail(`${sizeMB} MB dumped – ${speedStr}/s`); } } catch {} }, 800); diff --git a/src/lib/runner/steps/03-upload.ts b/src/lib/runner/steps/03-upload.ts index 0d329d12..fa440971 100644 --- a/src/lib/runner/steps/03-upload.ts +++ b/src/lib/runner/steps/03-upload.ts @@ -11,6 +11,7 @@ import { ProgressMonitorStream } from "@/lib/streams/progress-monitor"; import { formatBytes } from "@/lib/utils"; import { calculateFileChecksum, verifyFileChecksum } from "@/lib/checksum"; import { getTempDir } from "@/lib/temp-dir"; +import { PIPELINE_STAGES } from "@/lib/core/logs"; export async function stepUpload(ctx: RunnerContext) { if (!ctx.job || ctx.destinations.length === 0 || !ctx.tempFile) throw new Error("Context not ready for upload"); @@ -25,9 +26,9 @@ export async function stepUpload(ctx: RunnerContext) { const processingLabel = actions.length > 0 ? actions.join(" & ") : "Processing"; if (actions.length > 0) { - ctx.updateProgress(0, processingLabel + "..."); + ctx.setStage(PIPELINE_STAGES.PROCESSING); } else { - ctx.updateProgress(0, "Uploading Backup..."); + ctx.setStage(PIPELINE_STAGES.UPLOADING); } // --- PIPELINE CONSTRUCTION (once, shared across all destinations) --- @@ -36,8 +37,9 @@ export async function stepUpload(ctx: RunnerContext) { const sourceStat = await fs.stat(ctx.tempFile); const sourceSize = sourceStat.size; - const progressMonitor = new ProgressMonitorStream(sourceSize, (processed, total, percent) => { - ctx.updateProgress(percent, `${processingLabel} (${formatBytes(processed)} / ${formatBytes(total)})`); + const progressMonitor = new ProgressMonitorStream(sourceSize, (processed, total, percent, speed) => { + ctx.updateDetail(`${processingLabel} (${formatBytes(processed)} / ${formatBytes(total)}) – ${formatBytes(speed)}/s`); + ctx.updateStageProgress(percent); }); // 1. Compression Step @@ -147,16 +149,29 @@ export async function stepUpload(ctx: RunnerContext) { const remotePath = `${job.name}/${path.basename(ctx.tempFile)}`; const totalDests = ctx.destinations.length; - ctx.updateProgress(0, "Uploading Backup..."); + ctx.setStage(PIPELINE_STAGES.UPLOADING); + + // Collect destinations that need post-upload integrity verification + const verifyQueue: { dest: typeof ctx.destinations[number]; destLabel: string }[] = []; for (let i = 0; i < totalDests; i++) { const dest = ctx.destinations[i]; const destLabel = `[${dest.configName}]`; + const uploadStart = Date.now(); const destProgress = (percent: number) => { // Distribute progress across destinations const basePercent = (i / totalDests) * 100; const slicePercent = (percent / totalDests); - ctx.updateProgress(Math.round(basePercent + slicePercent), `Uploading to ${dest.configName} (${percent}%)`); + const combinedPercent = Math.round(basePercent + slicePercent); + if (ctx.dumpSize && ctx.dumpSize > 0) { + const uploadedBytes = Math.round((percent / 100) * ctx.dumpSize); + const elapsed = (Date.now() - uploadStart) / 1000; + const speed = elapsed > 0 ? Math.round(uploadedBytes / elapsed) : 0; + ctx.updateDetail(`${dest.configName} — ${formatBytes(uploadedBytes)} / ${formatBytes(ctx.dumpSize)} – ${formatBytes(speed)}/s`); + } else { + ctx.updateDetail(`${dest.configName} (${percent}%)`); + } + ctx.updateStageProgress(combinedPercent); }; ctx.log(`${destLabel} Starting upload...`); @@ -188,25 +203,9 @@ export async function stepUpload(ctx: RunnerContext) { dest.uploadResult = { success: true, path: remotePath }; ctx.log(`${destLabel} Upload complete: ${remotePath}`); - // Post-upload verification for local storage + // Queue integrity verification for local storage if (dest.adapterId === "local-filesystem") { - try { - ctx.log(`${destLabel} Verifying upload integrity...`); - const verifyPath = path.join(getTempDir(), `verify_${Date.now()}_${path.basename(ctx.tempFile)}`); - const downloadOk = await dest.adapter.download(dest.config, remotePath, verifyPath); - if (downloadOk) { - const result = await verifyFileChecksum(verifyPath, checksum); - if (result.valid) { - ctx.log(`${destLabel} Integrity check passed ✓`); - } else { - ctx.log(`${destLabel} WARNING: Integrity check FAILED! Expected: ${result.expected}, Got: ${result.actual}`, 'warning'); - } - await fs.unlink(verifyPath).catch(() => {}); - } - } catch (e: unknown) { - const message = e instanceof Error ? e.message : String(e); - ctx.log(`${destLabel} Integrity verification skipped: ${message}`, 'warning'); - } + verifyQueue.push({ dest, destLabel }); } } catch (e: unknown) { @@ -219,6 +218,33 @@ export async function stepUpload(ctx: RunnerContext) { // Cleanup temp metadata file await fs.unlink(metaPath).catch(() => {}); + // --- POST-UPLOAD VERIFICATION --- + ctx.setStage(PIPELINE_STAGES.VERIFYING); + + if (verifyQueue.length > 0) { + for (const { dest, destLabel } of verifyQueue) { + try { + ctx.log(`${destLabel} Verifying upload integrity...`); + const verifyPath = path.join(getTempDir(), `verify_${Date.now()}_${path.basename(ctx.tempFile)}`); + const downloadOk = await dest.adapter.download(dest.config, remotePath, verifyPath); + if (downloadOk) { + const result = await verifyFileChecksum(verifyPath, checksum); + if (result.valid) { + ctx.log(`${destLabel} Integrity check passed ✓`, 'success'); + } else { + ctx.log(`${destLabel} WARNING: Integrity check FAILED! Expected: ${result.expected}, Got: ${result.actual}`, 'warning'); + } + await fs.unlink(verifyPath).catch(() => {}); + } + } catch (e: unknown) { + const message = e instanceof Error ? e.message : String(e); + ctx.log(`${destLabel} Integrity verification skipped: ${message}`, 'warning'); + } + } + } else { + ctx.log("No local destinations — skipping integrity verification"); + } + // --- EVALUATE RESULTS --- const successCount = ctx.destinations.filter(d => d.uploadResult?.success).length; const failCount = ctx.destinations.filter(d => d.uploadResult && !d.uploadResult.success).length; diff --git a/src/lib/runner/steps/04-completion.ts b/src/lib/runner/steps/04-completion.ts index dad08a40..937d08d6 100644 --- a/src/lib/runner/steps/04-completion.ts +++ b/src/lib/runner/steps/04-completion.ts @@ -8,6 +8,7 @@ import { logger } from "@/lib/logger"; import { wrapError, getErrorMessage } from "@/lib/errors"; import { renderTemplate, NOTIFICATION_EVENTS } from "@/lib/notifications"; import { recordNotificationLog } from "@/services/notification-log-service"; +import { PIPELINE_STAGES } from "@/lib/core/logs"; const log = logger.child({ step: "04-completion" }); @@ -79,6 +80,7 @@ export async function stepFinalize(ctx: RunnerContext) { if (!shouldNotify) { ctx.log(`Skipping notifications. Condition: ${condition}, Status: ${ctx.status}`); } else { + ctx.setStage(PIPELINE_STAGES.NOTIFICATIONS); ctx.log("Sending notifications..."); for (const channel of ctx.job.notifications) { diff --git a/src/lib/runner/types.ts b/src/lib/runner/types.ts index 3f3c2a2b..1eca9a6c 100644 --- a/src/lib/runner/types.ts +++ b/src/lib/runner/types.ts @@ -1,6 +1,6 @@ import { DatabaseAdapter, StorageAdapter } from "@/lib/core/interfaces"; import { Job, AdapterConfig, Execution, JobDestination } from "@prisma/client"; -import { LogEntry, LogLevel, LogType } from "@/lib/core/logs"; +import { LogEntry, LogLevel, LogType, PipelineStage } from "@/lib/core/logs"; import { RetentionConfiguration } from "@/lib/core/retention"; export type JobDestinationWithConfig = JobDestination & { @@ -38,6 +38,11 @@ export interface RunnerContext { log: (msg: string, level?: LogLevel, type?: LogType, details?: string) => void; updateProgress: (percent: number, stage?: string) => void; + // New structured stage API + setStage: (stage: PipelineStage) => void; + updateDetail: (detail: string) => void; + updateStageProgress: (internalPercent: number) => void; + sourceAdapter?: DatabaseAdapter; destinations: DestinationContext[]; diff --git a/src/lib/ssh/ssh-client.ts b/src/lib/ssh/ssh-client.ts index 04f29dac..8ffe1bf4 100644 --- a/src/lib/ssh/ssh-client.ts +++ b/src/lib/ssh/ssh-client.ts @@ -92,12 +92,19 @@ export class SshClient { * Upload a local file to the remote server via SFTP. * Uses SFTP protocol which guarantees data integrity (unlike piping through exec). */ - public uploadFile(localPath: string, remotePath: string): Promise { + public uploadFile(localPath: string, remotePath: string, onProgress?: (transferred: number, total: number) => void): Promise { return new Promise((resolve, reject) => { this.client.sftp((err, sftp) => { if (err) return reject(new Error(`SFTP session failed: ${err.message}`)); - sftp.fastPut(localPath, remotePath, (err) => { + const opts: Record = {}; + if (onProgress) { + opts.step = (totalTransferred: number, _chunk: number, total: number) => { + onProgress(totalTransferred, total); + }; + } + + sftp.fastPut(localPath, remotePath, opts, (err) => { sftp.end(); if (err) return reject(new Error(`SFTP upload failed: ${err.message}`)); resolve(); diff --git a/src/lib/streams/progress-monitor.ts b/src/lib/streams/progress-monitor.ts index b15bfe85..979b46a3 100644 --- a/src/lib/streams/progress-monitor.ts +++ b/src/lib/streams/progress-monitor.ts @@ -7,13 +7,14 @@ import { Transform, TransformCallback } from 'stream'; export class ProgressMonitorStream extends Transform { private processedBytes = 0; private totalBytes: number; - private onProgress: (processed: number, total: number, percent: number) => void; + private onProgress: (processed: number, total: number, percent: number, speed: number) => void; private lastUpdate = 0; private interval = 300; // ms throttle + private startTime = Date.now(); constructor( totalBytes: number, - onProgress: (processed: number, total: number, percent: number) => void + onProgress: (processed: number, total: number, percent: number, speed: number) => void ) { super(); this.totalBytes = totalBytes; @@ -42,6 +43,8 @@ export class ProgressMonitorStream extends Transform { const percent = this.totalBytes > 0 ? Math.round((this.processedBytes / this.totalBytes) * 100) : 0; - this.onProgress(this.processedBytes, this.totalBytes, percent); + const elapsed = (Date.now() - this.startTime) / 1000; + const speed = elapsed > 0 ? Math.round(this.processedBytes / elapsed) : 0; + this.onProgress(this.processedBytes, this.totalBytes, percent, speed); } } diff --git a/src/services/restore-service.ts b/src/services/restore-service.ts index 6392851e..1a0ccb00 100644 --- a/src/services/restore-service.ts +++ b/src/services/restore-service.ts @@ -3,12 +3,13 @@ import { registry } from "@/lib/core/registry"; import { registerAdapters } from "@/lib/adapters"; import { StorageAdapter, DatabaseAdapter, BackupMetadata } from "@/lib/core/interfaces"; import { decryptConfig } from "@/lib/crypto"; -import { compareVersions } from "@/lib/utils"; +import { compareVersions, formatDuration, formatBytes } from "@/lib/utils"; import { getTempDir } from "@/lib/temp-dir"; import path from "path"; import fs from "fs"; import { pipeline } from "stream/promises"; import { createReadStream, createWriteStream } from "fs"; +import { Transform } from "stream"; import { getProfileMasterKey, getEncryptionProfiles } from "@/services/encryption-service"; import { createDecryptionStream } from "@/lib/crypto-stream"; import { getDecompressionStream, CompressionType } from "@/lib/compression"; @@ -180,6 +181,9 @@ export class RestoreService { let lastLogUpdate = Date.now(); let currentProgress = 0; let currentStage = "Initializing"; + let currentDetail: string | null = null; + const stageStartTimes = new Map(); + stageStartTimes.set("Initializing", Date.now()); const flushLogs = async (force = false) => { const now = Date.now(); @@ -188,7 +192,7 @@ export class RestoreService { where: { id: executionId }, data: { logs: JSON.stringify(internalLogs), - metadata: JSON.stringify({ progress: currentProgress, stage: currentStage }) + metadata: JSON.stringify({ progress: currentProgress, stage: currentStage, detail: currentDetail }) } }).catch(() => {}); lastLogUpdate = now; @@ -208,9 +212,41 @@ export class RestoreService { flushLogs(level === 'error'); // Force flush on error }; - const updateProgress = (p: number, stage?: string) => { + const setStage = (stage: string) => { + // Log duration of previous stage + const prevStart = stageStartTimes.get(currentStage); + if (prevStart && currentStage !== stage) { + const durationMs = Date.now() - prevStart; + const isTerminal = stage === "Cancelled" || stage === "Failed"; + const durationEntry: LogEntry = { + timestamp: new Date().toISOString(), + message: isTerminal + ? `${currentStage} aborted (${formatDuration(durationMs)})` + : `${currentStage} completed (${formatDuration(durationMs)})`, + level: isTerminal ? 'warning' : 'success', + type: 'general', + stage: currentStage, + durationMs + }; + internalLogs.push(durationEntry); + } + + currentStage = stage; + currentDetail = null; + currentProgress = 0; + stageStartTimes.set(stage, Date.now()); + flushLogs(true); + }; + + const updateDetail = (detail: string) => { + currentDetail = detail; + flushLogs(); + }; + + const _updateProgress = (p: number, stage?: string) => { currentProgress = p; - if (stage) currentStage = stage; + if (stage && stage !== currentStage) setStage(stage); + else if (stage) currentStage = stage; flushLogs(); }; @@ -252,7 +288,7 @@ export class RestoreService { } // 3. Download File - updateProgress(5, "Downloading"); + setStage("Downloading"); log(`Downloading backup file: ${file}...`, 'info'); const tempDir = getTempDir(); tempFile = path.join(tempDir, path.basename(file)); @@ -330,13 +366,15 @@ export class RestoreService { // --- END METADATA CHECK --- + const downloadStartTime = Date.now(); const downloadSuccess = await storageAdapter.download(sConf, file, tempFile, (processed, total) => { const percent = total > 0 ? Math.round((processed / total) * 100) : 0; - // Only log sparingly or update progress - if (percent % 10 === 0 && percent > 0) { - // We don't want to spam logs + currentProgress = percent; + if (total > 0) { + const elapsed = (Date.now() - downloadStartTime) / 1000; + const speed = elapsed > 0 ? processed / elapsed : 0; + updateDetail(`${formatBytes(processed)} / ${formatBytes(total)} (${formatBytes(speed)}/s)`); } - updateProgress(Math.floor(percent / 2), "Downloading"); // Map download to 0-50% overall? Or just use stage "Downloading" }); if (!downloadSuccess) { @@ -369,7 +407,7 @@ export class RestoreService { // --- DECRYPTION EXECUTION --- if (isEncrypted && encryptionMeta) { - updateProgress(0, "Decrypting"); // Set stage to Decrypting immediately + setStage("Decrypting"); // Set stage to Decrypting immediately let masterKey: Buffer; @@ -480,8 +518,22 @@ export class RestoreService { decryptedTempFile = tempFile + ".dec"; } + const encFileSize = (await fs.promises.stat(tempFile)).size; + const decryptStart = Date.now(); + let decProcessed = 0; + const decryptTracker = new Transform({ + transform(chunk, _encoding, callback) { + decProcessed += chunk.length; + const elapsed = (Date.now() - decryptStart) / 1000; + const speed = elapsed > 0 ? Math.round(decProcessed / elapsed) : 0; + updateDetail(`${formatBytes(decProcessed)} / ${formatBytes(encFileSize)} – ${formatBytes(speed)}/s`); + callback(null, chunk); + } + }); + await pipeline( createReadStream(tempFile), + decryptTracker, decryptStream, createWriteStream(decryptedTempFile) ); @@ -505,7 +557,7 @@ export class RestoreService { if (compressionMeta && compressionMeta !== 'NONE') { try { log(`Decompressing backup (${compressionMeta})...`, 'info'); - updateProgress(0, "Decompressing"); + setStage("Decompressing"); const decompStream = getDecompressionStream(compressionMeta); if (decompStream) { @@ -517,8 +569,22 @@ export class RestoreService { unpackedFile = tempFile + ".unpacked"; } + const compFileSize = (await fs.promises.stat(tempFile)).size; + const decompStart = Date.now(); + let decompProcessed = 0; + const decompTracker = new Transform({ + transform(chunk, _encoding, callback) { + decompProcessed += chunk.length; + const elapsed = (Date.now() - decompStart) / 1000; + const speed = elapsed > 0 ? Math.round(decompProcessed / elapsed) : 0; + updateDetail(`${formatBytes(decompProcessed)} / ${formatBytes(compFileSize)} – ${formatBytes(speed)}/s`); + callback(null, chunk); + } + }); + await pipeline( createReadStream(tempFile), + decompTracker, decompStream, createWriteStream(unpackedFile) ); @@ -558,7 +624,7 @@ export class RestoreService { // --- END MULTI-DB TAR DETECTION --- // 4. Restore - updateProgress(0, "Restoring Database"); + setStage("Restoring Database"); log(`Starting database restore on ${sourceConfig.name}...`, 'info'); const dbConf = decryptConfig(JSON.parse(sourceConfig.config)); @@ -625,8 +691,10 @@ export class RestoreService { } log(msg, finalLevel, type, details); - }, (p) => { - updateProgress(p, "Restoring Database"); + }, (p, detail) => { + currentProgress = p; + currentDetail = detail || null; + flushLogs(); }); if (!restoreResult.success) { @@ -635,7 +703,7 @@ export class RestoreService { } log(`Restore adapter reported failure. Check logs above.`, 'error'); - updateProgress(100, "Failed"); + setStage("Failed"); await prisma.execution.update({ where: { id: executionId }, @@ -647,7 +715,7 @@ export class RestoreService { }); } else { log(`Restore completed successfully.`, 'success'); - updateProgress(100, "Completed"); + setStage("Completed"); await prisma.execution.update({ where: { id: executionId }, data: { @@ -677,8 +745,8 @@ export class RestoreService { // Distinguish cancellation from real failures if (abortController.signal.aborted) { svcLog.info("Restore cancelled by user", { executionId }); + setStage("Cancelled"); log("Restore was cancelled by user", 'warning'); - updateProgress(100, "Cancelled"); await prisma.execution.update({ where: { id: executionId }, @@ -686,8 +754,8 @@ export class RestoreService { }); } else { svcLog.error("Restore service error", {}, wrapError(error)); + setStage("Failed"); log(`Fatal Error: ${getErrorMessage(error)}`, 'error'); - updateProgress(100, "Failed"); await prisma.execution.update({ where: { id: executionId }, diff --git a/tests/unit/runner/multi-destination-upload.test.ts b/tests/unit/runner/multi-destination-upload.test.ts index e32c2e9f..401fca58 100644 --- a/tests/unit/runner/multi-destination-upload.test.ts +++ b/tests/unit/runner/multi-destination-upload.test.ts @@ -67,6 +67,9 @@ describe('Step 03 - Multi-Destination Upload', () => { logs: [], log: vi.fn(), updateProgress: vi.fn(), + setStage: vi.fn(), + updateDetail: vi.fn(), + updateStageProgress: vi.fn(), execution: { id: 'exec-1' } as any, tempFile, destinations: [], diff --git a/tests/unit/runner/notification-logic.test.ts b/tests/unit/runner/notification-logic.test.ts index b8a8b9b4..ed490ae6 100644 --- a/tests/unit/runner/notification-logic.test.ts +++ b/tests/unit/runner/notification-logic.test.ts @@ -55,6 +55,9 @@ describe('Runner Step: Finalize & Notifications', () => { logs: [], log: vi.fn(), updateProgress: vi.fn(), + setStage: vi.fn(), + updateDetail: vi.fn(), + updateStageProgress: vi.fn(), execution: { id: 'exec-1' } as any, destinations: [], job: { diff --git a/wiki/changelog.md b/wiki/changelog.md index baa770d7..c2e389f3 100644 --- a/wiki/changelog.md +++ b/wiki/changelog.md @@ -2,6 +2,34 @@ All notable changes to DBackup are documented here. +## v1.4.0 - Live History Redesign +*Released: March 31, 2026* + +### ✨ Features + +- **logging**: Pipeline stage system for backups (Queued → Initializing → Dumping → Processing → Uploading → Verifying → Retention → Notifications → Completed) and restores (Downloading → Decrypting → Decompressing → Restoring Database → Completed) with automatic progress calculation and duration tracking per stage +- **ui**: LogViewer redesign with pipeline stage grouping, duration badges, pending stage placeholders, and auto-expanding latest stage during execution +- **ui**: Real-time speed (MB/s) and byte progress display for all backup and restore operations — dump, compress, encrypt, upload, download, decrypt, decompress, and SFTP transfer + +### 🎨 Improvements + +- **logging**: MongoDB adapter now buffers stderr output and emits it as a single structured log entry instead of flooding the log with individual lines +- **logging**: SQLite adapter logs now use typed log levels for consistent display +- **storage**: Google Drive adapter now reports intermediate upload progress instead of only 100% at completion +- **storage**: Download progress tracking added to S3, SFTP, Google Drive, OneDrive, WebDAV, and FTP adapters for restore operations +- **restore**: MySQL/MariaDB SSH restore now shows SFTP upload progress with real-time byte tracking + +### 🐛 Bug Fixes + +- **storage**: Fixed local filesystem adapter logging "Preparing local destination" twice per upload + +### 🐳 Docker + +- **Image**: `skyfay/dbackup:v1.4.0` +- **Also tagged as**: `latest`, `v1` +- **Platforms**: linux/amd64, linux/arm64 + + ## v1.3.0 - SSH Remote Execution *Released: March 29, 2026* diff --git a/wiki/package.json b/wiki/package.json index 8d3b4c4f..817176fc 100644 --- a/wiki/package.json +++ b/wiki/package.json @@ -1,6 +1,6 @@ { "name": "dbackup-wiki", - "version": "1.3.0", + "version": "1.4.0", "private": true, "scripts": { "dev": "vitepress dev",