// 430 lines, 12 KiB, JavaScript
import Queue from 'bull'
import config from 'config'
import commandLineArgs from 'command-line-args'
import logger from '@overleaf/logger'
import {
  listPendingBackups,
  listUninitializedBackups,
  getBackupStatus,
} from '../lib/backup_store/index.js'

logger.initialize('backup-queue')

// Use the same redis config as backup_worker
const redisOptions = config.get('redis.queue')

// Create a Bull queue named 'backup'. The default job options ask Bull to
// remove jobs from the queue once they complete or fail.
const backupQueue = new Queue('backup', {
  redis: redisOptions,
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: true,
  },
})

// Command-line options understood by this script. The mode flags (--clean,
// --status, --add, --monitor, --queue-pending, --show-pending) are mutually
// exclusive; the remaining options tune the pending-project modes.
const optionDefinitions = [
  { name: 'clean', type: Boolean },
  { name: 'status', type: Boolean },
  {
    name: 'add',
    type: String,
    multiple: true,
    description: 'Project IDs or date range in YYYY-MM-DD:YYYY-MM-DD format',
  },
  { name: 'monitor', type: Boolean },
  {
    name: 'queue-pending',
    type: Number,
    description:
      'Find projects with pending changes older than N seconds and add them to the queue',
  },
  {
    name: 'show-pending',
    type: Number,
    description:
      'Show count of pending projects older than N seconds without adding to queue',
  },
  {
    name: 'limit',
    type: Number,
    description: 'Limit the number of jobs to be added',
  },
  {
    name: 'interval',
    type: Number,
    description: 'Time in seconds to spread jobs over (default: 300)',
    defaultValue: 300,
  },
  {
    name: 'backoff-delay',
    type: Number,
    description:
      'Backoff delay in milliseconds for failed jobs (default: 1000)',
    defaultValue: 1000,
  },
  {
    name: 'attempts',
    type: Number,
    description: 'Number of retry attempts for failed jobs (default: 3)',
    defaultValue: 3,
  },
  {
    name: 'warn-threshold',
    type: Number,
    description:
      'Warn about any project exceeding this pending age in seconds (default: 7200)',
    defaultValue: 2 * 3600, // 2 hours
  },
  {
    name: 'verbose',
    alias: 'v',
    type: Boolean,
    description:
      'Show detailed information when used with --show-pending or --queue-pending',
  },
]

// Parse command line arguments
const options = commandLineArgs(optionDefinitions)

// Pending age (in seconds) above which a project is reported with a warning
const WARN_THRESHOLD = options['warn-threshold']

/**
 * Check that a string is a real calendar date written as YYYY-MM-DD.
 *
 * The shape is checked first, then the value is round-tripped through Date so
 * that impossible dates such as 2024-13-45 or 2023-02-29 are rejected as well.
 *
 * @param {string} dateStr - candidate date string
 * @returns {boolean} true when dateStr is a valid YYYY-MM-DD date
 */
function isValidDateFormat(dateStr) {
  if (typeof dateStr !== 'string' || !/^\d{4}-\d{2}-\d{2}$/.test(dateStr)) {
    return false
  }
  const parsed = new Date(`${dateStr}T00:00:00Z`)
  if (Number.isNaN(parsed.getTime())) {
    return false
  }
  // Engines that roll an overflowing day into the next month fail this check
  return parsed.toISOString().slice(0, 10) === dateStr
}

/**
 * Validate the TIME argument of --queue-pending / --show-pending.
 *
 * Prints an error and exits the process with status 1 when the value is not a
 * positive, finite number.
 *
 * @param {string} option - option name without the leading dashes, used in messages
 * @param {*} value - parsed option value
 * @returns {number} the validated value, in seconds
 */
function validatePendingTime(option, value) {
  // Number.isFinite rejects non-numbers as well as NaN and +/-Infinity; a
  // plain typeof check would let NaN through because NaN <= 0 is false.
  if (!Number.isFinite(value) || value <= 0) {
    console.error(
      `Error: --${option} requires a positive numeric TIME argument in seconds`
    )
    console.error(`Example: --${option} 3600`)
    process.exit(1)
  }
  return value
}

/**
 * Format a timestamp as its ISO string followed by its age in whole seconds.
 *
 * @param {Date} timestamp - the time to describe
 * @param {Date} [now] - reference time used to compute the age; defaults to
 *   the current time
 * @returns {string} for example "2024-01-01T00:00:00.000Z (90 seconds ago)"
 */
