2025-04-24 13:11:28 +08:00

430 lines
12 KiB
JavaScript

import Queue from 'bull'
import config from 'config'
import commandLineArgs from 'command-line-args'
import logger from '@overleaf/logger'
import {
listPendingBackups,
listUninitializedBackups,
getBackupStatus,
} from '../lib/backup_store/index.js'
logger.initialize('backup-queue')

// Connection settings come from the same `redis.queue` config entry that
// backup_worker reads, so this script talks to the worker's queue.
const redisOptions = config.get('redis.queue')

// Bull queue named 'backup'. Unless a job overrides them, jobs are removed
// from redis once they complete or fail.
const queueDefaults = { removeOnComplete: true, removeOnFail: true }
const backupQueue = new Queue('backup', {
  redis: redisOptions,
  defaultJobOptions: queueDefaults,
})
// Define command-line options. The `description` fields document each option
// for readers of this file; the help text users actually see is printed by
// run().
const optionDefinitions = [
  { name: 'clean', type: Boolean },
  { name: 'status', type: Boolean },
  {
    name: 'add',
    type: String,
    multiple: true,
    description: 'Project IDs or date range in YYYY-MM-DD:YYYY-MM-DD format',
  },
  { name: 'monitor', type: Boolean },
  {
    name: 'queue-pending',
    type: Number,
    description:
      'Find projects with pending changes older than N seconds and add them to the queue',
  },
  {
    name: 'show-pending',
    type: Number,
    description:
      'Show count of pending projects older than N seconds without adding to queue',
  },
  {
    name: 'limit',
    type: Number,
    description:
      'Limit the number of pending projects processed by --queue-pending or --show-pending',
  },
  {
    name: 'interval',
    type: Number,
    description: 'Time in seconds to spread jobs over (default: 300)',
    defaultValue: 300,
  },
  {
    name: 'backoff-delay',
    type: Number,
    description:
      'Backoff delay in milliseconds for failed jobs (default: 1000)',
    defaultValue: 1000,
  },
  {
    name: 'attempts',
    type: Number,
    description: 'Number of retry attempts for failed jobs (default: 3)',
    defaultValue: 3,
  },
  {
    name: 'warn-threshold',
    type: Number,
    description:
      'Warn about any project whose pending change is older than N seconds (default: 7200)',
    defaultValue: 2 * 3600, // 2 hours, expressed in seconds
  },
  {
    name: 'verbose',
    alias: 'v',
    type: Boolean,
    description:
      'Show per-project details when used with --show-pending or --queue-pending',
  },
]
// Parse command line arguments
const options = commandLineArgs(optionDefinitions)
// In seconds: a project whose pending change is older than this is logged
// with an RPO warning while pending projects are processed.
const WARN_THRESHOLD = options['warn-threshold']
// Helper to validate a YYYY-MM-DD date string.
// The shape check alone would accept impossible dates such as 2025-02-30 or
// 2025-13-01. The string is therefore also parsed as a UTC midnight and
// formatted back: a date that is rejected (NaN) or that rolls over into a
// different day is not a real calendar date.
// Returns true only for a string naming a real calendar day.
function isValidDateFormat(dateStr) {
  if (typeof dateStr !== 'string' || !/^\d{4}-\d{2}-\d{2}$/.test(dateStr)) {
    return false
  }
  const parsed = new Date(`${dateStr}T00:00:00Z`)
  // Check for NaN first: toISOString() throws a RangeError on an invalid Date.
  if (Number.isNaN(parsed.getTime())) {
    return false
  }
  return parsed.toISOString().slice(0, 10) === dateStr
}
// Helper to validate the pending time parameter.
// Returns `value` unchanged when it is a finite number greater than zero.
// Otherwise it prints an error plus an example to stderr and exits the
// process with status 1.
// Number.isFinite is used rather than a typeof check because the options are
// parsed with `type: Number`. A non-numeric argument such as
// `--queue-pending abc` therefore becomes NaN, which is typeof 'number' and
// is not `<= 0`, so a typeof check would accept it.
function validatePendingTime(option, value) {
  if (!Number.isFinite(value) || value <= 0) {
    console.error(
      `Error: --${option} requires a positive numeric TIME argument in seconds`
    )
    console.error(`Example: --${option} 3600`)
    process.exit(1)
  }
  return value
}
// Helper to format the pending time display.
// Produces "<ISO 8601 timestamp> (<N> seconds ago)", where N is the number of
// whole seconds between `timestamp` and the current time.
function formatPendingTime(timestamp) {
  const elapsedSeconds = Math.floor((new Date() - timestamp) / 1000)
  const isoStamp = timestamp.toISOString()
  return `${isoStamp} (${elapsedSeconds} seconds ago)`
}
// Helper to add a job to the queue, skipping it if a job with the same ID
// (taken from `options.jobId`) is already present.
// Resolves to `{ job, added }`. `added` is false when the existing job is
// returned, and true when `queue.add(data, options)` created a new one.
async function addJobWithCheck(queue, data, options) {
  const { jobId } = options
  const existing = await queue.getJob(jobId)
  if (existing) {
    return { job: existing, added: false }
  }
  return { job: await queue.add(data, options), added: true }
}
// Setup queue event listeners.
// Every queue-wide ("global:") event is logged through the shared logger.
// Queue errors are logged at error level; failed and stalled jobs are logged
// at warn level so they stand out from routine progress messages.
function setupMonitoring() {
  console.log('Starting queue monitoring. Press Ctrl+C to exit.')
  // Local 'error' listener: the Bull queue is an EventEmitter, and Node throws
  // an 'error' event that has no listener attached. Without this listener, a
  // problem such as a redis connection failure would crash the monitor.
  backupQueue.on('error', error => {
    logger.error({ error }, 'Queue error')
  })
  backupQueue.on('global:error', error => {
    logger.error({ error }, 'Queue error')
  })
  backupQueue.on('global:waiting', jobId => {
    logger.info({ jobId }, 'job is waiting')
  })
  backupQueue.on('global:active', jobId => {
    logger.info({ jobId }, 'job is now active')
  })
  backupQueue.on('global:stalled', jobId => {
    logger.warn({ jobId }, 'job has stalled')
  })
  backupQueue.on('global:progress', (jobId, progress) => {
    logger.info({ jobId, progress }, 'job progress')
  })
  backupQueue.on('global:completed', (jobId, result) => {
    logger.info({ jobId, result }, 'job completed')
  })
  backupQueue.on('global:failed', (jobId, err) => {
    logger.warn({ jobId, err }, 'job failed')
  })
  backupQueue.on('global:paused', () => {
    logger.info({}, 'Queue paused')
  })
  backupQueue.on('global:resumed', () => {
    logger.info({}, 'Queue resumed')
  })
  backupQueue.on('global:cleaned', (jobs, type) => {
    logger.info({ jobsCount: jobs.length, type }, 'Jobs cleaned')
  })
  backupQueue.on('global:drained', () => {
    logger.info({}, 'Queue drained')
  })
  backupQueue.on('global:removed', jobId => {
    logger.info({ jobId }, 'Job removed')
  })
}
// Queue one backup job for a date range written as "YYYY-MM-DD:YYYY-MM-DD".
// The job ID is derived from the two dates, so addJobWithCheck reports a
// repeated request for the same range as already existing instead of adding
// it twice. Malformed input (wrong shape, extra ':' segments, impossible
// dates) and reversed ranges are reported on stderr and nothing is queued.
async function addDateRangeJob(input) {
  const parts = input.split(':')
  const [startDate, endDate] = parts
  if (
    parts.length !== 2 ||
    !isValidDateFormat(startDate) ||
    !isValidDateFormat(endDate)
  ) {
    console.error(
      `Invalid date format for "${input}". Use YYYY-MM-DD:YYYY-MM-DD`
    )
    return
  }
  // Zero-padded YYYY-MM-DD strings sort chronologically, so a plain string
  // comparison is enough to detect a reversed range.
  if (startDate > endDate) {
    console.error(
      `Invalid date range for "${input}": start date ${startDate} is after end date ${endDate}`
    )
    return
  }
  const jobId = `backup-${startDate}-to-${endDate}`
  const { job, added } = await addJobWithCheck(
    backupQueue,
    { startDate, endDate },
    { jobId }
  )
  console.log(
    `${added ? 'Added' : 'Already exists'}: date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}`
  )
}
// Helper to list pending and uninitialized backups.
// Combines the two cursors into a single async generator. It yields every
// project from listPendingBackups first, then those from
// listUninitializedBackups.
//
// When `limit` is a positive number it caps the TOTAL number of projects
// yielded across both cursors. Passing the same limit to each cursor would
// allow up to 2 * limit projects, so the second cursor only receives the
// remainder and is skipped once the cap is reached. Any other `limit` value
// (e.g. undefined when --limit is not given) is forwarded unchanged to both
// cursors.
async function* pendingCursor(timeIntervalMs, limit) {
  const isCapped = Number.isFinite(limit) && limit > 0
  let yielded = 0
  for await (const project of listPendingBackups(timeIntervalMs, limit)) {
    yield project
    yielded++
    if (isCapped && yielded >= limit) {
      return
    }
  }
  const remaining = isCapped ? limit - yielded : limit
  for await (const project of listUninitializedBackups(
    timeIntervalMs,
    remaining
  )) {
    yield project
    yielded++
    if (isCapped && yielded >= limit) {
      return
    }
  }
}
// Log a warning, together with the project's backup status, when its pending
// change is older than WARN_THRESHOLD seconds. If fetching the backup status
// fails, the error is logged with its details and rethrown.
async function warnIfPendingTooLong(projectId, pendingAt) {
  const pendingAge = Math.floor((Date.now() - pendingAt.getTime()) / 1000)
  // Positive comparison on purpose: a NaN threshold never triggers a warning.
  if (pendingAge > WARN_THRESHOLD) {
    let backupStatus
    try {
      backupStatus = await getBackupStatus(projectId)
    } catch (err) {
      logger.error(
        { err, projectId, pendingAt, pendingAge },
        'Error getting backup status'
      )
      throw err
    }
    logger.warn(
      {
        projectId,
        pendingAt,
        pendingAge,
        backupStatus,
        warnThreshold: WARN_THRESHOLD,
      },
      `pending change exceeds rpo warning threshold`
    )
  }
}

// Process pending projects with changes older than the specified seconds.
//
// age         - minimum age, in seconds, of the pending change
// showOnly    - when true, projects are only counted (and listed with
//               `verbose`); nothing is added to the queue
// limit       - cap on the number of projects processed, forwarded to
//               pendingCursor (undefined means no cap)
// verbose     - print one line per project
// jobInterval - each queued job gets a random delay in [0, jobInterval)
//               seconds so the jobs do not all start at once; a missing or
//               non-positive value means no delay
// jobOpts     - extra Bull job options (e.g. attempts, backoff) merged into
//               every queued job
//
// Prints progress every 1000 projects and a summary at the end. Any error
// from the cursor, the queue or warnIfPendingTooLong propagates to the
// caller.
async function processPendingProjects(
  age,
  showOnly,
  limit,
  verbose,
  jobInterval,
  jobOpts = {}
) {
  const timeIntervalMs = age * 1000
  console.log(
    `Finding projects with pending changes older than ${age} seconds${showOnly ? ' (count only)' : ''}`
  )
  const spreadMs =
    Number.isFinite(jobInterval) && jobInterval > 0 ? jobInterval * 1000 : 0
  let count = 0
  let addedCount = 0
  let existingCount = 0
  // Earliest pending change seen so far; tracked incrementally instead of
  // buffering every timestamp just to take the minimum at the end.
  let oldestChange
  // The limit is passed down to the MongoDB queries for better performance.
  for await (const project of pendingCursor(timeIntervalMs, limit)) {
    const projectId = project._id.toHexString()
    // Without a recorded pendingChangeAt, fall back to the creation time
    // encoded in the project's ObjectId, so pendingAt is always a Date.
    const pendingAt =
      project.overleaf?.backup?.pendingChangeAt || project._id.getTimestamp()
    if (oldestChange === undefined || pendingAt < oldestChange) {
      oldestChange = pendingAt
    }
    await warnIfPendingTooLong(projectId, pendingAt)
    if (showOnly) {
      if (verbose) {
        console.log(
          `Project: ${projectId} (pending since: ${formatPendingTime(pendingAt)})`
        )
      }
    } else {
      // Random delay to avoid all jobs running simultaneously.
      const delay = Math.floor(Math.random() * spreadMs)
      const { job, added } = await addJobWithCheck(
        backupQueue,
        { projectId, pendingChangeAt: pendingAt.getTime() },
        { ...jobOpts, delay, jobId: projectId }
      )
      if (added) {
        if (verbose) {
          console.log(
            `Added job for project: ${projectId}, job ID: ${job.id} (pending since: ${formatPendingTime(pendingAt)})`
          )
        }
        addedCount++
      } else {
        if (verbose) {
          console.log(
            `Job already exists for project: ${projectId}, job ID: ${job.id} (pending since: ${formatPendingTime(pendingAt)})`
          )
        }
        existingCount++
      }
    }
    count++
    if (count % 1000 === 0) {
      console.log(
        `Processed ${count} projects`,
        showOnly ? '' : `(${addedCount} added, ${existingCount} existing)`
      )
    }
  }
  if (showOnly) {
    console.log(
      `Found ${count} projects with pending changes (not added to queue)`
    )
  } else {
    console.log(`Found ${count} projects with pending changes:`)
    console.log(`  ${addedCount} jobs added to queue`)
    console.log(`  ${existingCount} jobs already existed in queue`)
    if (oldestChange) {
      console.log(`  Oldest pending change: ${formatPendingTime(oldestChange)}`)
    }
  }
}
// Main execution block