function formatPendingTime(timestamp, now = new Date()) {
  const seconds = Math.floor((now - timestamp) / 1000)
  return `${timestamp.toISOString()} (${seconds} seconds ago)`
}

/**
 * Add a job to the queue unless a job with the same ID is already present.
 *
 * @param {object} queue - queue object providing getJob(jobId) and add(data, options)
 * @param {object} data - job payload
 * @param {object} [options] - job options passed to queue.add; options.jobId
 *   is the ID used for the duplicate check
 * @returns {Promise<{job: object, added: boolean}>} the existing or newly
 *   created job, and whether it was created by this call
 */
async function addJobWithCheck(queue, data, options = {}) {
  const { jobId } = options

  // Without a job ID there is nothing to look up, so go straight to add
  if (jobId != null) {
    const existingJob = await queue.getJob(jobId)
    if (existingJob) {
      return { job: existingJob, added: false }
    }
  }

  const job = await queue.add(data, options)
  return { job, added: true }
}

/**
 * Attach logging listeners for the global events of the backup queue.
 *
 * Queue errors are logged at error level and failed jobs at warn level; all
 * other events are logged at info level. The function only registers the
 * listeners and returns immediately.
 */
function setupMonitoring() {
  console.log('Starting queue monitoring. Press Ctrl+C to exit.')

  backupQueue.on('global:error', error => {
    logger.error({ error }, 'Queue error')
  })

  backupQueue.on('global:waiting', jobId => {
    logger.info({ jobId }, 'job is waiting')
  })

  backupQueue.on('global:active', jobId => {
    logger.info({ jobId }, 'job is now active')
  })

  backupQueue.on('global:stalled', jobId => {
    logger.info({ jobId }, 'job has stalled')
  })

  backupQueue.on('global:progress', (jobId, progress) => {
    logger.info({ jobId, progress }, 'job progress')
  })

  backupQueue.on('global:completed', (jobId, result) => {
    logger.info({ jobId, result }, 'job completed')
  })

  backupQueue.on('global:failed', (jobId, err) => {
    logger.warn({ jobId, err }, 'job failed')
  })

  backupQueue.on('global:paused', () => {
    logger.info({}, 'Queue paused')
  })

  backupQueue.on('global:resumed', () => {
    logger.info({}, 'Queue resumed')
  })

  backupQueue.on('global:cleaned', (jobs, type) => {
    logger.info({ jobsCount: jobs.length, type }, 'Jobs cleaned')
  })

  backupQueue.on('global:drained', () => {
    logger.info({}, 'Queue drained')
  })

  backupQueue.on('global:removed', jobId => {
    logger.info({ jobId }, 'Job removed')
  })
}

/**
 * Queue a backup job covering a date range.
 *
 * The input must be exactly two YYYY-MM-DD dates separated by a colon, with
 * the start date not after the end date. Invalid input is reported on stderr
 * and no job is added.
 *
 * @param {string} input - date range in YYYY-MM-DD:YYYY-MM-DD format
 * @returns {Promise<void>}
 */
async function addDateRangeJob(input) {
  const parts = input.split(':')
  const [startDate, endDate] = parts
  if (
    parts.length !== 2 ||
    !isValidDateFormat(startDate) ||
    !isValidDateFormat(endDate)
  ) {
    console.error(
      `Invalid date format for "${input}". Use YYYY-MM-DD:YYYY-MM-DD`
    )
    return
  }

  // Zero-padded YYYY-MM-DD strings sort in date order, so a plain string
  // comparison is enough to detect a reversed range
  if (startDate > endDate) {
    console.error(
      `Invalid date range for "${input}". Start date must not be after end date`
    )
    return
  }

  const jobId = `backup-${startDate}-to-${endDate}`
  const { job, added } = await addJobWithCheck(
    backupQueue,
    { startDate, endDate },
    { jobId }
  )

  console.log(
    `${added ? 'Added' : 'Already exists'}: date range backup job: ${startDate} to ${endDate}, job ID: ${job.id}`
  )
}

/**
 * Yield projects from listPendingBackups followed by projects from
 * listUninitializedBackups, as one combined async generator.
 *
 * A positive limit is treated as a cap on the combined total: the second
 * listing is only asked for what is left after the first, and is skipped when
 * the first already reached the limit. Any other limit value is passed through
 * to both listings unchanged.
 *
 * @param {number} timeIntervalMs - passed through to both listing functions
 * @param {number} [limit] - maximum number of projects to yield in total
 */