// Print the command-line help shown when no action option is given.
function printUsage() {
  console.log('Usage:')
  console.log('  --clean                          Clean up completed and failed jobs')
  console.log('  --status                         Show current job counts')
  console.log('  --add [projectId]                Add a job for the specified projectId')
  console.log(
    '  --add [YYYY-MM-DD:YYYY-MM-DD]    Add a job for the specified date range'
  )
  console.log('  --monitor                        Monitor queue events')
  console.log(
    '  --queue-pending TIME             Find projects with changes older than TIME seconds and add them to the queue'
  )
  console.log(
    '  --show-pending TIME              Show count of pending projects older than TIME seconds'
  )
  console.log(
    '  --limit N                        Limit the number of pending projects processed'
  )
  console.log(
    '  --interval TIME                  Time interval in seconds to spread jobs over (default: 300)'
  )
  console.log(
    '  --backoff-delay TIME             Backoff delay in milliseconds for failed jobs (default: 1000)'
  )
  console.log(
    '  --attempts N                     Number of retry attempts for failed jobs (default: 3)'
  )
  console.log(
    '  --warn-threshold TIME            Warn about projects pending longer than TIME seconds (default: 7200)'
  )
  console.log(
    '  --verbose, -v                    Show per-project details with --show-pending or --queue-pending'
  )
}

// Dispatch on the single action option given on the command line: --clean,
// --status, --add, --monitor, --queue-pending or --show-pending.
// Exits with status 1 if more than one action is requested, or if --add has
// no usable value. Prints usage when no action is given.
async function run() {
  const optionCount = [
    options.clean,
    options.status,
    options.add,
    options.monitor,
    options['queue-pending'] !== undefined,
    options['show-pending'] !== undefined,
  ].filter(Boolean).length
  if (optionCount > 1) {
    console.error('Only one option can be specified')
    process.exit(1)
  }
  if (options.clean) {
    const beforeCounts = await backupQueue.getJobCounts()
    console.log('Current queue state:', JSON.stringify(beforeCounts))
    console.log('Cleaning completed and failed jobs...')
    await backupQueue.clean(1, 'completed')
    await backupQueue.clean(1, 'failed')
    const afterCounts = await backupQueue.getJobCounts()
    console.log('Current queue state:', JSON.stringify(afterCounts))
    console.log('Queue cleaned successfully')
  } else if (options.status) {
    const counts = await backupQueue.getJobCounts()
    console.log('Current queue state:', JSON.stringify(counts))
  } else if (options.add) {
    // Drop null/empty entries (presumably what `--add` with no value
    // produces; NOTE(review): confirm against command-line-args). Otherwise
    // they would crash on input.includes() below.
    const inputs = (
      Array.isArray(options.add) ? options.add : [options.add]
    ).filter(input => typeof input === 'string' && input !== '')
    if (inputs.length === 0) {
      console.error(
        'Error: --add requires a project ID or a YYYY-MM-DD:YYYY-MM-DD date range'
      )
      process.exit(1)
    }
    for (const input of inputs) {
      if (input.includes(':')) {
        // Handle date range format
        await addDateRangeJob(input)
      } else {
        // Handle project ID format
        const { job, added } = await addJobWithCheck(
          backupQueue,
          { projectId: input },
          { jobId: input }
        )
        console.log(
          `${added ? 'Added' : 'Already exists'}: job for project: ${input}, job ID: ${job.id}`
        )
      }
    }
  } else if (options.monitor) {
    setupMonitoring()
  } else if (options['queue-pending'] !== undefined) {
    const age = validatePendingTime('queue-pending', options['queue-pending'])
    await processPendingProjects(
      age,
      false,
      options.limit,
      options.verbose,
      options.interval,
      {
        attempts: options.attempts,
        backoff: {
          type: 'exponential',
          delay: options['backoff-delay'],
        },
      }
    )
  } else if (options['show-pending'] !== undefined) {
    const age = validatePendingTime('show-pending', options['show-pending'])
    await processPendingProjects(age, true, options.limit, options.verbose)
  } else {
    printUsage()
  }
}
// Run and handle errors.
// A rejection from run() is printed and the process exits with status 1.
// On success the process exits with status 0, except in --monitor mode: there
// it is not exited explicitly, so the queue event listeners keep logging.
run().then(
  () => {
    if (!options.monitor) {
      process.exit(0)
    }
  },
  err => {
    console.error('Error:', err)
    process.exit(1)
  }
)