async function* pendingCursor(timeIntervalMs, limit) {
  const hasLimit = typeof limit === 'number' && limit > 0
  let yielded = 0

  for await (const project of listPendingBackups(timeIntervalMs, limit)) {
    yield project
    yielded++
  }

  if (hasLimit && yielded >= limit) {
    return
  }

  const remaining = hasLimit ? limit - yielded : limit
  for await (const project of listUninitializedBackups(
    timeIntervalMs,
    remaining
  )) {
    yield project
  }
}

/**
 * Walk the projects returned by pendingCursor and either report them or queue
 * a backup job for each one.
 *
 * Projects whose pending age exceeds WARN_THRESHOLD are logged with their
 * backup status. If fetching that status fails, the error is logged and
 * rethrown, which aborts the run.
 *
 * @param {number} age - minimum age of pending changes, in seconds
 * @param {boolean} showOnly - when true, only count/print projects and do not queue jobs
 * @param {number} [limit] - passed to pendingCursor to bound the listing
 * @param {boolean} [verbose] - print one line per project
 * @param {number} [jobInterval] - window in seconds over which job start
 *   delays are randomly spread; only used when queuing
 * @param {object} [jobOpts] - extra job options merged into every queued job
 * @returns {Promise<void>}
 */
async function processPendingProjects(
  age,
  showOnly,
  limit,
  verbose,
  jobInterval,
  jobOpts = {}
) {
  const timeIntervalMs = age * 1000
  console.log(
    `Finding projects with pending changes older than ${age} seconds${showOnly ? ' (count only)' : ''}`
  )

  let count = 0
  let addedCount = 0
  let existingCount = 0
  // Earliest pending time seen so far; stays undefined when nothing is found
  let oldestChange

  // The limit is handed to pendingCursor, which forwards it to the listings
  for await (const project of pendingCursor(timeIntervalMs, limit)) {
    const projectId = project._id.toHexString()
    // Fall back to the timestamp embedded in the project ID when no pending
    // change time is recorded on the project
    const pendingAt =
      project.overleaf?.backup?.pendingChangeAt || project._id.getTimestamp()

    if (pendingAt) {
      if (oldestChange === undefined || pendingAt < oldestChange) {
        oldestChange = pendingAt
      }
      const pendingAge = Math.floor((Date.now() - pendingAt.getTime()) / 1000)
      if (pendingAge > WARN_THRESHOLD) {
        try {
          const backupStatus = await getBackupStatus(projectId)
          logger.warn(
            {
              projectId,
              pendingAt,
              pendingAge,
              backupStatus,
              warnThreshold: WARN_THRESHOLD,
            },
            `pending change exceeds rpo warning threshold`
          )
        } catch (err) {
          logger.error(
            { err, projectId, pendingAt, pendingAge },
            'Error getting backup status'
          )
          throw err
        }
      }
    }

    if (showOnly && verbose) {
      console.log(
        `Project: ${projectId} (pending since: ${formatPendingTime(pendingAt)})`
      )
    } else if (!showOnly) {
      // Random delay within the interval to avoid all jobs running simultaneously
      const delay = Math.floor(Math.random() * jobInterval * 1000)
      const { job, added } = await addJobWithCheck(
        backupQueue,
        { projectId, pendingChangeAt: pendingAt.getTime() },
        { ...jobOpts, delay, jobId: projectId }
      )

      if (added) {
        if (verbose) {
          console.log(
            `Added job for project: ${projectId}, job ID: ${job.id} (pending since: ${formatPendingTime(pendingAt)})`
          )
        }
        addedCount++
      } else {
        if (verbose) {
          console.log(
            `Job already exists for project: ${projectId}, job ID: ${job.id} (pending since: ${formatPendingTime(pendingAt)})`
          )
        }
        existingCount++
      }
    }

    count++
    if (count % 1000 === 0) {
      console.log(
        `Processed ${count} projects`,
        showOnly ? '' : `(${addedCount} added, ${existingCount} existing)`
      )
    }
  }

  if (showOnly) {
    console.log(
      `Found ${count} projects with pending changes (not added to queue)`
    )
  } else {
    console.log(`Found ${count} projects with pending changes:`)
    console.log(` ${addedCount} jobs added to queue`)
    console.log(` ${existingCount} jobs already existed in queue`)
    if (oldestChange) {
      console.log(` Oldest pending change: ${formatPendingTime(oldestChange)}`)
    }
  }
}

/**
 * Print the command-line usage summary with the descriptions aligned in a
 * single column.
 */
function printUsage() {
  const usage = [
    ['--clean', 'Clean up completed and failed jobs'],
    ['--status', 'Show current job counts'],
    ['--add [projectId]', 'Add a job for the specified projectId'],
    ['--add [YYYY-MM-DD:YYYY-MM-DD]', 'Add a job for the specified date range'],
    ['--monitor', 'Monitor queue events'],
    [
      '--queue-pending TIME',
      'Find projects with changes older than TIME seconds and add them to the queue',
    ],
    [
      '--show-pending TIME',
      'Show count of pending projects older than TIME seconds',
    ],
    ['--limit N', 'Limit the number of jobs to be added'],
    [
      '--interval TIME',
      'Time interval in seconds to spread jobs over (default: 300)',
    ],
    [
      '--backoff-delay TIME',
      'Backoff delay in milliseconds for failed jobs (default: 1000)',
    ],
    [
      '--attempts N',
      'Number of retry attempts for failed jobs (default: 3)',
    ],
    [
      '--warn-threshold TIME',
      'Warn about any project pending for more than TIME seconds (default: 7200)',
    ],
    [
      '--verbose, -v',
      'Show detailed information when used with --show-pending or --queue-pending',
    ],
  ]
  const width = Math.max(...usage.map(([flag]) => flag.length))

  console.log('Usage:')
  for (const [flag, text] of usage) {
    console.log(`  ${flag.padEnd(width)}  ${text}`)
  }
}

/**
 * Main entry point: run the single mode selected on the command line, or
 * print the usage summary when no mode was given.
 *
 * Exits the process with status 1 when more than one mode is specified or
 * when --add is given without any value.
 *
 * @returns {Promise<void>}
 */
async function run() {
  const optionCount = [
    options.clean,
    options.status,
    options.add,
    options.monitor,
    options['queue-pending'] !== undefined,
    options['show-pending'] !== undefined,
  ].filter(Boolean).length
  if (optionCount > 1) {
    console.error('Only one option can be specified')
    process.exit(1)
  }

  if (options.clean) {
    const beforeCounts = await backupQueue.getJobCounts()
    console.log('Current queue state:', JSON.stringify(beforeCounts))
    console.log('Cleaning completed and failed jobs...')
    await backupQueue.clean(1, 'completed')
    await backupQueue.clean(1, 'failed')
    const afterCounts = await backupQueue.getJobCounts()
    console.log('Current queue state:', JSON.stringify(afterCounts))
    console.log('Queue cleaned successfully')
  } else if (options.status) {
    const counts = await backupQueue.getJobCounts()
    console.log('Current queue state:', JSON.stringify(counts))
  } else if (options.add) {
    const inputs = Array.isArray(options.add) ? options.add : [options.add]
    if (inputs.length === 0) {
      console.error(
        'Error: --add requires at least one project ID or date range'
      )
      process.exit(1)
    }
    for (const input of inputs) {
      if (typeof input !== 'string' || input === '') {
        console.error('Error: --add received an empty value, skipping')
        continue
      }
      if (input.includes(':')) {
        // Handle date range format
        await addDateRangeJob(input)
      } else {
        // Handle project ID format
        const { job, added } = await addJobWithCheck(
          backupQueue,
          { projectId: input },
          { jobId: input }
        )
        console.log(
          `${added ? 'Added' : 'Already exists'}: job for project: ${input}, job ID: ${job.id}`
        )
      }
    }
  } else if (options.monitor) {
    setupMonitoring()
  } else if (options['queue-pending'] !== undefined) {
    const age = validatePendingTime('queue-pending', options['queue-pending'])
    await processPendingProjects(
      age,
      false,
      options.limit,
      options.verbose,
      options.interval,
      {
        attempts: options.attempts,
        backoff: {
          type: 'exponential',
          delay: options['backoff-delay'],
        },
      }
    )
  } else if (options['show-pending'] !== undefined) {
    const age = validatePendingTime('show-pending', options['show-pending'])
    await processPendingProjects(age, true, options.limit, options.verbose)
  } else {
    printUsage()
  }
}

// Run the selected mode, then exit with a status reflecting the outcome.
// The success handler is attached before the failure handler so that a
// failed run can only ever reach the exit(1) path.
run()
  .then(() => {
    // Only exit if not in monitor mode: monitoring keeps the process running
    // to receive queue events
    if (!options.monitor) {
      process.exit(0)
    }
  })
  .catch(err => {
    console.error('Error:', err)
    process.exit(1)
  })