first commit

commit ff9c54d5e4
Date: 2025-04-24 13:11:28 +08:00
5960 changed files with 834111 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
'use strict'
const OError = require('@overleaf/o-error')
const check = require('check-types')
const { Blob } = require('overleaf-editor-core')
const assert = check.assert
const MONGO_ID_REGEXP = /^[0-9a-f]{24}$/
const POSTGRES_ID_REGEXP = /^[1-9][0-9]{0,9}$/
const MONGO_OR_POSTGRES_ID_REGEXP = /^([0-9a-f]{24}|[1-9][0-9]{0,9})$/
function transaction(transaction, message) {
assert.function(transaction, message)
}
function blobHash(arg, message) {
try {
assert.match(arg, Blob.HEX_HASH_RX, message)
} catch (error) {
throw OError.tag(error, message, { arg })
}
}
/**
* A project id is a string that contains either an integer (for projects stored in Postgres) or 24
* hex digits (for projects stored in Mongo)
*/
function projectId(arg, message) {
try {
assert.match(arg, MONGO_OR_POSTGRES_ID_REGEXP, message)
} catch (error) {
throw OError.tag(error, message, { arg })
}
}
/**
* A chunk id is a string that contains either an integer (for projects stored in Postgres) or 24
* hex digits (for projects stored in Mongo)
*/
function chunkId(arg, message) {
try {
assert.match(arg, MONGO_OR_POSTGRES_ID_REGEXP, message)
} catch (error) {
throw OError.tag(error, message, { arg })
}
}
function mongoId(arg, message) {
try {
assert.match(arg, MONGO_ID_REGEXP, message)
} catch (error) {
throw OError.tag(error, message, { arg })
}
}
function postgresId(arg, message) {
try {
assert.match(arg, POSTGRES_ID_REGEXP, message)
} catch (error) {
throw OError.tag(error, message, { arg })
}
}
module.exports = {
...assert,
transaction,
blobHash,
projectId,
chunkId,
mongoId,
postgresId,
MONGO_ID_REGEXP,
POSTGRES_ID_REGEXP,
}
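// Usage sketch (illustrative, not part of the module): the ID assertions accept
// either a decimal Postgres id or a 24-character hex Mongo id, and rethrow a
// tagged error otherwise.
//
//   const assert = require('./assert')
//   assert.projectId('123456', 'bad projectId')                    // passes (Postgres-style id)
//   assert.projectId('abcdef0123456789abcdef01', 'bad projectId')  // passes (Mongo-style id)
//   assert.projectId('not-an-id', 'bad projectId')                 // throws, tagged with { arg }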

View File

@@ -0,0 +1,251 @@
// @ts-check
import { backupPersistor, projectBlobsBucket } from './backupPersistor.mjs'
import { GLOBAL_BLOBS, makeProjectKey, BlobStore } from './blob_store/index.js'
import Stream from 'node:stream'
import fs from 'node:fs'
import Crypto from 'node:crypto'
import assert from './assert.js'
import { backedUpBlobs, projects } from './mongodb.js'
import { Binary, ObjectId } from 'mongodb'
import logger from '@overleaf/logger/logging-manager.js'
import { AlreadyWrittenError } from '@overleaf/object-persistor/src/Errors.js'
import metrics from '@overleaf/metrics'
import zLib from 'node:zlib'
import Path from 'node:path'
const HIGHWATER_MARK = 1024 * 1024
/**
* @typedef {import("overleaf-editor-core").Blob} Blob
*/
/**
* @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
*/
/**
* Increment a metric to record the outcome of a backup operation.
*
* @param {"success"|"failure"|"skipped"} status
* @param {"global"|"already_backed_up"|"none"} reason
*/
function recordBackupConclusion(status, reason = 'none') {
metrics.inc('blob_backed_up', 1, { status, reason })
}
/**
* Downloads a blob to a specified directory
*
* @param {string} historyId - The history ID of the project the blob belongs to
* @param {Blob} blob - The blob to download
* @param {string} tmpDir - The directory path where the blob will be downloaded
* @returns {Promise<string>} The full path where the blob was downloaded
*/
export async function downloadBlobToDir(historyId, blob, tmpDir) {
const blobStore = new BlobStore(historyId)
const blobHash = blob.getHash()
const src = await blobStore.getStream(blobHash)
const filePath = Path.join(tmpDir, `${historyId}-${blobHash}`)
try {
const dst = fs.createWriteStream(filePath, {
highWaterMark: HIGHWATER_MARK,
flags: 'wx',
})
await Stream.promises.pipeline(src, dst)
return filePath
} catch (error) {
try {
await fs.promises.unlink(filePath)
} catch {}
throw error
}
}
/**
* Performs the actual upload of the blob to the backup storage.
*
* @param {string} historyId - The history ID of the project the blob belongs to
* @param {Blob} blob - The blob being uploaded
* @param {string} path - The path to the file to upload (should have been stored on disk already)
* @param {CachedPerProjectEncryptedS3Persistor} persistor - The persistor to use for the upload
* @return {Promise<void>}
*/
export async function uploadBlobToBackup(historyId, blob, path, persistor) {
const md5 = Crypto.createHash('md5')
const filePathCompressed = path + '.gz'
let backupSource
let contentEncoding
let size
try {
if (blob.getStringLength()) {
backupSource = filePathCompressed
contentEncoding = 'gzip'
size = 0
await Stream.promises.pipeline(
fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }),
zLib.createGzip(),
async function* (source) {
for await (const chunk of source) {
size += chunk.byteLength
md5.update(chunk)
yield chunk
}
},
fs.createWriteStream(filePathCompressed, {
highWaterMark: HIGHWATER_MARK,
})
)
} else {
backupSource = path
size = blob.getByteLength()
await Stream.promises.pipeline(
fs.createReadStream(path, { highWaterMark: HIGHWATER_MARK }),
md5
)
}
const key = makeProjectKey(historyId, blob.getHash())
await persistor.sendStream(
projectBlobsBucket,
key,
fs.createReadStream(backupSource, { highWaterMark: HIGHWATER_MARK }),
{
contentEncoding,
contentType: 'application/octet-stream',
contentLength: size,
sourceMd5: md5.digest('hex'),
ifNoneMatch: '*',
}
)
} finally {
if (backupSource === filePathCompressed) {
try {
await fs.promises.rm(filePathCompressed, { force: true })
} catch {}
}
}
}
/**
* Converts a legacy (postgres) historyId to a mongo projectId
*
* @param {string} historyId
* @return {Promise<string>}
* @private
*/
async function _convertLegacyHistoryIdToProjectId(historyId) {
const project = await projects.findOne(
{ 'overleaf.history.id': parseInt(historyId) },
{ projection: { _id: 1 } }
)
if (!project?._id) {
throw new Error('Did not find project for history id')
}
return project?._id?.toString()
}
/**
* Records that a blob was backed up for a project.
*
* @param {string} projectId - projectId for a project (mongo format)
* @param {string} hash
* @return {Promise<void>}
*/
export async function storeBlobBackup(projectId, hash) {
await backedUpBlobs.updateOne(
{ _id: new ObjectId(projectId) },
{ $addToSet: { blobs: new Binary(Buffer.from(hash, 'hex')) } },
{ upsert: true }
)
}
/**
* Determine whether a specific blob has been backed up in this project.
*
* @param {string} projectId
* @param {string} hash
* @return {Promise<*>}
* @private
*/
export async function _blobIsBackedUp(projectId, hash) {
const blobs = await backedUpBlobs.findOne(
{
_id: new ObjectId(projectId),
blobs: new Binary(Buffer.from(hash, 'hex')),
},
{ projection: { _id: 1 } }
)
return blobs?._id
}
/**
* Back up a blob to the global storage and record that it was backed up.
*
* @param {string} historyId - history ID for a project (can be postgres format or mongo format)
* @param {Blob} blob - The blob that is being backed up
* @param {string} tmpPath - The path to a temporary file storing the contents of the blob.
* @param {CachedPerProjectEncryptedS3Persistor} [persistor] - The persistor to use (optional)
* @return {Promise<void>}
*/
export async function backupBlob(historyId, blob, tmpPath, persistor) {
const hash = blob.getHash()
let projectId = historyId
if (assert.POSTGRES_ID_REGEXP.test(historyId)) {
projectId = await _convertLegacyHistoryIdToProjectId(historyId)
}
const globalBlob = GLOBAL_BLOBS.get(hash)
if (globalBlob && !globalBlob.demoted) {
recordBackupConclusion('skipped', 'global')
logger.debug({ projectId, hash }, 'Blob is global - skipping backup')
return
}
try {
if (await _blobIsBackedUp(projectId, hash)) {
recordBackupConclusion('skipped', 'already_backed_up')
logger.debug(
{ projectId, hash },
'Blob already backed up - skipping backup'
)
return
}
} catch (error) {
logger.warn({ error }, 'Failed to check if blob is backed up')
// We'll try anyway - we'll catch the error if it was backed up
}
// If we weren't passed a persistor for this project, create one.
// This will fetch the key from AWS, so it's preferable to use
// the same persistor for all blobs in a project where possible.
if (!persistor) {
logger.debug(
{ historyId, hash },
'warning: persistor not passed to backupBlob'
)
}
persistor ??= await backupPersistor.forProject(
projectBlobsBucket,
makeProjectKey(historyId, '')
)
try {
logger.debug({ projectId, hash }, 'Starting blob backup')
await uploadBlobToBackup(historyId, blob, tmpPath, persistor)
await storeBlobBackup(projectId, hash)
recordBackupConclusion('success')
} catch (error) {
if (error instanceof AlreadyWrittenError) {
logger.debug({ error, projectId, hash }, 'Blob already backed up')
// record that we backed it up already
await storeBlobBackup(projectId, hash)
recordBackupConclusion('failure', 'already_backed_up')
return
}
// eventually queue this for retry - for now this will be fixed by running the script
recordBackupConclusion('failure')
logger.warn({ error, projectId, hash }, 'Failed to upload blob to backup')
} finally {
logger.debug({ projectId, hash }, 'Ended blob backup')
}
}
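// Illustrative usage sketch (not part of this module). `historyId`, `blobs` and
// `tmpDir` are assumed to come from the caller; the point is that one persistor
// obtained via backupPersistor.forProject() should be reused for all blobs of a
// project, so the project key is only fetched from AWS once.
//
//   const persistor = await backupPersistor.forProject(
//     projectBlobsBucket,
//     makeProjectKey(historyId, '')
//   )
//   for (const blob of blobs) {
//     const tmpPath = await downloadBlobToDir(historyId, blob, tmpDir)
//     await backupBlob(historyId, blob, tmpPath, persistor)
//   }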

View File

@@ -0,0 +1,93 @@
// @ts-check
import { callbackify } from 'util'
import { ObjectId } from 'mongodb'
import config from 'config'
import OError from '@overleaf/o-error'
import { db } from './mongodb.js'
import projectKey from './project_key.js'
import chunkStore from '../lib/chunk_store/index.js'
import {
backupPersistor,
chunksBucket,
projectBlobsBucket,
} from './backupPersistor.mjs'
const MS_PER_DAY = 24 * 60 * 60 * 1000
const EXPIRE_PROJECTS_AFTER_MS =
parseInt(config.get('minSoftDeletionPeriodDays'), 10) * MS_PER_DAY
const deletedProjectsCollection = db.collection('deletedProjects')
/**
* @param {string} historyId
* @return {Promise<boolean>}
*/
async function projectHasLatestChunk(historyId) {
const chunk = await chunkStore.getBackend(historyId).getLatestChunk(historyId)
return chunk != null
}
export class NotReadyToDelete extends OError {}
/**
* @param {string} projectId
* @return {Promise<void>}
*/
async function deleteProjectBackup(projectId) {
const deletedProject = await deletedProjectsCollection.findOne(
{ 'deleterData.deletedProjectId': new ObjectId(projectId) },
{
projection: {
'deleterData.deletedProjectOverleafHistoryId': 1,
'deleterData.deletedAt': 1,
},
}
)
if (!deletedProject) {
throw new NotReadyToDelete('refusing to delete non-deleted project')
}
const expiresAt =
deletedProject.deleterData.deletedAt.getTime() + EXPIRE_PROJECTS_AFTER_MS
if (expiresAt > Date.now()) {
throw new NotReadyToDelete('refusing to delete non-expired project')
}
const historyId =
deletedProject.deleterData.deletedProjectOverleafHistoryId?.toString()
if (!historyId) {
throw new NotReadyToDelete(
'refusing to delete project with unknown historyId'
)
}
if (await projectHasLatestChunk(historyId)) {
throw new NotReadyToDelete(
'refusing to delete project with remaining chunks'
)
}
const prefix = projectKey.format(historyId) + '/'
await backupPersistor.deleteDirectory(chunksBucket, prefix)
await backupPersistor.deleteDirectory(projectBlobsBucket, prefix)
}
export async function healthCheck() {
const HEALTH_CHECK_PROJECTS = JSON.parse(config.get('healthCheckProjects'))
if (HEALTH_CHECK_PROJECTS.length !== 2) {
throw new Error('expected 2 healthCheckProjects')
}
if (!HEALTH_CHECK_PROJECTS.some(id => id.length === 24)) {
throw new Error('expected mongo id in healthCheckProjects')
}
if (!HEALTH_CHECK_PROJECTS.some(id => id.length < 24)) {
throw new Error('expected postgres id in healthCheckProjects')
}
for (const historyId of HEALTH_CHECK_PROJECTS) {
if (!(await projectHasLatestChunk(historyId))) {
throw new Error(`project has no history: ${historyId}`)
}
}
}
export const healthCheckCb = callbackify(healthCheck)
export const deleteProjectBackupCb = callbackify(deleteProjectBackup)
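// Illustrative note (an assumption about the config shape): healthCheck() expects
// the `healthCheckProjects` setting to be a JSON-encoded array containing exactly
// one Mongo-style id (24 hex characters) and one Postgres-style id, e.g.
//
//   healthCheckProjects: '["abcdef0123456789abcdef01","42"]'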

View File

@@ -0,0 +1,152 @@
/**
* Provides a generator function to back up project chunks and blobs.
*/
import chunkStore from './chunk_store/index.js'
import {
GLOBAL_BLOBS, // NOTE: must call loadGlobalBlobs() before using this
BlobStore,
} from './blob_store/index.js'
import assert from './assert.js'
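/**
 * Seed `seenBlobs` with blob hashes from the previous chunk when the last
 * backed up version coincides with this chunk's start version, so the snapshot
 * blobs already covered by the previous backup are not backed up again.
 *
 * @param {string} projectId
 * @param {Object} chunk - chunk record with a startVersion property
 * @param {number} lastBackedUpVersion
 * @param {Set<string>} seenBlobs - set to collect previously seen blob hashes
 */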
async function lookBehindForSeenBlobs(
projectId,
chunk,
lastBackedUpVersion,
seenBlobs
) {
if (chunk.startVersion === 0) {
return // this is the first chunk, no need to check for blobs in the previous chunk
}
if (chunk.startVersion > 0 && lastBackedUpVersion > chunk.startVersion) {
return // the snapshot in this chunk has already been backed up
}
if (
chunk.startVersion > 0 &&
lastBackedUpVersion === chunk.startVersion // same as previousChunk.endVersion
) {
// the snapshot in this chunk has not been backed up
// so we find the set of backed up blobs from the previous chunk
const previousChunk = await chunkStore.loadAtVersion(
projectId,
lastBackedUpVersion
)
const previousChunkHistory = previousChunk.getHistory()
previousChunkHistory.findBlobHashes(seenBlobs)
}
}
/**
* Records blob hashes that have been previously seen in a chunk's history.
*
* @param {Object} chunk - The chunk containing history data
* @param {number} currentBackedUpVersion - The version number that has been backed up
* @param {Set<string>} seenBlobs - Set to collect previously seen blob hashes
* @returns {void}
*/
function recordPreviouslySeenBlobs(chunk, currentBackedUpVersion, seenBlobs) {
// We need to look at the chunk and decide how far we have backed up.
// If we have not backed up this chunk at all, we need to backup the blobs
// in the snapshot. Otherwise we need to backup the blobs in the changes
// that have occurred since the last backup.
const history = chunk.getHistory()
const startVersion = chunk.getStartVersion()
if (currentBackedUpVersion === 0) {
// If we have only backed up version 0 (i.e. the first change)
// then that includes the initial snapshot, so we consider
// the blobs of the initial snapshot as seen. If the project
// has not been backed up at all then currentBackedUpVersion
// will be undefined.
history.snapshot.findBlobHashes(seenBlobs)
} else if (currentBackedUpVersion > startVersion) {
history.snapshot.findBlobHashes(seenBlobs)
for (let i = 0; i < currentBackedUpVersion - startVersion; i++) {
history.changes[i].findBlobHashes(seenBlobs)
}
}
}
/**
* Collects new blob objects that need to be backed up from a given chunk.
*
* @param {Object} chunk - The chunk object containing history data
* @param {Object} blobStore - Storage interface for retrieving blobs
* @param {Set<string>} seenBlobs - Set of blob hashes that have already been processed
* @returns {Promise<Object[]>} Array of blob objects that need to be backed up
* @throws {Error} If blob retrieval fails
*/
async function collectNewBlobsForBackup(chunk, blobStore, seenBlobs) {
/** @type {Set<string>} */
const blobHashes = new Set()
const history = chunk.getHistory()
// Get all the blobs in this chunk, then exclude the seenBlobs and global blobs
history.findBlobHashes(blobHashes)
const blobsToBackup = await blobStore.getBlobs(
[...blobHashes].filter(
hash =>
hash &&
!seenBlobs.has(hash) &&
(!GLOBAL_BLOBS.has(hash) || GLOBAL_BLOBS.get(hash).demoted)
)
)
return blobsToBackup
}
/**
* Asynchronously generates backups for a project based on provided versions.
* @param {string} projectId - The ID of the project's history to back up.
* @param {number} lastBackedUpVersion - The last version that was successfully backed up.
* @yields {AsyncGenerator<{ chunkRecord: object, chunkToBackup: object, chunkBuffer: Buffer, blobsToBackup: object[] }>}
* Yields chunk records and corresponding data needed for backups.
*/
export async function* backupGenerator(projectId, lastBackedUpVersion) {
assert.projectId(projectId, 'bad projectId')
assert.maybe.integer(lastBackedUpVersion, 'bad lastBackedUpVersion')
const blobStore = new BlobStore(projectId)
/** @type {Set<string>} */
const seenBlobs = new Set() // records the blobs that are already backed up
const firstPendingVersion =
lastBackedUpVersion >= 0 ? lastBackedUpVersion + 1 : 0
let isStartingChunk = true
let currentBackedUpVersion = lastBackedUpVersion
const chunkRecordIterator = chunkStore.getProjectChunksFromVersion(
projectId,
firstPendingVersion
)
for await (const chunkRecord of chunkRecordIterator) {
const { chunk, chunkBuffer } = await chunkStore.loadByChunkRecord(
projectId,
chunkRecord
)
if (isStartingChunk) {
await lookBehindForSeenBlobs(
projectId,
chunkRecord,
lastBackedUpVersion,
seenBlobs
)
isStartingChunk = false
}
recordPreviouslySeenBlobs(chunk, currentBackedUpVersion, seenBlobs)
const blobsToBackup = await collectNewBlobsForBackup(
chunk,
blobStore,
seenBlobs
)
yield { chunkRecord, chunkToBackup: chunk, chunkBuffer, blobsToBackup }
// After we generate a backup of this chunk, mark the backed up blobs as seen
blobsToBackup.forEach(blob => seenBlobs.add(blob.getHash()))
currentBackedUpVersion = chunkRecord.endVersion
}
}
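// Illustrative usage sketch (not part of this module). `historyId`,
// `lastBackedUpVersion` and `tmpDir` are assumed to come from the caller, and
// downloadBlobToDir/backupBlob are assumed to be imported from backupBlob.mjs.
//
//   for await (const { chunkRecord, blobsToBackup } of backupGenerator(
//     historyId,
//     lastBackedUpVersion
//   )) {
//     for (const blob of blobsToBackup) {
//       const tmpPath = await downloadBlobToDir(historyId, blob, tmpDir)
//       await backupBlob(historyId, blob, tmpPath)
//     }
//     // ...then persist chunkRecord/chunkBuffer to the backup store
//   }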

View File

@@ -0,0 +1,121 @@
// @ts-check
import fs from 'node:fs'
import Path from 'node:path'
import _ from 'lodash'
import config from 'config'
import { SecretManagerServiceClient } from '@google-cloud/secret-manager'
import OError from '@overleaf/o-error'
import {
PerProjectEncryptedS3Persistor,
RootKeyEncryptionKey,
} from '@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js'
import { HistoryStore } from './history_store.js'
const persistorConfig = _.cloneDeep(config.get('backupPersistor'))
const { chunksBucket, deksBucket, globalBlobsBucket, projectBlobsBucket } =
config.get('backupStore')
export { chunksBucket, globalBlobsBucket, projectBlobsBucket }
function convertKey(key, convertFn) {
if (_.has(persistorConfig, key)) {
_.update(persistorConfig, key, convertFn)
}
}
convertKey('s3SSEC.httpOptions.timeout', s => parseInt(s, 10))
convertKey('s3SSEC.maxRetries', s => parseInt(s, 10))
convertKey('s3SSEC.pathStyle', s => s === 'true')
// Array of CA certificates, either inlined or given as file paths on disk
convertKey('s3SSEC.ca', s =>
JSON.parse(s).map(ca => (ca.startsWith('/') ? fs.readFileSync(ca) : ca))
)
/** @type {() => Promise<string>} */
let getRawRootKeyEncryptionKeys
if ((process.env.NODE_ENV || 'production') === 'production') {
;[persistorConfig.s3SSEC.key, persistorConfig.s3SSEC.secret] = (
await loadFromSecretsManager(
process.env.BACKUP_AWS_CREDENTIALS || '',
'BACKUP_AWS_CREDENTIALS'
)
).split(':')
getRawRootKeyEncryptionKeys = () =>
loadFromSecretsManager(
persistorConfig.keyEncryptionKeys,
'BACKUP_KEY_ENCRYPTION_KEYS'
)
} else {
getRawRootKeyEncryptionKeys = () => persistorConfig.keyEncryptionKeys
}
export const DELETION_ONLY = persistorConfig.keyEncryptionKeys === 'none'
if (DELETION_ONLY) {
// For the backup-deleter: it must not encrypt or read data, and deletion does not need a key.
getRawRootKeyEncryptionKeys = () => new Promise(_resolve => {})
}
const PROJECT_FOLDER_REGEX =
/^\d{3}\/\d{3}\/\d{3,}\/|[0-9a-f]{3}\/[0-9a-f]{3}\/[0-9a-f]{18}\/$/
/**
* @param {string} bucketName
* @param {string} path
* @return {string}
*/
export function pathToProjectFolder(bucketName, path) {
switch (bucketName) {
case deksBucket:
case chunksBucket:
case projectBlobsBucket: {
const projectFolder = Path.join(...path.split('/').slice(0, 3)) + '/'
if (!PROJECT_FOLDER_REGEX.test(projectFolder)) {
throw new OError('invalid project folder', { bucketName, path })
}
return projectFolder
}
default:
throw new Error(`${bucketName} does not store per-project files`)
}
}
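// Example (illustrative keys): for objects in one of the per-project buckets,
// pathToProjectFolder returns the three-level project prefix.
//
//   pathToProjectFolder(chunksBucket, '123/456/789/000000000')
//     // => '123/456/789/'
//   pathToProjectFolder(projectBlobsBucket, 'abc/def/0123456789abcdef01/aa/bb')
//     // => 'abc/def/0123456789abcdef01/'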
/**
* @param {string} name
* @param {string} label
* @return {Promise<string>}
*/
async function loadFromSecretsManager(name, label) {
const client = new SecretManagerServiceClient()
const [version] = await client.accessSecretVersion({ name })
if (!version.payload?.data) throw new Error(`empty secret: ${label}`)
return version.payload.data.toString()
}
async function getRootKeyEncryptionKeys() {
return JSON.parse(await getRawRootKeyEncryptionKeys()).map(
({ key, salt }) => {
return new RootKeyEncryptionKey(
Buffer.from(key, 'base64'),
Buffer.from(salt, 'base64')
)
}
)
}
export const backupPersistor = new PerProjectEncryptedS3Persistor({
...persistorConfig.s3SSEC,
disableMultiPartUpload: true,
dataEncryptionKeyBucketName: deksBucket,
pathToProjectFolder,
getRootKeyEncryptionKeys,
storageClass: {
[deksBucket]: 'STANDARD',
[chunksBucket]: persistorConfig.tieringStorageClass,
[projectBlobsBucket]: persistorConfig.tieringStorageClass,
},
})
export const backupHistoryStore = new HistoryStore(
backupPersistor,
chunksBucket
)

View File

@@ -0,0 +1,216 @@
// @ts-check
import OError from '@overleaf/o-error'
import chunkStore from '../lib/chunk_store/index.js'
import {
backupPersistor,
chunksBucket,
projectBlobsBucket,
} from './backupPersistor.mjs'
import { Blob, Chunk, History } from 'overleaf-editor-core'
import { BlobStore, GLOBAL_BLOBS, makeProjectKey } from './blob_store/index.js'
import blobHash from './blob_hash.js'
import { NotFoundError } from '@overleaf/object-persistor/src/Errors.js'
import logger from '@overleaf/logger'
import path from 'node:path'
import projectKey from './project_key.js'
import streams from './streams.js'
import objectPersistor from '@overleaf/object-persistor'
import { getEndDateForRPO } from '../../backupVerifier/utils.mjs'
/**
* @typedef {import("@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js").CachedPerProjectEncryptedS3Persistor} CachedPerProjectEncryptedS3Persistor
*/
/**
* @param {string} historyId
* @param {string} hash
*/
export async function verifyBlob(historyId, hash) {
return await verifyBlobs(historyId, [hash])
}
/**
*
* @param {string} historyId
* @return {Promise<CachedPerProjectEncryptedS3Persistor>}
*/
async function getProjectPersistor(historyId) {
try {
return await backupPersistor.forProjectRO(
projectBlobsBucket,
makeProjectKey(historyId, '')
)
} catch (err) {
if (err instanceof NotFoundError) {
throw new BackupCorruptedError('dek does not exist', {}, err)
}
throw err
}
}
/**
* @param {string} historyId
* @param {Array<string>} hashes
* @param {CachedPerProjectEncryptedS3Persistor} [projectCache]
*/
export async function verifyBlobs(historyId, hashes, projectCache) {
if (hashes.length === 0) throw new Error('bug: empty hashes')
if (!projectCache) {
projectCache = await getProjectPersistor(historyId)
}
const blobStore = new BlobStore(historyId)
for (const hash of hashes) {
const path = makeProjectKey(historyId, hash)
const blob = await blobStore.getBlob(hash)
if (!blob) throw new Blob.NotFoundError(hash)
let stream
try {
stream = await projectCache.getObjectStream(projectBlobsBucket, path, {
autoGunzip: true,
})
} catch (err) {
if (err instanceof NotFoundError) {
throw new BackupCorruptedMissingBlobError('missing blob', {
path,
hash,
})
}
throw err
}
const backupHash = await blobHash.fromStream(blob.getByteLength(), stream)
if (backupHash !== hash) {
throw new BackupCorruptedInvalidBlobError(
'hash mismatch for backed up blob',
{
path,
hash,
backupHash,
}
)
}
}
}
/**
* @param {string} historyId
* @param {Date} [endTimestamp]
*/
export async function verifyProjectWithErrorContext(
historyId,
endTimestamp = getEndDateForRPO()
) {
try {
await verifyProject(historyId, endTimestamp)
} catch (err) {
// @ts-ignore err is Error instance
throw OError.tag(err, 'verifyProject', { historyId, endTimestamp })
}
}
/**
*
* @param {string} historyId
* @param {number} startVersion
* @param {CachedPerProjectEncryptedS3Persistor} backupPersistorForProject
* @return {Promise<any>}
*/
async function loadChunk(historyId, startVersion, backupPersistorForProject) {
const key = path.join(
projectKey.format(historyId),
projectKey.pad(startVersion)
)
try {
const buf = await streams.gunzipStreamToBuffer(
await backupPersistorForProject.getObjectStream(chunksBucket, key)
)
return JSON.parse(buf.toString('utf-8'))
} catch (err) {
if (err instanceof objectPersistor.Errors.NotFoundError) {
throw new Chunk.NotPersistedError(historyId)
}
if (err instanceof Error) {
throw OError.tag(err, 'Failed to load chunk', { historyId, startVersion })
}
throw err
}
}
/**
* @param {string} historyId
* @param {Date} endTimestamp
*/
export async function verifyProject(historyId, endTimestamp) {
const backend = chunkStore.getBackend(historyId)
const [first, last] = await Promise.all([
backend.getFirstChunkBeforeTimestamp(historyId, endTimestamp),
backend.getLastActiveChunkBeforeTimestamp(historyId, endTimestamp),
])
const chunksRecordsToVerify = [
{
chunkId: first.id,
chunkLabel: 'first',
startVersion: first.startVersion,
},
]
if (first.startVersion !== last.startVersion) {
chunksRecordsToVerify.push({
chunkId: last.id,
chunkLabel: 'last before RPO',
startVersion: last.startVersion,
})
}
const projectCache = await getProjectPersistor(historyId)
const chunks = await Promise.all(
chunksRecordsToVerify.map(async chunk => {
try {
return History.fromRaw(
await loadChunk(historyId, chunk.startVersion, projectCache)
)
} catch (err) {
if (err instanceof Chunk.NotPersistedError) {
throw new BackupRPOViolationChunkNotBackedUpError(
'BackupRPOviolation: chunk not backed up',
chunk
)
}
throw err
}
})
)
const seenBlobs = new Set()
const blobsToVerify = []
for (const chunk of chunks) {
/** @type {Set<string>} */
const chunkBlobs = new Set()
chunk.findBlobHashes(chunkBlobs)
let hasAddedBlobFromThisChunk = false
for (const blobHash of chunkBlobs) {
if (seenBlobs.has(blobHash)) continue // old blob
if (GLOBAL_BLOBS.has(blobHash)) continue // global blob
seenBlobs.add(blobHash)
if (!hasAddedBlobFromThisChunk) {
blobsToVerify.push(blobHash)
hasAddedBlobFromThisChunk = true
}
}
}
if (blobsToVerify.length === 0) {
logger.debug(
{
historyId,
chunksRecordsToVerify: chunksRecordsToVerify.map(c => c.chunkId),
},
'chunks contain no blobs to verify'
)
return
}
await verifyBlobs(historyId, blobsToVerify, projectCache)
}
export class BackupCorruptedError extends OError {}
export class BackupRPOViolationError extends OError {}
export class BackupCorruptedMissingBlobError extends BackupCorruptedError {}
export class BackupCorruptedInvalidBlobError extends BackupCorruptedError {}
export class BackupRPOViolationChunkNotBackedUpError extends OError {}
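// Illustrative usage sketch (not part of this module): verify a project's backup
// and distinguish corruption from RPO violations. `historyId` is an assumption
// supplied by the caller; OError.tag preserves the error instance, so the
// instanceof checks still apply after tagging.
//
//   try {
//     await verifyProjectWithErrorContext(historyId)
//   } catch (err) {
//     if (err instanceof BackupCorruptedError) {
//       // a blob or DEK is missing or invalid in the backup bucket
//     } else if (err instanceof BackupRPOViolationChunkNotBackedUpError) {
//       // a chunk that should have been backed up by now is not there yet
//     } else {
//       throw err
//     }
//   }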

View File

@@ -0,0 +1,212 @@
const { Binary, ObjectId } = require('mongodb')
const { projects, backedUpBlobs } = require('../mongodb')
const OError = require('@overleaf/o-error')
// List projects with pending backups older than the specified interval
function listPendingBackups(timeIntervalMs = 0, limit = null) {
const cutoffTime = new Date(Date.now() - timeIntervalMs)
const options = {
projection: { 'overleaf.backup.pendingChangeAt': 1 },
sort: { 'overleaf.backup.pendingChangeAt': 1 },
}
// Apply limit if provided
if (limit) {
options.limit = limit
}
const cursor = projects.find(
{
'overleaf.backup.pendingChangeAt': {
$exists: true,
$lt: cutoffTime,
},
},
options
)
return cursor
}
// List projects that have never been backed up and are older than the specified interval
function listUninitializedBackups(timeIntervalMs = 0, limit = null) {
const cutoffTimeInSeconds = (Date.now() - timeIntervalMs) / 1000
const options = {
projection: { _id: 1 },
sort: { _id: 1 },
}
// Apply limit if provided
if (limit) {
options.limit = limit
}
const cursor = projects.find(
{
'overleaf.backup.lastBackedUpVersion': null,
_id: {
$lt: ObjectId.createFromTime(cutoffTimeInSeconds),
},
},
options
)
return cursor
}
// Retrieve the history ID for a given project without giving direct access to the
// projects collection.
async function getHistoryId(projectId) {
const project = await projects.findOne(
{ _id: new ObjectId(projectId) },
{
projection: {
'overleaf.history.id': 1,
},
}
)
if (!project) {
throw new Error('Project not found')
}
return project.overleaf.history.id
}
async function getBackupStatus(projectId) {
const project = await projects.findOne(
{ _id: new ObjectId(projectId) },
{
projection: {
'overleaf.history': 1,
'overleaf.backup': 1,
},
}
)
if (!project) {
throw new Error('Project not found')
}
return {
backupStatus: project.overleaf.backup,
historyId: `${project.overleaf.history.id}`,
currentEndVersion: project.overleaf.history.currentEndVersion,
currentEndTimestamp: project.overleaf.history.currentEndTimestamp,
}
}
async function setBackupVersion(
projectId,
previousBackedUpVersion,
currentBackedUpVersion,
currentBackedUpAt
) {
// FIXME: include a check to handle race conditions
// to make sure only one process updates the version numbers
const result = await projects.updateOne(
{
_id: new ObjectId(projectId),
'overleaf.backup.lastBackedUpVersion': previousBackedUpVersion,
},
{
$set: {
'overleaf.backup.lastBackedUpVersion': currentBackedUpVersion,
'overleaf.backup.lastBackedUpAt': currentBackedUpAt,
},
}
)
if (result.matchedCount === 0 || result.modifiedCount === 0) {
throw new OError('Failed to update backup version', {
previousBackedUpVersion,
currentBackedUpVersion,
currentBackedUpAt,
result,
})
}
}
async function updateCurrentMetadataIfNotSet(projectId, latestChunkMetadata) {
await projects.updateOne(
{
_id: new ObjectId(projectId),
'overleaf.history.currentEndVersion': { $exists: false },
'overleaf.history.currentEndTimestamp': { $exists: false },
},
{
$set: {
'overleaf.history.currentEndVersion': latestChunkMetadata.endVersion,
'overleaf.history.currentEndTimestamp':
latestChunkMetadata.endTimestamp,
},
}
)
}
/**
* Updates the pending change timestamp for a project's backup status
* @param {string} projectId - The ID of the project to update
* @param {Date} backupStartTime - The timestamp to set for pending changes
* @returns {Promise<void>}
*
* If the project's last backed up version matches the current end version,
* the pending change timestamp is removed. Otherwise, it's set to the provided
* backup start time.
*/
async function updatePendingChangeTimestamp(projectId, backupStartTime) {
await projects.updateOne({ _id: new ObjectId(projectId) }, [
{
$set: {
'overleaf.backup.pendingChangeAt': {
$cond: {
if: {
$eq: [
'$overleaf.backup.lastBackedUpVersion',
'$overleaf.history.currentEndVersion',
],
},
then: '$$REMOVE',
else: backupStartTime,
},
},
},
},
])
}
async function getBackedUpBlobHashes(projectId) {
const result = await backedUpBlobs.findOne(
{ _id: new ObjectId(projectId) },
{ projection: { blobs: 1 } }
)
if (!result) {
return new Set()
}
const hashes = result.blobs.map(b => b.buffer.toString('hex'))
return new Set(hashes)
}
async function unsetBackedUpBlobHashes(projectId, hashes) {
const binaryHashes = hashes.map(h => new Binary(Buffer.from(h, 'hex')))
const result = await backedUpBlobs.findOneAndUpdate(
{ _id: new ObjectId(projectId) },
{
$pullAll: {
blobs: binaryHashes,
},
},
{ returnDocument: 'after' }
)
if (result && result.blobs.length === 0) {
await backedUpBlobs.deleteOne({
_id: new ObjectId(projectId),
blobs: { $size: 0 },
})
}
return result
}
module.exports = {
getHistoryId,
getBackupStatus,
setBackupVersion,
updateCurrentMetadataIfNotSet,
updatePendingChangeTimestamp,
listPendingBackups,
listUninitializedBackups,
getBackedUpBlobHashes,
unsetBackedUpBlobHashes,
}
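// Illustrative usage sketch (not part of this module): read the backup status,
// perform the backup, then advance the version with a compare-and-set on the
// previous value so concurrent updaters cannot both win.
//
//   const { backupStatus, currentEndVersion } = await getBackupStatus(projectId)
//   // ... back up changes up to currentEndVersion ...
//   await setBackupVersion(
//     projectId,
//     backupStatus?.lastBackedUpVersion,
//     currentEndVersion,
//     new Date()
//   )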

View File

@@ -0,0 +1,40 @@
'use strict'
const BPromise = require('bluebird')
/**
* @constructor
* @param {BlobStore} blobStore
* @classdesc
* Wrapper for BlobStore that pre-fetches blob metadata to avoid making one
* database call per blob lookup.
*/
function BatchBlobStore(blobStore) {
this.blobStore = blobStore
this.blobs = new Map()
}
/**
* Pre-fetch metadata for the given blob hashes.
*
* @param {Array.<string>} hashes
* @return {Promise}
*/
BatchBlobStore.prototype.preload = function batchBlobStorePreload(hashes) {
return BPromise.each(this.blobStore.getBlobs(hashes), blob => {
this.blobs.set(blob.getHash(), blob)
})
}
/**
* @see BlobStore#getBlob
*/
BatchBlobStore.prototype.getBlob = BPromise.method(
function batchBlobStoreGetBlob(hash) {
const blob = this.blobs.get(hash)
if (blob) return blob
return this.blobStore.getBlob(hash)
}
)
module.exports = BatchBlobStore
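// Illustrative usage sketch (not part of this module): pre-fetch metadata for a
// set of hashes once, then look blobs up without extra database round trips.
// The hash strings are placeholders.
//
//   const batchBlobStore = new BatchBlobStore(blobStore)
//   await batchBlobStore.preload(['<hash1>', '<hash2>'])
//   const blob = await batchBlobStore.getBlob('<hash1>')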

View File

@@ -0,0 +1,80 @@
/** @module */
'use strict'
const BPromise = require('bluebird')
const fs = BPromise.promisifyAll(require('node:fs'))
const crypto = require('node:crypto')
const { pipeline } = require('node:stream')
const assert = require('./assert')
function getGitBlobHeader(byteLength) {
return 'blob ' + byteLength + '\x00'
}
function getBlobHash(byteLength) {
const hash = crypto.createHash('sha1')
hash.setEncoding('hex')
hash.update(getGitBlobHeader(byteLength))
return hash
}
/**
* Compute the git blob hash for a blob from a readable stream of its content.
*
* @function
* @param {number} byteLength
* @param {stream.Readable} stream
* @return {Promise.<string>} hexadecimal SHA-1 hash
*/
exports.fromStream = BPromise.method(
function blobHashFromStream(byteLength, stream) {
assert.integer(byteLength, 'blobHash: bad byteLength')
assert.object(stream, 'blobHash: bad stream')
const hash = getBlobHash(byteLength)
return new BPromise(function (resolve, reject) {
pipeline(stream, hash, function (err) {
if (err) {
reject(err)
} else {
hash.end()
resolve(hash.read())
}
})
})
}
)
/**
* Compute the git blob hash for a blob with the given string content.
*
* @param {string} string
* @return {string} hexadecimal SHA-1 hash
*/
exports.fromString = function blobHashFromString(string) {
assert.string(string, 'blobHash: bad string')
const hash = getBlobHash(Buffer.byteLength(string))
hash.update(string, 'utf8')
hash.end()
return hash.read()
}
/**
* Compute the git blob hash for the content of a file.
*
* @param {string} pathname
* @return {Promise.<string>} hexadecimal SHA-1 hash
*/
exports.fromFile = function blobHashFromFile(pathname) {
assert.string(pathname, 'blobHash: bad pathname')
function getByteLengthOfFile() {
return fs.statAsync(pathname).then(stat => stat.size)
}
const fromStream = this.fromStream
return getByteLengthOfFile().then(function (byteLength) {
const stream = fs.createReadStream(pathname)
return fromStream(byteLength, stream)
})
}
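// Illustrative examples (not part of this module), assuming it is required as
// `blobHash`. The hash mirrors `git hash-object`: it covers a "blob <byteLength>\0"
// header followed by the content, so the empty string hashes to git's well-known
// empty-blob id.
//
//   blobHash.fromString('')   // => 'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391'
//   await blobHash.fromFile('/tmp/example.txt')   // promise of that file's git blob hash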

View File

@@ -0,0 +1,433 @@
'use strict'
const config = require('config')
const fs = require('node:fs')
const isValidUtf8 = require('utf-8-validate')
const { ReadableString } = require('@overleaf/stream-utils')
const core = require('overleaf-editor-core')
const objectPersistor = require('@overleaf/object-persistor')
const OError = require('@overleaf/o-error')
const Blob = core.Blob
const TextOperation = core.TextOperation
const containsNonBmpChars = core.util.containsNonBmpChars
const assert = require('../assert')
const blobHash = require('../blob_hash')
const mongodb = require('../mongodb')
const persistor = require('../persistor')
const projectKey = require('../project_key')
const streams = require('../streams')
const postgresBackend = require('./postgres')
const mongoBackend = require('./mongo')
const logger = require('@overleaf/logger')
/** @import { Readable } from 'stream' */
const GLOBAL_BLOBS = new Map()
function makeGlobalKey(hash) {
return `${hash.slice(0, 2)}/${hash.slice(2, 4)}/${hash.slice(4)}`
}
function makeProjectKey(projectId, hash) {
return `${projectKey.format(projectId)}/${hash.slice(0, 2)}/${hash.slice(2)}`
}
async function uploadBlob(projectId, blob, stream, opts = {}) {
const bucket = config.get('blobStore.projectBucket')
const key = makeProjectKey(projectId, blob.getHash())
logger.debug({ projectId, blob }, 'uploadBlob started')
try {
await persistor.sendStream(bucket, key, stream, {
contentType: 'application/octet-stream',
...opts,
})
} finally {
logger.debug({ projectId, blob }, 'uploadBlob finished')
}
}
function getBlobLocation(projectId, hash) {
if (GLOBAL_BLOBS.has(hash)) {
return {
bucket: config.get('blobStore.globalBucket'),
key: makeGlobalKey(hash),
}
} else {
return {
bucket: config.get('blobStore.projectBucket'),
key: makeProjectKey(projectId, hash),
}
}
}
/**
* Returns the appropriate backend for the given project id
*
* Numeric ids use the Postgres backend.
* Strings of 24 characters use the Mongo backend.
*/
function getBackend(projectId) {
if (assert.POSTGRES_ID_REGEXP.test(projectId)) {
return postgresBackend
} else if (assert.MONGO_ID_REGEXP.test(projectId)) {
return mongoBackend
} else {
throw new OError('bad project id', { projectId })
}
}
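// For example (illustrative ids): getBackend('1234567') returns the Postgres
// backend, getBackend('abcdef0123456789abcdef01') returns the Mongo backend,
// and anything else throws an OError('bad project id').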
async function makeBlobForFile(pathname) {
const { size: byteLength } = await fs.promises.stat(pathname)
const hash = await blobHash.fromStream(
byteLength,
fs.createReadStream(pathname)
)
return new Blob(hash, byteLength)
}
async function getStringLengthOfFile(byteLength, pathname) {
// We have to read the file into memory to get its UTF-8 length, so don't
// bother for files that are too large for us to edit anyway.
if (byteLength > Blob.MAX_EDITABLE_BYTE_LENGTH_BOUND) {
return null
}
// We need to check if the file contains nonBmp or null characters
let data = await fs.promises.readFile(pathname)
if (!isValidUtf8(data)) return null
data = data.toString()
if (data.length > TextOperation.MAX_STRING_LENGTH) return null
if (containsNonBmpChars(data)) return null
if (data.indexOf('\x00') !== -1) return null
return data.length
}
async function deleteBlobsInBucket(projectId) {
const bucket = config.get('blobStore.projectBucket')
const prefix = `${projectKey.format(projectId)}/`
logger.debug({ projectId }, 'deleteBlobsInBucket started')
try {
await persistor.deleteDirectory(bucket, prefix)
} finally {
logger.debug({ projectId }, 'deleteBlobsInBucket finished')
}
}
async function loadGlobalBlobs() {
const blobs = await mongodb.globalBlobs.find()
for await (const blob of blobs) {
GLOBAL_BLOBS.set(blob._id, {
blob: new Blob(blob._id, blob.byteLength, blob.stringLength),
demoted: Boolean(blob.demoted),
})
}
}
/**
* Return metadata for all blobs in the given projects
* @param {Array<string|number>} projectIds
* @return {Promise<{nBlobs:number, blobs:Map<string,Array<core.Blob>>}>}
*/
async function getProjectBlobsBatch(projectIds) {
const mongoProjects = []
const postgresProjects = []
for (const projectId of projectIds) {
if (typeof projectId === 'number') {
postgresProjects.push(projectId)
} else {
mongoProjects.push(projectId)
}
}
const [
{ nBlobs: nBlobsPostgres, blobs: blobsPostgres },
{ nBlobs: nBlobsMongo, blobs: blobsMongo },
] = await Promise.all([
postgresBackend.getProjectBlobsBatch(postgresProjects),
mongoBackend.getProjectBlobsBatch(mongoProjects),
])
for (const [id, blobs] of blobsPostgres.entries()) {
blobsMongo.set(id.toString(), blobs)
}
return { nBlobs: nBlobsPostgres + nBlobsMongo, blobs: blobsMongo }
}
/**
* @classdesc
* Fetch and store the content of files using content-addressable hashing. The
* blob store manages both content and metadata (byte and UTF-8 length) for
* blobs.
*/
class BlobStore {
/**
* @constructor
* @param {string} projectId the project for which we'd like to find blobs
*/
constructor(projectId) {
assert.projectId(projectId)
this.projectId = projectId
this.backend = getBackend(this.projectId)
}
/**
* Set up the initial data structure for a given project
*/
async initialize() {
await this.backend.initialize(this.projectId)
}
/**
* Write a blob, if one does not already exist, with the given UTF-8 encoded
* string content.
*
* @param {string} string
* @return {Promise.<core.Blob>}
*/
async putString(string) {
assert.string(string, 'bad string')
const hash = blobHash.fromString(string)
const existingBlob = await this._findBlobBeforeInsert(hash)
if (existingBlob != null) {
return existingBlob
}
const newBlob = new Blob(hash, Buffer.byteLength(string), string.length)
// Note: the ReadableString is to work around a bug in the AWS SDK: it won't
// allow Body to be blank.
await uploadBlob(this.projectId, newBlob, new ReadableString(string))
await this.backend.insertBlob(this.projectId, newBlob)
return newBlob
}
/**
* Write a blob, if one does not already exist, with the given file (usually a
* temporary file).
*
* @param {string} pathname
* @return {Promise<core.Blob>}
*/
async putFile(pathname) {
assert.string(pathname, 'bad pathname')
const newBlob = await makeBlobForFile(pathname)
const existingBlob = await this._findBlobBeforeInsert(newBlob.getHash())
if (existingBlob != null) {
return existingBlob
}
const stringLength = await getStringLengthOfFile(
newBlob.getByteLength(),
pathname
)
newBlob.setStringLength(stringLength)
await this.putBlob(pathname, newBlob)
return newBlob
}
/**
* Write a new blob, the stringLength must have been added already. It should
* have been checked that the blob does not exist yet. Consider using
* {@link putFile} instead of this lower-level method.
*
* @param {string} pathname
* @param {core.Blob} finalizedBlob
* @return {Promise<void>}
*/
async putBlob(pathname, finalizedBlob) {
await uploadBlob(
this.projectId,
finalizedBlob,
fs.createReadStream(pathname)
)
await this.backend.insertBlob(this.projectId, finalizedBlob)
}
/**
* Stores an object as a JSON string in a blob.
*
* @param {object} obj
* @returns {Promise.<core.Blob>}
*/
async putObject(obj) {
assert.object(obj, 'bad object')
const string = JSON.stringify(obj)
return await this.putString(string)
}
/**
*
* Fetch a blob's content by its hash as a UTF-8 encoded string.
*
* @param {string} hash hexadecimal SHA-1 hash
* @return {Promise.<string>} promise for the content of the file
*/
async getString(hash) {
assert.blobHash(hash, 'bad hash')
const projectId = this.projectId
logger.debug({ projectId, hash }, 'getString started')
try {
const stream = await this.getStream(hash)
const buffer = await streams.readStreamToBuffer(stream)
return buffer.toString()
} finally {
logger.debug({ projectId, hash }, 'getString finished')
}
}
/**
* Fetch a JSON encoded blob by its hash and deserialize it.
*
* @template [T=unknown]
* @param {string} hash hexadecimal SHA-1 hash
* @return {Promise.<T>} promise for the content of the file
*/
async getObject(hash) {
assert.blobHash(hash, 'bad hash')
const projectId = this.projectId
logger.debug({ projectId, hash }, 'getObject started')
try {
const jsonString = await this.getString(hash)
const object = JSON.parse(jsonString)
return object
} catch (error) {
// Maybe this blob is gzipped. Try to gunzip it.
// TODO: Remove once we've ensured this is not reached
const stream = await this.getStream(hash)
const buffer = await streams.gunzipStreamToBuffer(stream)
const object = JSON.parse(buffer.toString())
logger.warn('getObject: Gzipped object in BlobStore')
return object
} finally {
logger.debug({ projectId, hash }, 'getObject finished')
}
}
/**
* Fetch a blob by its hash as a stream.
*
* Note that, according to the AWS SDK docs, this does not retry after initial
* failure, so the caller must be prepared to retry on errors, if appropriate.
*
* @param {string} hash hexadecimal SHA-1 hash
* @param {Object} opts
* @return {Promise.<Readable>} a stream to read the file
*/
async getStream(hash, opts = {}) {
assert.blobHash(hash, 'bad hash')
const { bucket, key } = getBlobLocation(this.projectId, hash)
try {
const stream = await persistor.getObjectStream(bucket, key, opts)
return stream
} catch (err) {
if (err instanceof objectPersistor.Errors.NotFoundError) {
throw new Blob.NotFoundError(hash)
}
throw err
}
}
/**
* Read a blob metadata record by hexadecimal hash.
*
* @param {string} hash hexadecimal SHA-1 hash
* @return {Promise<core.Blob | null>}
*/
async getBlob(hash) {
assert.blobHash(hash, 'bad hash')
const globalBlob = GLOBAL_BLOBS.get(hash)
if (globalBlob != null) {
return globalBlob.blob
}
const blob = await this.backend.findBlob(this.projectId, hash)
return blob
}
async getBlobs(hashes) {
assert.array(hashes, 'bad hashes')
const nonGlobalHashes = []
const blobs = []
for (const hash of hashes) {
const globalBlob = GLOBAL_BLOBS.get(hash)
if (globalBlob != null) {
blobs.push(globalBlob.blob)
} else {
nonGlobalHashes.push(hash)
}
}
if (nonGlobalHashes.length === 0) {
return blobs // to avoid unnecessary database lookup
}
const projectBlobs = await this.backend.findBlobs(
this.projectId,
nonGlobalHashes
)
blobs.push(...projectBlobs)
return blobs
}
/**
* Retrieve all blobs associated with the project.
* @returns {Promise<core.Blob[]>} A promise that resolves to an array of blobs.
*/
async getProjectBlobs() {
const projectBlobs = await this.backend.getProjectBlobs(this.projectId)
return projectBlobs
}
/**
* Delete all blobs that belong to the project.
*/
async deleteBlobs() {
await Promise.all([
this.backend.deleteBlobs(this.projectId),
deleteBlobsInBucket(this.projectId),
])
}
async _findBlobBeforeInsert(hash) {
const globalBlob = GLOBAL_BLOBS.get(hash)
if (globalBlob != null && !globalBlob.demoted) {
return globalBlob.blob
}
const blob = await this.backend.findBlob(this.projectId, hash)
return blob
}
/**
* Copy an existing sourceBlob in this project to a target project.
* @param {Blob} sourceBlob
* @param {string} targetProjectId
* @return {Promise<void>}
*/
async copyBlob(sourceBlob, targetProjectId) {
assert.instance(sourceBlob, Blob, 'bad sourceBlob')
assert.projectId(targetProjectId, 'bad targetProjectId')
const hash = sourceBlob.getHash()
const sourceProjectId = this.projectId
const { bucket, key: sourceKey } = getBlobLocation(sourceProjectId, hash)
const destKey = makeProjectKey(targetProjectId, hash)
const targetBackend = getBackend(targetProjectId)
logger.debug({ sourceProjectId, targetProjectId, hash }, 'copyBlob started')
try {
await persistor.copyObject(bucket, sourceKey, destKey)
await targetBackend.insertBlob(targetProjectId, sourceBlob)
} finally {
logger.debug(
{ sourceProjectId, targetProjectId, hash },
'copyBlob finished'
)
}
}
}
module.exports = {
BlobStore,
getProjectBlobsBatch,
loadGlobalBlobs,
makeProjectKey,
makeBlobForFile,
getStringLengthOfFile,
GLOBAL_BLOBS,
}
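// Illustrative usage sketch (not part of this module): load the global blob table
// once at startup, then store and fetch content for a project by hash. `projectId`
// is an assumption supplied by the caller.
//
//   await loadGlobalBlobs()
//   const blobStore = new BlobStore(projectId)
//   await blobStore.initialize()
//   const blob = await blobStore.putString('Hello, world!')
//   const content = await blobStore.getString(blob.getHash())   // 'Hello, world!'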

View File

@@ -0,0 +1,437 @@
// @ts-check
/**
* Mongo backend for the blob store.
*
* Blobs are stored in the projectHistoryBlobs collection. Each project has a
* document in that collection. That document has a "blobs" subdocument whose
* fields are buckets of blobs. The key of a bucket is the first three hex
* digits of the blob hash. The value of the bucket is an array of blobs that
* match the key.
*
* Buckets have a maximum capacity of 8 blobs. When that capacity is exceeded,
* blobs are stored in a secondary collection: the projectHistoryShardedBlobs
* collection. This collection shards blobs between 16 documents per project.
* The shard key is the first hex digit of the hash. The documents are also
* organized in buckets, but the bucket key is made of hex digits 2, 3 and 4.
*/
const { Blob } = require('overleaf-editor-core')
const { ObjectId, Binary, MongoError, ReadPreference } = require('mongodb')
const assert = require('../assert')
const mongodb = require('../mongodb')
const MAX_BLOBS_IN_BUCKET = 8
const DUPLICATE_KEY_ERROR_CODE = 11000
/**
* @typedef {import('mongodb').ReadPreferenceLike} ReadPreferenceLike
*/
/**
* Set up the data structures for a given project.
* @param {string} projectId
*/
async function initialize(projectId) {
assert.mongoId(projectId, 'bad projectId')
try {
await mongodb.blobs.insertOne({
_id: new ObjectId(projectId),
blobs: {},
})
} catch (err) {
if (err instanceof MongoError && err.code === DUPLICATE_KEY_ERROR_CODE) {
return // ignore already initialized case
}
throw err
}
}
/**
* Return blob metadata for the given project and hash.
* @param {string} projectId
* @param {string} hash
* @return {Promise<Blob | null>}
*/
async function findBlob(projectId, hash) {
assert.mongoId(projectId, 'bad projectId')
assert.blobHash(hash, 'bad hash')
const bucket = getBucket(hash)
const result = await mongodb.blobs.findOne(
{ _id: new ObjectId(projectId) },
{ projection: { _id: 0, bucket: `$${bucket}` } }
)
if (result?.bucket == null) {
return null
}
const record = result.bucket.find(blob => blob.h.toString('hex') === hash)
if (record == null) {
if (result.bucket.length >= MAX_BLOBS_IN_BUCKET) {
return await findBlobSharded(projectId, hash)
} else {
return null
}
}
return recordToBlob(record)
}
/**
* Search in the sharded collection for blob metadata
* @param {string} projectId
* @param {string} hash
* @return {Promise<Blob | null>}
*/
async function findBlobSharded(projectId, hash) {
const [shard, bucket] = getShardedBucket(hash)
const id = makeShardedId(projectId, shard)
const result = await mongodb.shardedBlobs.findOne(
{ _id: id },
{ projection: { _id: 0, blobs: `$${bucket}` } }
)
if (result?.blobs == null) {
return null
}
const record = result.blobs.find(blob => blob.h.toString('hex') === hash)
if (!record) return null
return recordToBlob(record)
}
/**
* Read multiple blob metadata records by hexadecimal hashes.
* @param {string} projectId
* @param {Array<string>} hashes
* @return {Promise<Array<Blob>>}
*/
async function findBlobs(projectId, hashes) {
assert.mongoId(projectId, 'bad projectId')
assert.array(hashes, 'bad hashes: not array')
hashes.forEach(function (hash) {
assert.blobHash(hash, 'bad hash')
})
// Build a set of unique buckets
const buckets = new Set(hashes.map(getBucket))
// Get buckets from Mongo
const projection = { _id: 0 }
for (const bucket of buckets) {
projection[bucket] = 1
}
const result = await mongodb.blobs.findOne(
{ _id: new ObjectId(projectId) },
{ projection }
)
if (result?.blobs == null) {
return []
}
// Build blobs from the query results
const hashSet = new Set(hashes)
const blobs = []
for (const bucket of Object.values(result.blobs)) {
for (const record of bucket) {
const hash = record.h.toString('hex')
if (hashSet.has(hash)) {
blobs.push(recordToBlob(record))
hashSet.delete(hash)
}
}
}
// If we haven't found all the blobs, look in the sharded collection
if (hashSet.size > 0) {
const shardedBlobs = await findBlobsSharded(projectId, hashSet)
blobs.push(...shardedBlobs)
}
return blobs
}
/**
* Search in the sharded collection for blob metadata.
* @param {string} projectId
* @param {Set<string>} hashSet
* @return {Promise<Array<Blob>>}
*/
async function findBlobsSharded(projectId, hashSet) {
// Build a map of buckets by shard key
const bucketsByShard = new Map()
for (const hash of hashSet) {
const [shard, bucket] = getShardedBucket(hash)
let buckets = bucketsByShard.get(shard)
if (buckets == null) {
buckets = new Set()
bucketsByShard.set(shard, buckets)
}
buckets.add(bucket)
}
// Make parallel requests to the shards that might contain the hashes we want
const requests = []
for (const [shard, buckets] of bucketsByShard.entries()) {
const id = makeShardedId(projectId, shard)
const projection = { _id: 0 }
for (const bucket of buckets) {
projection[bucket] = 1
}
const request = mongodb.shardedBlobs.findOne({ _id: id }, { projection })
requests.push(request)
}
const results = await Promise.all(requests)
// Build blobs from the query results
const blobs = []
for (const result of results) {
if (result?.blobs == null) {
continue
}
for (const bucket of Object.values(result.blobs)) {
for (const record of bucket) {
const hash = record.h.toString('hex')
if (hashSet.has(hash)) {
blobs.push(recordToBlob(record))
}
}
}
}
return blobs
}
/**
* Return metadata for all blobs in the given project
*/
async function getProjectBlobs(projectId) {
assert.mongoId(projectId, 'bad projectId')
const result = await mongodb.blobs.findOne(
{ _id: new ObjectId(projectId) },
{ projection: { _id: 0 } }
)
if (!result) {
return []
}
// Build blobs from the query results
const blobs = []
for (const bucket of Object.values(result.blobs)) {
for (const record of bucket) {
blobs.push(recordToBlob(record))
}
}
// Look for all possible sharded blobs
const minShardedId = makeShardedId(projectId, '0')
const maxShardedId = makeShardedId(projectId, 'f')
// @ts-ignore We are using a custom _id here.
const shardedRecords = mongodb.shardedBlobs.find(
{
_id: { $gte: minShardedId, $lte: maxShardedId },
},
{ projection: { _id: 0 } }
)
for await (const shardedRecord of shardedRecords) {
if (shardedRecord.blobs == null) {
continue
}
for (const bucket of Object.values(shardedRecord.blobs)) {
for (const record of bucket) {
blobs.push(recordToBlob(record))
}
}
}
return blobs
}
/**
* Return metadata for all blobs in the given projects
* @param {Array<string>} projectIds
* @return {Promise<{ nBlobs: number, blobs: Map<string, Array<Blob>> }>}
*/
async function getProjectBlobsBatch(projectIds) {
for (const project of projectIds) {
assert.mongoId(project, 'bad projectId')
}
let nBlobs = 0
const blobs = new Map()
if (projectIds.length === 0) return { nBlobs, blobs }
// blobs
{
const cursor = await mongodb.blobs.find(
{ _id: { $in: projectIds.map(projectId => new ObjectId(projectId)) } },
{ readPreference: ReadPreference.secondaryPreferred }
)
for await (const record of cursor) {
const projectBlobs = Object.values(record.blobs).flat().map(recordToBlob)
blobs.set(record._id.toString(), projectBlobs)
nBlobs += projectBlobs.length
}
}
// sharded blobs
{
// @ts-ignore We are using a custom _id here.
const cursor = await mongodb.shardedBlobs.find(
{
_id: {
$gte: makeShardedId(projectIds[0], '0'),
$lte: makeShardedId(projectIds[projectIds.length - 1], 'f'),
},
},
{ readPreference: ReadPreference.secondaryPreferred }
)
for await (const record of cursor) {
const recordIdHex = record._id.toString('hex')
const recordProjectId = recordIdHex.slice(0, 24)
const projectBlobs = Object.values(record.blobs).flat().map(recordToBlob)
const found = blobs.get(recordProjectId)
if (found) {
found.push(...projectBlobs)
} else {
blobs.set(recordProjectId, projectBlobs)
}
nBlobs += projectBlobs.length
}
}
return { nBlobs, blobs }
}
/**
* Add a blob's metadata to the blobs collection after it has been uploaded.
* @param {string} projectId
* @param {Blob} blob
*/
async function insertBlob(projectId, blob) {
assert.mongoId(projectId, 'bad projectId')
const hash = blob.getHash()
const bucket = getBucket(hash)
const record = blobToRecord(blob)
const result = await mongodb.blobs.updateOne(
{
_id: new ObjectId(projectId),
$expr: {
$lt: [{ $size: { $ifNull: [`$${bucket}`, []] } }, MAX_BLOBS_IN_BUCKET],
},
},
{
$addToSet: { [bucket]: record },
}
)
if (result.matchedCount === 0) {
await insertRecordSharded(projectId, hash, record)
}
}
/**
* Add a blob's metadata to the sharded blobs collection.
* @param {string} projectId
* @param {string} hash
* @param {Record} record
* @return {Promise<void>}
*/
async function insertRecordSharded(projectId, hash, record) {
const [shard, bucket] = getShardedBucket(hash)
const id = makeShardedId(projectId, shard)
await mongodb.shardedBlobs.updateOne(
{ _id: id },
{ $addToSet: { [bucket]: record } },
{ upsert: true }
)
}
/**
* Delete all blobs for a given project.
* @param {string} projectId
*/
async function deleteBlobs(projectId) {
assert.mongoId(projectId, 'bad projectId')
await mongodb.blobs.deleteOne({ _id: new ObjectId(projectId) })
const minShardedId = makeShardedId(projectId, '0')
const maxShardedId = makeShardedId(projectId, 'f')
await mongodb.shardedBlobs.deleteMany({
// @ts-ignore We are using a custom _id here.
_id: { $gte: minShardedId, $lte: maxShardedId },
})
}
/**
* Return the Mongo path to the bucket for the given hash.
* @param {string} hash
* @return {string}
*/
function getBucket(hash) {
return `blobs.${hash.slice(0, 3)}`
}
/**
* Return the shard key and Mongo path to the bucket for the given hash in the
* sharded collection.
* @param {string} hash
* @return {[string, string]}
*/
function getShardedBucket(hash) {
const shard = hash.slice(0, 1)
const bucket = `blobs.${hash.slice(1, 4)}`
return [shard, bucket]
}
/**
* Create an _id key for the sharded collection.
* @param {string} projectId
* @param {string} shard
* @return {Binary}
*/
function makeShardedId(projectId, shard) {
return new Binary(Buffer.from(`${projectId}0${shard}`, 'hex'))
}
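// Example (illustrative) for a hash starting with 'abc1' and a 24-hex-char projectId:
//   getBucket(hash)               -> 'blobs.abc'
//   getShardedBucket(hash)        -> ['a', 'blobs.bc1']
//   makeShardedId(projectId, 'a') -> Binary over Buffer.from(projectId + '0a', 'hex')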
/**
* @typedef {Object} Record
* @property {Binary} h
* @property {number} b
* @property {number} [s]
*/
/**
* Return the Mongo record for the given blob.
* @param {Blob} blob
* @return {Record}
*/
function blobToRecord(blob) {
const hash = blob.getHash()
const byteLength = blob.getByteLength()
const stringLength = blob.getStringLength()
return {
h: new Binary(Buffer.from(hash, 'hex')),
b: byteLength,
s: stringLength,
}
}
/**
* Create a blob from the given Mongo record.
* @param {Record} record
* @return {Blob}
*/
function recordToBlob(record) {
return new Blob(record.h.toString('hex'), record.b, record.s)
}
module.exports = {
initialize,
findBlob,
findBlobs,
getProjectBlobs,
getProjectBlobsBatch,
insertBlob,
deleteBlobs,
}

View File

@@ -0,0 +1,161 @@
const { Blob } = require('overleaf-editor-core')
const assert = require('../assert')
const knex = require('../knex')
/**
* Set up the initial data structures for a project
*/
async function initialize(projectId) {
// Nothing to do for Postgres
}
/**
* Return blob metadata for the given project and hash
*/
async function findBlob(projectId, hash) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
assert.blobHash(hash, 'bad hash')
const binaryHash = hashToBuffer(hash)
const record = await knex('project_blobs')
.select('hash_bytes', 'byte_length', 'string_length')
.where({
project_id: projectId,
hash_bytes: binaryHash,
})
.first()
return recordToBlob(record)
}
/**
* Read multiple blob metadata records by hexadecimal hashes.
*
* @param {Array.<string>} hashes hexadecimal SHA-1 hashes
* @return {Promise.<Array.<Blob?>>} no guarantee on order
*/
async function findBlobs(projectId, hashes) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
assert.array(hashes, 'bad hashes: not array')
hashes.forEach(function (hash) {
assert.blobHash(hash, 'bad hash')
})
const binaryHashes = hashes.map(hashToBuffer)
const records = await knex('project_blobs')
.select('hash_bytes', 'byte_length', 'string_length')
.where('project_id', projectId)
.whereIn('hash_bytes', binaryHashes)
const blobs = records.map(recordToBlob)
return blobs
}
/**
* Return metadata for all blobs in the given project
*/
async function getProjectBlobs(projectId) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
const records = await knex('project_blobs')
.select('hash_bytes', 'byte_length', 'string_length')
.where({
project_id: projectId,
})
const blobs = records.map(recordToBlob)
return blobs
}
/**
* Return metadata for all blobs in each of the given projects
* @param {Array<number>} projectIds
* @return {Promise<{ nBlobs: number, blobs: Map<number, Array<Blob>> }>}
*/
async function getProjectBlobsBatch(projectIds) {
for (const projectId of projectIds) {
assert.integer(projectId, 'bad projectId')
}
let nBlobs = 0
const blobs = new Map()
if (projectIds.length === 0) return { nBlobs, blobs }
const cursor = knex('project_blobs')
.select('project_id', 'hash_bytes', 'byte_length', 'string_length')
.whereIn('project_id', projectIds)
.stream()
for await (const record of cursor) {
const found = blobs.get(record.project_id)
if (found) {
found.push(recordToBlob(record))
} else {
blobs.set(record.project_id, [recordToBlob(record)])
}
nBlobs++
}
return { nBlobs, blobs }
}
/**
* Add a blob's metadata to the blobs table after it has been uploaded.
*/
async function insertBlob(projectId, blob) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
await knex('project_blobs')
.insert(blobToRecord(projectId, blob))
.onConflict(['project_id', 'hash_bytes'])
.ignore()
}
/**
* Deletes all blobs for a given project
*/
async function deleteBlobs(projectId) {
assert.postgresId(projectId, 'bad projectId')
projectId = parseInt(projectId, 10)
await knex('project_blobs').where('project_id', projectId).delete()
}
function blobToRecord(projectId, blob) {
return {
project_id: projectId,
hash_bytes: hashToBuffer(blob.hash),
byte_length: blob.getByteLength(),
string_length: blob.getStringLength(),
}
}
function recordToBlob(record) {
if (!record) return
return new Blob(
hashFromBuffer(record.hash_bytes),
record.byte_length,
record.string_length
)
}
function hashToBuffer(hash) {
if (!hash) return
return Buffer.from(hash, 'hex')
}
function hashFromBuffer(buffer) {
if (!buffer) return
return buffer.toString('hex')
}
module.exports = {
initialize,
findBlob,
findBlobs,
getProjectBlobs,
getProjectBlobsBatch,
insertBlob,
deleteBlobs,
}
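// Usage sketch (hypothetical project ids, not part of this module): fetch blob
// metadata for several Postgres projects in one query and read back one list.
async function exampleBatchLookup() {
  const { nBlobs, blobs } = await getProjectBlobsBatch([101, 102])
  const blobsFor101 = blobs.get(101) || []
  return { nBlobs, count: blobsFor101.length }
}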

View File

@@ -0,0 +1,40 @@
'use strict'
/**
* @module storage/lib/chunk_buffer
*/
const chunkStore = require('../chunk_store')
const redisBackend = require('../chunk_store/redis')
const metrics = require('@overleaf/metrics')
/**
* Load the latest Chunk stored for a project, including blob metadata.
*
* @param {string} projectId
* @return {Promise.<Chunk>}
*/
async function loadLatest(projectId) {
const cachedChunk = await redisBackend.getCurrentChunk(projectId)
const chunkRecord = await chunkStore.loadLatestRaw(projectId)
const cachedChunkIsValid = redisBackend.checkCacheValidityWithMetadata(
cachedChunk,
chunkRecord
)
if (cachedChunkIsValid) {
metrics.inc('chunk_buffer.loadLatest', 1, {
status: 'cache-hit',
})
return cachedChunk
} else {
metrics.inc('chunk_buffer.loadLatest', 1, {
status: 'cache-miss',
})
const chunk = await chunkStore.loadLatest(projectId)
await redisBackend.setCurrentChunk(projectId, chunk)
return chunk
}
}
module.exports = {
loadLatest,
}
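// Usage sketch (hypothetical caller, not part of this module): read the latest
// chunk through the buffer so that repeated reads can be served from Redis.
async function exampleLoadThroughBuffer(projectId) {
  const chunk = await loadLatest(projectId)
  return { startVersion: chunk.getStartVersion(), endVersion: chunk.getEndVersion() }
}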

View File

@@ -0,0 +1,7 @@
const OError = require('@overleaf/o-error')
class ChunkVersionConflictError extends OError {}
module.exports = {
ChunkVersionConflictError,
}

View File

@@ -0,0 +1,447 @@
// @ts-check
'use strict'
/**
* Manage {@link Chunk} and {@link History} storage.
*
* For storage, chunks are immutable. If we want to update a project with new
* changes, we create a new chunk record and History object and delete the old
* ones. If we compact a project's history, we similarly destroy the old chunk
* (or chunks) and replace them with a new one. This is helpful when using S3,
* because it guarantees only eventual consistency for updates but provides
* stronger consistency guarantees for object creation.
*
* When a chunk record in the database is removed, we save its ID for later
* in the `old_chunks` table, rather than deleting it immediately. This lets us
* use batch deletion to reduce the number of delete requests to S3.
*
* The chunk store also caches data about which blobs are referenced by each
* chunk, which allows us to find unused blobs without loading all of the data
* for all projects from S3. Whenever we create a chunk, we also insert records
* into the `chunk_blobs` table, to help with this bookkeeping.
*/
const config = require('config')
const OError = require('@overleaf/o-error')
const { Chunk, History, Snapshot } = require('overleaf-editor-core')
const assert = require('../assert')
const BatchBlobStore = require('../batch_blob_store')
const { BlobStore } = require('../blob_store')
const { historyStore } = require('../history_store')
const mongoBackend = require('./mongo')
const postgresBackend = require('./postgres')
const { ChunkVersionConflictError } = require('./errors')
const DEFAULT_DELETE_BATCH_SIZE = parseInt(config.get('maxDeleteKeys'), 10)
const DEFAULT_DELETE_TIMEOUT_SECS = 3000 // 50 minutes
const DEFAULT_DELETE_MIN_AGE_SECS = 86400 // 1 day
/**
* Create the initial chunk for a project.
*/
async function initializeProject(projectId, snapshot) {
if (projectId != null) {
assert.projectId(projectId, 'bad projectId')
} else {
projectId = await postgresBackend.generateProjectId()
}
if (snapshot != null) {
assert.instance(snapshot, Snapshot, 'bad snapshot')
} else {
snapshot = new Snapshot()
}
const blobStore = new BlobStore(projectId)
await blobStore.initialize()
const backend = getBackend(projectId)
const chunkRecord = await backend.getLatestChunk(projectId)
if (chunkRecord != null) {
throw new AlreadyInitialized(projectId)
}
const history = new History(snapshot, [])
const chunk = new Chunk(history, 0)
await create(projectId, chunk)
return projectId
}
/**
* Load the blobs referenced in the given history
*/
async function lazyLoadHistoryFiles(history, batchBlobStore) {
const blobHashes = new Set()
history.findBlobHashes(blobHashes)
await batchBlobStore.preload(Array.from(blobHashes))
await history.loadFiles('lazy', batchBlobStore)
}
/**
* Load the latest Chunk stored for a project, including blob metadata.
*
* @param {string} projectId
* @param {Object} [opts]
* @param {boolean} [opts.readOnly]
* @return {Promise<{id: string, startVersion: number, endVersion: number, endTimestamp: Date}>}
*/
async function loadLatestRaw(projectId, opts) {
assert.projectId(projectId, 'bad projectId')
const backend = getBackend(projectId)
const chunkRecord = await backend.getLatestChunk(projectId, opts)
if (chunkRecord == null) {
throw new Chunk.NotFoundError(projectId)
}
return chunkRecord
}
/**
* Load the latest Chunk stored for a project, including blob metadata.
*
* @param {string} projectId
* @return {Promise.<Chunk>}
*/
async function loadLatest(projectId) {
const chunkRecord = await loadLatestRaw(projectId)
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
const history = History.fromRaw(rawHistory)
const blobStore = new BlobStore(projectId)
const batchBlobStore = new BatchBlobStore(blobStore)
await lazyLoadHistoryFiles(history, batchBlobStore)
return new Chunk(history, chunkRecord.startVersion)
}
/**
* Load the chunk that contains the given version, including blob metadata.
*/
async function loadAtVersion(projectId, version) {
assert.projectId(projectId, 'bad projectId')
assert.integer(version, 'bad version')
const backend = getBackend(projectId)
const blobStore = new BlobStore(projectId)
const batchBlobStore = new BatchBlobStore(blobStore)
const chunkRecord = await backend.getChunkForVersion(projectId, version)
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
const history = History.fromRaw(rawHistory)
await lazyLoadHistoryFiles(history, batchBlobStore)
return new Chunk(history, chunkRecord.endVersion - history.countChanges())
}
/**
* Load the chunk that contains the version that was current at the given
* timestamp, including blob metadata.
*/
async function loadAtTimestamp(projectId, timestamp) {
assert.projectId(projectId, 'bad projectId')
assert.date(timestamp, 'bad timestamp')
const backend = getBackend(projectId)
const blobStore = new BlobStore(projectId)
const batchBlobStore = new BatchBlobStore(blobStore)
const chunkRecord = await backend.getChunkForTimestamp(projectId, timestamp)
const rawHistory = await historyStore.loadRaw(projectId, chunkRecord.id)
const history = History.fromRaw(rawHistory)
await lazyLoadHistoryFiles(history, batchBlobStore)
return new Chunk(history, chunkRecord.endVersion - history.countChanges())
}
/**
* Store the chunk and insert corresponding records in the database.
*
* @param {string} projectId
* @param {Chunk} chunk
* @param {Date} [earliestChangeTimestamp]
*/
async function create(projectId, chunk, earliestChangeTimestamp) {
assert.projectId(projectId, 'bad projectId')
assert.instance(chunk, Chunk, 'bad chunk')
assert.maybe.date(earliestChangeTimestamp, 'bad timestamp')
const backend = getBackend(projectId)
const chunkStart = chunk.getStartVersion()
const chunkId = await uploadChunk(projectId, chunk)
const opts = {}
if (chunkStart > 0) {
opts.oldChunkId = await getChunkIdForVersion(projectId, chunkStart - 1)
}
if (earliestChangeTimestamp != null) {
opts.earliestChangeTimestamp = earliestChangeTimestamp
}
await backend.confirmCreate(projectId, chunk, chunkId, opts)
}
/**
* Upload the given chunk to object storage.
*
* This is used by the create and update methods.
*/
async function uploadChunk(projectId, chunk) {
const backend = getBackend(projectId)
const blobStore = new BlobStore(projectId)
const historyStoreConcurrency = parseInt(
config.get('chunkStore.historyStoreConcurrency'),
10
)
const rawHistory = await chunk
.getHistory()
.store(blobStore, historyStoreConcurrency)
const chunkId = await backend.insertPendingChunk(projectId, chunk)
await historyStore.storeRaw(projectId, chunkId, rawHistory)
return chunkId
}
/**
* Extend the project's history by replacing the latest chunk with a new
* chunk.
*
* @param {string} projectId
* @param {number} oldEndVersion
* @param {Chunk} newChunk
* @param {Date} [earliestChangeTimestamp]
* @return {Promise}
*/
async function update(
projectId,
oldEndVersion,
newChunk,
earliestChangeTimestamp
) {
assert.projectId(projectId, 'bad projectId')
assert.integer(oldEndVersion, 'bad oldEndVersion')
assert.instance(newChunk, Chunk, 'bad newChunk')
assert.maybe.date(earliestChangeTimestamp, 'bad timestamp')
const backend = getBackend(projectId)
const oldChunkId = await getChunkIdForVersion(projectId, oldEndVersion)
const newChunkId = await uploadChunk(projectId, newChunk)
const opts = {}
if (earliestChangeTimestamp != null) {
opts.earliestChangeTimestamp = earliestChangeTimestamp
}
await backend.confirmUpdate(projectId, oldChunkId, newChunk, newChunkId, opts)
}
/**
* Find the chunk ID for a given version of a project.
*
* @param {string} projectId
* @param {number} version
* @return {Promise.<string>}
*/
async function getChunkIdForVersion(projectId, version) {
const backend = getBackend(projectId)
const chunkRecord = await backend.getChunkForVersion(projectId, version)
return chunkRecord.id
}
/**
* Find the chunk metadata for a given version of a project.
*
* @param {string} projectId
* @param {number} version
* @return {Promise.<{id: string|number, startVersion: number, endVersion: number}>}
*/
async function getChunkMetadataForVersion(projectId, version) {
const backend = getBackend(projectId)
const chunkRecord = await backend.getChunkForVersion(projectId, version)
return chunkRecord
}
/**
* Get all of a project's chunk ids
*/
async function getProjectChunkIds(projectId) {
const backend = getBackend(projectId)
const chunkIds = await backend.getProjectChunkIds(projectId)
return chunkIds
}
/**
* Get all of a project's chunks directly
*/
async function getProjectChunks(projectId) {
const backend = getBackend(projectId)
const chunkIds = await backend.getProjectChunks(projectId)
return chunkIds
}
/**
* Load the chunk for a given chunk record, including blob metadata.
*/
async function loadByChunkRecord(projectId, chunkRecord) {
const blobStore = new BlobStore(projectId)
const batchBlobStore = new BatchBlobStore(blobStore)
const { raw: rawHistory, buffer: chunkBuffer } =
await historyStore.loadRawWithBuffer(projectId, chunkRecord.id)
const history = History.fromRaw(rawHistory)
await lazyLoadHistoryFiles(history, batchBlobStore)
return {
chunk: new Chunk(history, chunkRecord.endVersion - history.countChanges()),
chunkBuffer,
}
}
/**
* Asynchronously retrieves project chunks starting from a specific version.
*
* This generator function yields chunk records for a given project starting from the specified version (inclusive).
* It continues to fetch and yield subsequent chunk records until the end version of the latest chunk metadata is reached.
* If you want to fetch all the chunks *after* a version V, call this function with V+1.
*
* @param {string} projectId - The ID of the project.
* @param {number} version - The starting version to retrieve chunks from.
* @returns {AsyncGenerator<Object, void, undefined>} An async generator that yields chunk records.
*/
async function* getProjectChunksFromVersion(projectId, version) {
const backend = getBackend(projectId)
const latestChunkMetadata = await loadLatestRaw(projectId)
if (!latestChunkMetadata || version > latestChunkMetadata.endVersion) {
return
}
let chunkRecord = await backend.getChunkForVersion(projectId, version)
while (chunkRecord != null) {
yield chunkRecord
if (chunkRecord.endVersion >= latestChunkMetadata.endVersion) {
break
} else {
chunkRecord = await backend.getChunkForVersion(
projectId,
chunkRecord.endVersion + 1
)
}
}
}
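// Usage sketch (hypothetical, for illustration): collect every chunk created
// after version `v` by starting the generator at `v + 1`, as noted above.
async function exampleListChunksAfterVersion(projectId, v) {
  const chunkRecords = []
  for await (const record of getProjectChunksFromVersion(projectId, v + 1)) {
    chunkRecords.push(record)
  }
  return chunkRecords
}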
/**
* Delete the given chunk from the database.
*
* This doesn't delete the chunk from object storage yet. The old chunks
* collection will do that.
*/
async function destroy(projectId, chunkId) {
const backend = getBackend(projectId)
await backend.deleteChunk(projectId, chunkId)
}
/**
* Delete all of a project's chunks from the database.
*/
async function deleteProjectChunks(projectId) {
const backend = getBackend(projectId)
await backend.deleteProjectChunks(projectId)
}
/**
* Delete a given number of old chunks from both the database
* and from object storage.
*
* @param {object} options
* @param {number} [options.batchSize] - number of chunks to delete in each
* batch
* @param {number} [options.maxBatches] - maximum number of batches to process
* @param {number} [options.minAgeSecs] - minimum age of chunks to delete
* @param {number} [options.timeout] - maximum time to spend deleting chunks
*
* @return {Promise<number>} number of chunks deleted
*/
async function deleteOldChunks(options = {}) {
const batchSize = options.batchSize ?? DEFAULT_DELETE_BATCH_SIZE
const maxBatches = options.maxBatches ?? Number.MAX_SAFE_INTEGER
const minAgeSecs = options.minAgeSecs ?? DEFAULT_DELETE_MIN_AGE_SECS
const timeout = options.timeout ?? DEFAULT_DELETE_TIMEOUT_SECS
assert.greater(batchSize, 0)
assert.greater(timeout, 0)
assert.greater(maxBatches, 0)
assert.greaterOrEqual(minAgeSecs, 0)
const timeoutAfter = Date.now() + timeout * 1000
let deletedChunksTotal = 0
for (const backend of [postgresBackend, mongoBackend]) {
for (let i = 0; i < maxBatches; i++) {
if (Date.now() > timeoutAfter) {
break
}
const deletedChunks = await deleteOldChunksBatch(
backend,
batchSize,
minAgeSecs
)
deletedChunksTotal += deletedChunks.length
if (deletedChunks.length !== batchSize) {
// Last batch was incomplete. There probably are no old chunks left
break
}
}
}
return deletedChunksTotal
}
async function deleteOldChunksBatch(backend, count, minAgeSecs) {
assert.greater(count, 0, 'bad count')
assert.greaterOrEqual(minAgeSecs, 0, 'bad minAgeSecs')
const oldChunks = await backend.getOldChunksBatch(count, minAgeSecs)
if (oldChunks.length === 0) {
return []
}
await historyStore.deleteChunks(oldChunks)
await backend.deleteOldChunks(oldChunks.map(chunk => chunk.chunkId))
return oldChunks
}
/**
* Returns the appropriate backend for the given project id
*
* Numeric ids use the Postgres backend.
* Strings of 24 hexadecimal characters use the Mongo backend.
*/
function getBackend(projectId) {
if (assert.POSTGRES_ID_REGEXP.test(projectId)) {
return postgresBackend
} else if (assert.MONGO_ID_REGEXP.test(projectId)) {
return mongoBackend
} else {
throw new OError('bad project id', { projectId })
}
}
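// For illustration (hypothetical ids): getBackend('1234') returns the Postgres
// backend, getBackend('507f1f77bcf86cd799439011') returns the Mongo backend,
// and anything else throws an OError.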
class AlreadyInitialized extends OError {
constructor(projectId) {
super('Project is already initialized', { projectId })
}
}
module.exports = {
getBackend,
initializeProject,
loadLatest,
loadLatestRaw,
loadAtVersion,
loadAtTimestamp,
loadByChunkRecord,
create,
update,
destroy,
getChunkIdForVersion,
getChunkMetadataForVersion,
getProjectChunkIds,
getProjectChunks,
getProjectChunksFromVersion,
deleteProjectChunks,
deleteOldChunks,
AlreadyInitialized,
ChunkVersionConflictError,
}
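// Lifecycle sketch (hypothetical caller, not part of this module): create a
// project, extend its history with a new chunk, then reclaim space by
// batch-deleting old chunks. `newChunk` and the option values are illustrative.
async function exampleChunkLifecycle(newChunk, earliestChangeTimestamp) {
  const projectId = await initializeProject() // creates the version-0 chunk
  const latest = await loadLatest(projectId)
  await update(projectId, latest.getEndVersion(), newChunk, earliestChangeTimestamp)
  return await deleteOldChunks({ batchSize: 100, maxBatches: 1 })
}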

View File

@@ -0,0 +1,526 @@
// @ts-check
const { ObjectId, ReadPreference, MongoError } = require('mongodb')
const { Chunk } = require('overleaf-editor-core')
const OError = require('@overleaf/o-error')
const assert = require('../assert')
const mongodb = require('../mongodb')
const { ChunkVersionConflictError } = require('./errors')
const DUPLICATE_KEY_ERROR_CODE = 11000
/**
* @import { ClientSession } from 'mongodb'
*/
/**
* Get the latest chunk's metadata from the database
* @param {string} projectId
* @param {Object} [opts]
* @param {boolean} [opts.readOnly]
*/
async function getLatestChunk(projectId, opts = {}) {
assert.mongoId(projectId, 'bad projectId')
const { readOnly = false } = opts
const record = await mongodb.chunks.findOne(
{
projectId: new ObjectId(projectId),
state: { $in: ['active', 'closed'] },
},
{
sort: { startVersion: -1 },
readPreference: readOnly
? ReadPreference.secondaryPreferred
: ReadPreference.primary,
}
)
if (record == null) {
return null
}
return chunkFromRecord(record)
}
/**
* Get the metadata for the chunk that contains the given version.
*/
async function getChunkForVersion(projectId, version) {
assert.mongoId(projectId, 'bad projectId')
assert.integer(version, 'bad version')
const record = await mongodb.chunks.findOne(
{
projectId: new ObjectId(projectId),
state: { $in: ['active', 'closed'] },
startVersion: { $lte: version },
endVersion: { $gte: version },
},
{ sort: { startVersion: 1 } }
)
if (record == null) {
throw new Chunk.VersionNotFoundError(projectId, version)
}
return chunkFromRecord(record)
}
/**
* Get the metadata for the project's first chunk (start version 0) if it ended
* at or before the given timestamp, falling back to deleted chunks.
*/
async function getFirstChunkBeforeTimestamp(projectId, timestamp) {
assert.mongoId(projectId, 'bad projectId')
assert.date(timestamp, 'bad timestamp')
const recordActive = await getChunkForVersion(projectId, 0)
if (recordActive && recordActive.endTimestamp <= timestamp) {
return recordActive
}
// fallback to deleted chunk
const recordDeleted = await mongodb.chunks.findOne(
{
projectId: new ObjectId(projectId),
state: 'deleted',
startVersion: 0,
updatedAt: { $lte: timestamp }, // indexed for state=deleted
endTimestamp: { $lte: timestamp },
},
{ sort: { updatedAt: -1 } }
)
if (recordDeleted) {
return chunkFromRecord(recordDeleted)
}
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
/**
* Get the metadata for the chunk that contains the version that was current at
* the given timestamp.
*/
async function getChunkForTimestamp(projectId, timestamp) {
assert.mongoId(projectId, 'bad projectId')
assert.date(timestamp, 'bad timestamp')
const record = await mongodb.chunks.findOne(
{
projectId: new ObjectId(projectId),
state: { $in: ['active', 'closed'] },
endTimestamp: { $gte: timestamp },
},
// We use the index on the startVersion for sorting records. This assumes
// that timestamps go up with each version.
{ sort: { startVersion: 1 } }
)
if (record == null) {
// Couldn't find a chunk that had modifications after the given timestamp.
// Fetch the latest chunk instead.
const chunk = await getLatestChunk(projectId)
if (chunk == null) {
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
return chunk
}
return chunkFromRecord(record)
}
/**
* Get the metadata for the chunk that contains the version that was current before
* the given timestamp.
*/
async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) {
assert.mongoId(projectId, 'bad projectId')
assert.date(timestamp, 'bad timestamp')
const record = await mongodb.chunks.findOne(
{
projectId: new ObjectId(projectId),
state: { $in: ['active', 'closed'] },
$or: [
{
endTimestamp: {
$lte: timestamp,
},
},
{
endTimestamp: null,
},
],
},
// We use the index on the startVersion for sorting records. This assumes
// that timestamps go up with each version.
{ sort: { startVersion: -1 } }
)
if (record == null) {
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
return chunkFromRecord(record)
}
/**
* Get all of a project's chunk ids
*/
async function getProjectChunkIds(projectId) {
assert.mongoId(projectId, 'bad projectId')
const cursor = mongodb.chunks.find(
{
projectId: new ObjectId(projectId),
state: { $in: ['active', 'closed'] },
},
{ projection: { _id: 1 } }
)
return await cursor.map(record => record._id).toArray()
}
/**
* Get all of a project's chunks directly
*/
async function getProjectChunks(projectId) {
assert.mongoId(projectId, 'bad projectId')
const cursor = mongodb.chunks
.find(
{
projectId: new ObjectId(projectId),
state: { $in: ['active', 'closed'] },
},
{ projection: { state: 0 } }
)
.sort({ startVersion: 1 })
return await cursor.map(chunkFromRecord).toArray()
}
/**
* Insert a pending chunk before sending it to object storage.
*/
async function insertPendingChunk(projectId, chunk) {
assert.mongoId(projectId, 'bad projectId')
assert.instance(chunk, Chunk, 'bad chunk')
const chunkId = new ObjectId()
await mongodb.chunks.insertOne({
_id: chunkId,
projectId: new ObjectId(projectId),
startVersion: chunk.getStartVersion(),
endVersion: chunk.getEndVersion(),
endTimestamp: chunk.getEndTimestamp(),
state: 'pending',
updatedAt: new Date(),
})
return chunkId.toString()
}
/**
* Record that a new chunk was created.
*
* @param {string} projectId
* @param {Chunk} chunk
* @param {string} chunkId
* @param {object} opts
* @param {Date} [opts.earliestChangeTimestamp]
* @param {string} [opts.oldChunkId]
*/
async function confirmCreate(projectId, chunk, chunkId, opts = {}) {
assert.mongoId(projectId, 'bad projectId')
assert.instance(chunk, Chunk, 'bad newChunk')
assert.mongoId(chunkId, 'bad newChunkId')
await mongodb.client.withSession(async session => {
await session.withTransaction(async () => {
if (opts.oldChunkId != null) {
await closeChunk(projectId, opts.oldChunkId, { session })
}
await activateChunk(projectId, chunkId, { session })
await updateProjectRecord(
projectId,
chunk,
opts.earliestChangeTimestamp,
{ session }
)
})
})
}
/**
* Write the metadata to the project record
*/
async function updateProjectRecord(
projectId,
chunk,
earliestChangeTimestamp,
mongoOpts = {}
) {
// record the end version against the project
await mongodb.projects.updateOne(
{
'overleaf.history.id': projectId, // string for Object ids, number for postgres ids
},
{
// always store the latest end version and timestamp for the chunk
$max: {
'overleaf.history.currentEndVersion': chunk.getEndVersion(),
'overleaf.history.currentEndTimestamp': chunk.getEndTimestamp(),
'overleaf.history.updatedAt': new Date(),
},
// store the first pending change timestamp for the chunk, this will
// be cleared every time a backup is completed.
$min: {
'overleaf.backup.pendingChangeAt':
earliestChangeTimestamp || chunk.getEndTimestamp() || new Date(),
},
},
mongoOpts
)
}
/**
* Record that a chunk was replaced by a new one.
*
* @param {string} projectId
* @param {string} oldChunkId
* @param {Chunk} newChunk
* @param {string} newChunkId
* @param {object} [opts]
* @param {Date} [opts.earliestChangeTimestamp]
*/
async function confirmUpdate(
projectId,
oldChunkId,
newChunk,
newChunkId,
opts = {}
) {
assert.mongoId(projectId, 'bad projectId')
assert.mongoId(oldChunkId, 'bad oldChunkId')
assert.instance(newChunk, Chunk, 'bad newChunk')
assert.mongoId(newChunkId, 'bad newChunkId')
await mongodb.client.withSession(async session => {
await session.withTransaction(async () => {
await deleteActiveChunk(projectId, oldChunkId, { session })
await activateChunk(projectId, newChunkId, { session })
await updateProjectRecord(
projectId,
newChunk,
opts.earliestChangeTimestamp,
{ session }
)
})
})
}
/**
* Activate a pending chunk
*
* @param {string} projectId
* @param {string} chunkId
* @param {object} [opts]
* @param {ClientSession} [opts.session]
*/
async function activateChunk(projectId, chunkId, opts = {}) {
assert.mongoId(projectId, 'bad projectId')
assert.mongoId(chunkId, 'bad chunkId')
let result
try {
result = await mongodb.chunks.updateOne(
{
_id: new ObjectId(chunkId),
projectId: new ObjectId(projectId),
state: 'pending',
},
{ $set: { state: 'active', updatedAt: new Date() } },
opts
)
} catch (err) {
if (err instanceof MongoError && err.code === DUPLICATE_KEY_ERROR_CODE) {
throw new ChunkVersionConflictError('chunk start version is not unique', {
projectId,
chunkId,
})
} else {
throw err
}
}
if (result.matchedCount === 0) {
throw new OError('pending chunk not found', { projectId, chunkId })
}
}
/**
* Close a chunk
*
* A closed chunk is one that can't be extended anymore.
*
* @param {string} projectId
* @param {string} chunkId
* @param {object} [opts]
* @param {ClientSession} [opts.session]
*/
async function closeChunk(projectId, chunkId, opts = {}) {
const result = await mongodb.chunks.updateOne(
{
_id: new ObjectId(chunkId),
projectId: new ObjectId(projectId),
state: 'active',
},
{ $set: { state: 'closed' } },
opts
)
if (result.matchedCount === 0) {
throw new ChunkVersionConflictError('unable to close chunk', {
projectId,
chunkId,
})
}
}
/**
* Delete an active chunk
*
* This is used to delete chunks that are in the process of being extended. It
* will refuse to delete chunks that are already closed and can therefore not be
* extended.
*
* @param {string} projectId
* @param {string} chunkId
* @param {object} [opts]
* @param {ClientSession} [opts.session]
*/
async function deleteActiveChunk(projectId, chunkId, opts = {}) {
const updateResult = await mongodb.chunks.updateOne(
{
_id: new ObjectId(chunkId),
projectId: new ObjectId(projectId),
state: 'active',
},
{ $set: { state: 'deleted', updatedAt: new Date() } },
opts
)
if (updateResult.matchedCount === 0) {
throw new ChunkVersionConflictError('unable to delete active chunk', {
projectId,
chunkId,
})
}
}
/**
* Delete a chunk.
*
* @param {string} projectId
* @param {string} chunkId
* @return {Promise}
*/
async function deleteChunk(projectId, chunkId, mongoOpts = {}) {
assert.mongoId(projectId, 'bad projectId')
assert.mongoId(chunkId, 'bad chunkId')
await mongodb.chunks.updateOne(
{ _id: new ObjectId(chunkId), projectId: new ObjectId(projectId) },
{ $set: { state: 'deleted', updatedAt: new Date() } },
mongoOpts
)
}
/**
* Delete all of a project's chunks
*/
async function deleteProjectChunks(projectId) {
assert.mongoId(projectId, 'bad projectId')
await mongodb.chunks.updateMany(
{
projectId: new ObjectId(projectId),
state: { $in: ['active', 'closed'] },
},
{ $set: { state: 'deleted', updatedAt: new Date() } }
)
}
/**
* Get a batch of old chunks for deletion
*/
async function getOldChunksBatch(count, minAgeSecs) {
const maxUpdatedAt = new Date(Date.now() - minAgeSecs * 1000)
const batch = []
// We need to fetch one state at a time to take advantage of the partial
// indexes on the chunks collection.
//
// Mongo 6.0 allows partial indexes that use the $in operator. When we reach
// that Mongo version, we can create a partial index on both the deleted and
// pending states and simplify this logic a bit.
for (const state of ['deleted', 'pending']) {
if (count === 0) {
// There's no more space in the batch
break
}
const cursor = mongodb.chunks
.find(
{ state, updatedAt: { $lt: maxUpdatedAt } },
{
limit: count,
projection: { _id: 1, projectId: 1 },
}
)
.map(record => ({
chunkId: record._id.toString(),
projectId: record.projectId.toString(),
}))
for await (const record of cursor) {
batch.push(record)
count -= 1
}
}
return batch
}
/**
* Delete a batch of old chunks from the database
*/
async function deleteOldChunks(chunkIds) {
await mongodb.chunks.deleteMany({
_id: { $in: chunkIds.map(id => new ObjectId(id)) },
state: { $in: ['deleted', 'pending'] },
})
}
/**
* Build a chunk metadata object from the database record
*/
function chunkFromRecord(record) {
return {
id: record._id.toString(),
startVersion: record.startVersion,
endVersion: record.endVersion,
endTimestamp: record.endTimestamp,
}
}
module.exports = {
getLatestChunk,
getFirstChunkBeforeTimestamp,
getLastActiveChunkBeforeTimestamp,
getChunkForVersion,
getChunkForTimestamp,
getProjectChunkIds,
getProjectChunks,
insertPendingChunk,
confirmCreate,
confirmUpdate,
updateProjectRecord,
deleteChunk,
deleteProjectChunks,
getOldChunksBatch,
deleteOldChunks,
}

View File

@@ -0,0 +1,487 @@
// @ts-check
const { Chunk } = require('overleaf-editor-core')
const assert = require('../assert')
const knex = require('../knex')
const knexReadOnly = require('../knex_read_only')
const { ChunkVersionConflictError } = require('./errors')
const { updateProjectRecord } = require('./mongo')
const DUPLICATE_KEY_ERROR_CODE = '23505'
/**
* @import { Knex } from 'knex'
*/
/**
* Get the latest chunk's metadata from the database
* @param {string} projectId
* @param {Object} [opts]
* @param {boolean} [opts.readOnly]
*/
async function getLatestChunk(projectId, opts = {}) {
assert.postgresId(projectId, 'bad projectId')
const { readOnly = false } = opts
const record = await (readOnly ? knexReadOnly : knex)('chunks')
.where('doc_id', parseInt(projectId, 10))
.orderBy('end_version', 'desc')
.first()
if (record == null) {
return null
}
return chunkFromRecord(record)
}
/**
* Get the metadata for the chunk that contains the given version.
*
* @param {string} projectId
* @param {number} version
*/
async function getChunkForVersion(projectId, version) {
assert.postgresId(projectId, 'bad projectId')
const record = await knex('chunks')
.where('doc_id', parseInt(projectId, 10))
.where('end_version', '>=', version)
.orderBy('end_version')
.first()
if (!record) {
throw new Chunk.VersionNotFoundError(projectId, version)
}
return chunkFromRecord(record)
}
/**
* Get the metadata for the project's first chunk (start version 0) if it ended
* at or before the given timestamp, falling back to deleted chunks.
*
* @param {string} projectId
* @param {Date} timestamp
*/
async function getFirstChunkBeforeTimestamp(projectId, timestamp) {
assert.date(timestamp, 'bad timestamp')
const recordActive = await getChunkForVersion(projectId, 0)
// projectId must be valid if getChunkForVersion did not throw
if (recordActive && recordActive.endTimestamp <= timestamp) {
return recordActive
}
// fallback to deleted chunk
const recordDeleted = await knex('old_chunks')
.where('doc_id', parseInt(projectId, 10))
.where('start_version', '=', 0)
.where('end_timestamp', '<=', timestamp)
.orderBy('end_version', 'desc')
.first()
if (recordDeleted) {
return chunkFromRecord(recordDeleted)
}
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
/**
* Get the metadata for the chunk that contains the version that was current at
* the given timestamp.
*
* @param {string} projectId
* @param {Date} timestamp
*/
async function getLastActiveChunkBeforeTimestamp(projectId, timestamp) {
assert.date(timestamp, 'bad timestamp')
assert.postgresId(projectId, 'bad projectId')
const query = knex('chunks')
.where('doc_id', parseInt(projectId, 10))
.where(function () {
this.where('end_timestamp', '<=', timestamp).orWhere(
'end_timestamp',
null
)
})
.orderBy('end_version', 'desc', 'last')
const record = await query.first()
if (!record) {
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
return chunkFromRecord(record)
}
/**
* Get the metadata for the chunk that contains the version that was current before
* the given timestamp.
*
* @param {string} projectId
* @param {Date} timestamp
*/
async function getChunkForTimestamp(projectId, timestamp) {
assert.postgresId(projectId, 'bad projectId')
// This query finds the first chunk that ends at or after the timestamp, OR
// falls back to the latest chunk overall (the subquery orders chunks in
// reverse chronological order and takes the newest one). The fallback covers
// the case where the timestamp is later than the latest chunk's end
// timestamp, which would otherwise return no results.
const whereAfterEndTimestampOrLatestChunk = knex.raw(
'end_timestamp >= ? ' +
'OR id = ( ' +
'SELECT id FROM chunks ' +
'WHERE doc_id = ? ' +
'ORDER BY end_version desc LIMIT 1' +
')',
[timestamp, parseInt(projectId, 10)]
)
const record = await knex('chunks')
.where('doc_id', parseInt(projectId, 10))
.where(whereAfterEndTimestampOrLatestChunk)
.orderBy('end_version')
.first()
if (!record) {
throw new Chunk.BeforeTimestampNotFoundError(projectId, timestamp)
}
return chunkFromRecord(record)
}
/**
* Build a chunk metadata object from the database record
*/
function chunkFromRecord(record) {
return {
id: record.id.toString(),
startVersion: record.start_version,
endVersion: record.end_version,
endTimestamp: record.end_timestamp,
}
}
/**
* Get all of a project's chunk ids
*
* @param {string} projectId
*/
async function getProjectChunkIds(projectId) {
assert.postgresId(projectId, 'bad projectId')
const records = await knex('chunks')
.select('id')
.where('doc_id', parseInt(projectId, 10))
return records.map(record => record.id)
}
/**
* Get all of a project's chunks directly
*
* @param {string} projectId
*/
async function getProjectChunks(projectId) {
assert.postgresId(projectId, 'bad projectId')
const records = await knex('chunks')
.select()
.where('doc_id', parseInt(projectId, 10))
.orderBy('end_version')
return records.map(chunkFromRecord)
}
/**
* Insert a pending chunk before sending it to object storage.
*
* @param {string} projectId
* @param {Chunk} chunk
*/
async function insertPendingChunk(projectId, chunk) {
assert.postgresId(projectId, 'bad projectId')
const result = await knex.first(
knex.raw("nextval('chunks_id_seq'::regclass)::integer as chunkid")
)
const chunkId = result.chunkid
await knex('pending_chunks').insert({
id: chunkId,
doc_id: parseInt(projectId, 10),
end_version: chunk.getEndVersion(),
start_version: chunk.getStartVersion(),
end_timestamp: chunk.getEndTimestamp(),
})
return chunkId.toString()
}
/**
* Record that a new chunk was created.
*
* @param {string} projectId
* @param {Chunk} chunk
* @param {string} chunkId
* @param {object} opts
* @param {Date} [opts.earliestChangeTimestamp]
* @param {string} [opts.oldChunkId]
*/
async function confirmCreate(projectId, chunk, chunkId, opts = {}) {
assert.postgresId(projectId, 'bad projectId')
await knex.transaction(async tx => {
if (opts.oldChunkId != null) {
await _assertChunkIsNotClosed(tx, projectId, opts.oldChunkId)
await _closeChunk(tx, projectId, opts.oldChunkId)
}
await Promise.all([
_deletePendingChunk(tx, projectId, chunkId),
_insertChunk(tx, projectId, chunk, chunkId),
])
await updateProjectRecord(
// The history id in Mongo is an integer for Postgres projects
parseInt(projectId, 10),
chunk,
opts.earliestChangeTimestamp
)
})
}
/**
* Record that a chunk was replaced by a new one.
*
* @param {string} projectId
* @param {string} oldChunkId
* @param {Chunk} newChunk
* @param {string} newChunkId
*/
async function confirmUpdate(
projectId,
oldChunkId,
newChunk,
newChunkId,
opts = {}
) {
assert.postgresId(projectId, 'bad projectId')
await knex.transaction(async tx => {
await _assertChunkIsNotClosed(tx, projectId, oldChunkId)
await _deleteChunks(tx, { doc_id: projectId, id: oldChunkId })
await Promise.all([
_deletePendingChunk(tx, projectId, newChunkId),
_insertChunk(tx, projectId, newChunk, newChunkId),
])
await updateProjectRecord(
// The history id in Mongo is an integer for Postgres projects
parseInt(projectId, 10),
newChunk,
opts.earliestChangeTimestamp
)
})
}
/**
* Delete a pending chunk
*
* @param {Knex} tx
* @param {string} projectId
* @param {string} chunkId
*/
async function _deletePendingChunk(tx, projectId, chunkId) {
await tx('pending_chunks')
.where({
doc_id: parseInt(projectId, 10),
id: parseInt(chunkId, 10),
})
.del()
}
/**
* Adds an active chunk
*
* @param {Knex} tx
* @param {string} projectId
* @param {Chunk} chunk
* @param {string} chunkId
*/
async function _insertChunk(tx, projectId, chunk, chunkId) {
const startVersion = chunk.getStartVersion()
const endVersion = chunk.getEndVersion()
try {
await tx('chunks').insert({
id: parseInt(chunkId, 10),
doc_id: parseInt(projectId, 10),
start_version: startVersion,
end_version: endVersion,
end_timestamp: chunk.getEndTimestamp(),
})
} catch (err) {
if (
err instanceof Error &&
'code' in err &&
err.code === DUPLICATE_KEY_ERROR_CODE
) {
throw new ChunkVersionConflictError(
'chunk start or end version is not unique',
{ projectId, chunkId, startVersion, endVersion }
)
}
throw err
}
}
/**
* Check that a chunk is not closed
*
* This is used to synchronize chunk creations and extensions.
*
* @param {Knex} tx
* @param {string} projectId
* @param {string} chunkId
*/
async function _assertChunkIsNotClosed(tx, projectId, chunkId) {
const record = await tx('chunks')
.forUpdate()
.select('closed')
.where('doc_id', parseInt(projectId, 10))
.where('id', parseInt(chunkId, 10))
.first()
if (!record) {
throw new ChunkVersionConflictError('unable to close chunk: not found', {
projectId,
chunkId,
})
}
if (record.closed) {
throw new ChunkVersionConflictError(
'unable to close chunk: already closed',
{
projectId,
chunkId,
}
)
}
}
/**
* Close a chunk
*
* A closed chunk can no longer be extended.
*
* @param {Knex} tx
* @param {string} projectId
* @param {string} chunkId
*/
async function _closeChunk(tx, projectId, chunkId) {
await tx('chunks')
.update({ closed: true })
.where('doc_id', parseInt(projectId, 10))
.where('id', parseInt(chunkId, 10))
}
/**
* Delete a chunk.
*
* @param {string} projectId
* @param {string} chunkId
*/
async function deleteChunk(projectId, chunkId) {
assert.postgresId(projectId, 'bad projectId')
assert.integer(chunkId, 'bad chunkId')
await _deleteChunks(knex, {
doc_id: parseInt(projectId, 10),
id: parseInt(chunkId, 10),
})
}
/**
* Delete all of a project's chunks
*
* @param {string} projectId
*/
async function deleteProjectChunks(projectId) {
assert.postgresId(projectId, 'bad projectId')
await knex.transaction(async tx => {
await _deleteChunks(tx, { doc_id: parseInt(projectId, 10) })
})
}
/**
* Delete many chunks
*
* @param {Knex} tx
* @param {any} whereClause
*/
async function _deleteChunks(tx, whereClause) {
const rows = await tx('chunks').where(whereClause).del().returning('*')
if (rows.length === 0) {
return
}
const oldChunks = rows.map(row => ({
doc_id: row.doc_id,
chunk_id: row.id,
start_version: row.start_version,
end_version: row.end_version,
end_timestamp: row.end_timestamp,
deleted_at: tx.fn.now(),
}))
await tx('old_chunks').insert(oldChunks)
}
/**
* Get a batch of old chunks for deletion
*
* @param {number} count
* @param {number} minAgeSecs
*/
async function getOldChunksBatch(count, minAgeSecs) {
const maxDeletedAt = new Date(Date.now() - minAgeSecs * 1000)
const records = await knex('old_chunks')
.whereNull('deleted_at')
.orWhere('deleted_at', '<', maxDeletedAt)
.orderBy('chunk_id')
.limit(count)
return records.map(oldChunk => ({
projectId: oldChunk.doc_id.toString(),
chunkId: oldChunk.chunk_id.toString(),
}))
}
/**
* Delete a batch of old chunks from the database
*
* @param {string[]} chunkIds
*/
async function deleteOldChunks(chunkIds) {
await knex('old_chunks')
.whereIn(
'chunk_id',
chunkIds.map(id => parseInt(id, 10))
)
.del()
}
/**
* Generate a new project id
*/
async function generateProjectId() {
const record = await knex.first(
knex.raw("nextval('docs_id_seq'::regclass)::integer as doc_id")
)
return record.doc_id.toString()
}
module.exports = {
getLatestChunk,
getFirstChunkBeforeTimestamp,
getLastActiveChunkBeforeTimestamp,
getChunkForVersion,
getChunkForTimestamp,
getProjectChunkIds,
getProjectChunks,
insertPendingChunk,
confirmCreate,
confirmUpdate,
deleteChunk,
deleteProjectChunks,
getOldChunksBatch,
deleteOldChunks,
generateProjectId,
}

View File

@@ -0,0 +1,254 @@
const metrics = require('@overleaf/metrics')
const logger = require('@overleaf/logger')
const redis = require('../redis')
const rclient = redis.rclientHistory
const { Snapshot, Change, History, Chunk } = require('overleaf-editor-core')
const TEMPORARY_CACHE_LIFETIME = 300 // 5 minutes
const keySchema = {
snapshot({ projectId }) {
return `snapshot:{${projectId}}`
},
startVersion({ projectId }) {
return `snapshot-version:{${projectId}}`
},
changes({ projectId }) {
return `changes:{${projectId}}`
},
}
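// Example (hypothetical project id): the `{projectId}` hash tag keeps all three
// keys for a project in the same Redis cluster slot.
//   keySchema.snapshot({ projectId: 'abc123' })     -> 'snapshot:{abc123}'
//   keySchema.startVersion({ projectId: 'abc123' }) -> 'snapshot-version:{abc123}'
//   keySchema.changes({ projectId: 'abc123' })      -> 'changes:{abc123}'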
rclient.defineCommand('get_current_chunk', {
numberOfKeys: 3,
lua: `
local startVersionValue = redis.call('GET', KEYS[2])
if not startVersionValue then
return nil -- this is a cache-miss
end
local snapshotValue = redis.call('GET', KEYS[1])
local changesValues = redis.call('LRANGE', KEYS[3], 0, -1)
return {snapshotValue, startVersionValue, changesValues}
`,
})
/**
* Retrieves the current chunk of project history from Redis storage
* @param {string} projectId - The unique identifier of the project
* @returns {Promise<Chunk|null>} A Promise that resolves to a Chunk object containing project history,
* or null on a cache miss or if a Redis error occurs (errors are caught and logged)
*/
async function getCurrentChunk(projectId) {
try {
const result = await rclient.get_current_chunk(
keySchema.snapshot({ projectId }),
keySchema.startVersion({ projectId }),
keySchema.changes({ projectId })
)
if (!result) {
return null // cache-miss
}
const snapshot = Snapshot.fromRaw(JSON.parse(result[0]))
const startVersion = JSON.parse(result[1])
const changes = result[2].map(c => Change.fromRaw(JSON.parse(c)))
const history = new History(snapshot, changes)
const chunk = new Chunk(history, startVersion)
metrics.inc('chunk_store.redis.get_current_chunk', 1, { status: 'success' })
return chunk
} catch (err) {
logger.error({ err, projectId }, 'error getting current chunk from redis')
metrics.inc('chunk_store.redis.get_current_chunk', 1, { status: 'error' })
return null
}
}
rclient.defineCommand('get_current_chunk_metadata', {
numberOfKeys: 2,
lua: `
local startVersionValue = redis.call('GET', KEYS[1])
local changesCount = redis.call('LLEN', KEYS[2])
return {startVersionValue, changesCount}
`,
})
/**
* Retrieves the current chunk metadata for a given project from Redis
* @param {string} projectId - The ID of the project to get metadata for
* @returns {Promise<Object|null>} Object containing startVersion and changesCount if found, null on error or cache miss
* @property {number} startVersion - The starting version information
* @property {number} changesCount - The number of changes in the chunk
*/
async function getCurrentChunkMetadata(projectId) {
try {
const result = await rclient.get_current_chunk_metadata(
keySchema.startVersion({ projectId }),
keySchema.changes({ projectId })
)
if (!result) {
return null // cache-miss
}
const startVersion = JSON.parse(result[0])
const changesCount = parseInt(result[1], 10)
return { startVersion, changesCount }
} catch (err) {
return null
}
}
rclient.defineCommand('set_current_chunk', {
numberOfKeys: 3,
lua: `
local snapshotValue = ARGV[1]
local startVersionValue = ARGV[2]
redis.call('SETEX', KEYS[1], ${TEMPORARY_CACHE_LIFETIME}, snapshotValue)
redis.call('SETEX', KEYS[2], ${TEMPORARY_CACHE_LIFETIME}, startVersionValue)
redis.call('DEL', KEYS[3]) -- clear the old changes list
if #ARGV >= 3 then
redis.call('RPUSH', KEYS[3], unpack(ARGV, 3))
redis.call('EXPIRE', KEYS[3], ${TEMPORARY_CACHE_LIFETIME})
end
`,
})
/**
* Stores the current chunk of project history in Redis
* @param {string} projectId - The ID of the project
* @param {Chunk} chunk - The chunk object containing history data
* @returns {Promise<void|null>} Resolves once the chunk is cached, or null if a Redis error occurs (errors are caught and logged, never rethrown)
*/
async function setCurrentChunk(projectId, chunk) {
try {
const snapshotKey = keySchema.snapshot({ projectId })
const startVersionKey = keySchema.startVersion({ projectId })
const changesKey = keySchema.changes({ projectId })
const snapshot = chunk.history.snapshot
const startVersion = chunk.startVersion
const changes = chunk.history.changes
await rclient.set_current_chunk(
snapshotKey,
startVersionKey,
changesKey,
JSON.stringify(snapshot.toRaw()),
startVersion,
...changes.map(c => JSON.stringify(c.toRaw()))
)
metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'success' })
} catch (err) {
logger.error(
{ err, projectId, chunk },
'error setting current chunk in redis'
)
metrics.inc('chunk_store.redis.set_current_chunk', 1, { status: 'error' })
return null // while testing we will suppress any errors
}
}
/**
* Checks whether a cached chunk's version metadata matches the current chunk's metadata
* @param {Chunk} cachedChunk - The chunk retrieved from cache
* @param {Chunk} currentChunk - The current chunk to compare against
* @returns {boolean} - Returns true if the chunks have matching start and end versions, false otherwise
*/
function checkCacheValidity(cachedChunk, currentChunk) {
return Boolean(
cachedChunk &&
cachedChunk.getStartVersion() === currentChunk.getStartVersion() &&
cachedChunk.getEndVersion() === currentChunk.getEndVersion()
)
}
/**
* Validates if a cached chunk matches the current chunk metadata by comparing versions
* @param {Object} cachedChunk - The cached chunk object to validate
* @param {Object} currentChunkMetadata - The current chunk metadata to compare against
* @param {number} currentChunkMetadata.startVersion - The starting version number
* @param {number} currentChunkMetadata.endVersion - The ending version number
* @returns {boolean} - True if the cached chunk is valid, false otherwise
*/
function checkCacheValidityWithMetadata(cachedChunk, currentChunkMetadata) {
return Boolean(
cachedChunk &&
cachedChunk.getStartVersion() === currentChunkMetadata.startVersion &&
cachedChunk.getEndVersion() === currentChunkMetadata.endVersion
)
}
/**
* Compares two chunks for equality using stringified JSON comparison
* @param {string} projectId - The ID of the project
* @param {Chunk} cachedChunk - The cached chunk to compare
* @param {Chunk} currentChunk - The current chunk to compare against
* @returns {boolean} - Returns false if either chunk is null/undefined, otherwise returns the comparison result
*/
function compareChunks(projectId, cachedChunk, currentChunk) {
if (!cachedChunk || !currentChunk) {
return false
}
const identical = JSON.stringify(cachedChunk) === JSON.stringify(currentChunk)
if (!identical) {
try {
logger.error(
{
projectId,
cachedChunkStartVersion: cachedChunk.getStartVersion(),
cachedChunkEndVersion: cachedChunk.getEndVersion(),
currentChunkStartVersion: currentChunk.getStartVersion(),
currentChunkEndVersion: currentChunk.getEndVersion(),
},
'chunk cache mismatch'
)
} catch (err) {
// ignore errors while logging
}
}
metrics.inc('chunk_store.redis.compare_chunks', 1, {
status: identical ? 'success' : 'fail',
})
return identical
}
// Define Lua script for atomic cache clearing
rclient.defineCommand('clear_chunk_cache', {
numberOfKeys: 3,
lua: `
-- Delete all keys related to a project's chunk cache atomically
redis.call('DEL', KEYS[1]) -- snapshot key
redis.call('DEL', KEYS[2]) -- startVersion key
redis.call('DEL', KEYS[3]) -- changes key
return 1
`,
})
/**
* Clears all cache entries for a project's chunk data
* @param {string} projectId - The ID of the project whose cache should be cleared
* @returns {Promise<boolean>} A promise that resolves to true if successful, false on error
*/
async function clearCache(projectId) {
try {
const snapshotKey = keySchema.snapshot({ projectId })
const startVersionKey = keySchema.startVersion({ projectId })
const changesKey = keySchema.changes({ projectId })
await rclient.clear_chunk_cache(snapshotKey, startVersionKey, changesKey)
metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'success' })
return true
} catch (err) {
logger.error({ err, projectId }, 'error clearing chunk cache from redis')
metrics.inc('chunk_store.redis.clear_cache', 1, { status: 'error' })
return false
}
}
module.exports = {
getCurrentChunk,
setCurrentChunk,
getCurrentChunkMetadata,
checkCacheValidity,
checkCacheValidityWithMetadata,
compareChunks,
clearCache,
}
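// Usage sketch (hypothetical caller, not part of this module): after rewriting
// a project's history outside the normal update path, drop the cached chunk so
// the next read repopulates it from the chunk store.
async function exampleInvalidate(projectId) {
  const cleared = await clearCache(projectId)
  if (!cleared) {
    logger.warn({ projectId }, 'chunk cache could not be cleared') // hypothetical handling
  }
}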

View File

@@ -0,0 +1,18 @@
// @ts-check
const { createHash } = require('node:crypto')
/**
* Compute a SHA-1 hash of the content
*
* This is used to validate incoming updates.
*
* @param {string} content
*/
function getContentHash(content) {
const hash = createHash('sha-1')
hash.update(content)
return hash.digest('hex')
}
module.exports = { getContentHash }
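// Example (standard SHA-1 test vector): getContentHash('hello world') returns
// '2aae6c35c94fcfb415dbe95f408b9ce91ee846ed'.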

View File

@@ -0,0 +1,5 @@
const OError = require('@overleaf/o-error')
class InvalidChangeError extends OError {}
module.exports = { InvalidChangeError }

View File

@@ -0,0 +1,30 @@
const Blob = require('overleaf-editor-core').Blob
const blobHash = require('./blob_hash')
const BPromise = require('bluebird')
// We want to simulate applying all of the operations so we can return the
// resulting hashes to the caller for them to check. To do this, we need to be
// able to take the lazy files in the final snapshot, fetch their content, and
// compute the new content hashes. We don't, however, need to actually store
// that content; we just need to get the hash.
function HashCheckBlobStore(realBlobStore) {
this.realBlobStore = realBlobStore
}
HashCheckBlobStore.prototype.getString = BPromise.method(
function hashCheckBlobStoreGetString(hash) {
return this.realBlobStore.getString(hash)
}
)
HashCheckBlobStore.prototype.putString = BPromise.method(
function hashCheckBlobStorePutString(string) {
return new Blob(
blobHash.fromString(string),
Buffer.byteLength(string),
string.length
)
}
)
module.exports = HashCheckBlobStore
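// Usage sketch (hypothetical, for illustration): wrap a real blob store so that
// applying operations yields content hashes without persisting anything new.
// `realBlobStore` is assumed to be a BlobStore-like object with getString/putString.
async function exampleHashOnly(realBlobStore, content) {
  const hashCheckBlobStore = new HashCheckBlobStore(realBlobStore)
  const blob = await hashCheckBlobStore.putString(content) // nothing is stored
  return blob.getHash()
}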

View File

@@ -0,0 +1,202 @@
// @ts-check
'use strict'
const core = require('overleaf-editor-core')
const config = require('config')
const path = require('node:path')
const Stream = require('node:stream')
const { promisify } = require('node:util')
const zlib = require('node:zlib')
const OError = require('@overleaf/o-error')
const objectPersistor = require('@overleaf/object-persistor')
const logger = require('@overleaf/logger')
const assert = require('./assert')
const persistor = require('./persistor')
const projectKey = require('./project_key')
const streams = require('./streams')
const Chunk = core.Chunk
const gzip = promisify(zlib.gzip)
const gunzip = promisify(zlib.gunzip)
class LoadError extends OError {
/**
* @param {string} projectId
* @param {string} chunkId
* @param {any} cause
*/
constructor(projectId, chunkId, cause) {
super(
'HistoryStore: failed to load chunk history',
{ projectId, chunkId },
cause
)
this.projectId = projectId
this.chunkId = chunkId
}
}
class StoreError extends OError {
/**
* @param {string} projectId
* @param {string} chunkId
* @param {any} cause
*/
constructor(projectId, chunkId, cause) {
super(
'HistoryStore: failed to store chunk history',
{ projectId, chunkId },
cause
)
this.projectId = projectId
this.chunkId = chunkId
}
}
/**
* @param {string} projectId
* @param {string} chunkId
* @return {string}
*/
function getKey(projectId, chunkId) {
return path.join(projectKey.format(projectId), projectKey.pad(chunkId))
}
/**
* Store and retrieve raw {@link History} objects from the bucket. Mainly used via the
* {@link ChunkStore}.
*
* Histories are stored as gzipped JSON blobs, keyed on the project ID and the
* ID of the Chunk that owns the history. The project ID is currently redundant,
* but I think it might help in future if we have to shard on project ID, and
* it gives us some chance of reconstructing histories even if there is a
* problem with the chunk metadata in the database.
*
* @class
*/
class HistoryStore {
#persistor
#bucket
constructor(persistor, bucket) {
this.#persistor = persistor
this.#bucket = bucket
}
/**
* Load the raw object for a History.
*
* @param {string} projectId
* @param {string} chunkId
* @return {Promise<import('overleaf-editor-core/lib/types').RawHistory>}
*/
async loadRaw(projectId, chunkId) {
assert.projectId(projectId, 'bad projectId')
assert.chunkId(chunkId, 'bad chunkId')
const key = getKey(projectId, chunkId)
logger.debug({ projectId, chunkId }, 'loadRaw started')
try {
const buf = await streams.gunzipStreamToBuffer(
await this.#persistor.getObjectStream(this.#bucket, key)
)
return JSON.parse(buf.toString('utf-8'))
} catch (err) {
if (err instanceof objectPersistor.Errors.NotFoundError) {
throw new Chunk.NotPersistedError(projectId)
}
throw new LoadError(projectId, chunkId, err)
} finally {
logger.debug({ projectId, chunkId }, 'loadRaw finished')
}
}
async loadRawWithBuffer(projectId, chunkId) {
assert.projectId(projectId, 'bad projectId')
assert.chunkId(chunkId, 'bad chunkId')
const key = getKey(projectId, chunkId)
logger.debug({ projectId, chunkId }, 'loadBuffer started')
try {
const buf = await streams.readStreamToBuffer(
await this.#persistor.getObjectStream(this.#bucket, key)
)
const unzipped = await gunzip(buf)
return {
buffer: buf,
raw: JSON.parse(unzipped.toString('utf-8')),
}
} catch (err) {
if (err instanceof objectPersistor.Errors.NotFoundError) {
throw new Chunk.NotPersistedError(projectId)
}
throw new LoadError(projectId, chunkId, err)
} finally {
logger.debug({ projectId, chunkId }, 'loadBuffer finished')
}
}
/**
* Compress and store a {@link History}.
*
* @param {string} projectId
* @param {string} chunkId
* @param {import('overleaf-editor-core/lib/types').RawHistory} rawHistory
*/
async storeRaw(projectId, chunkId, rawHistory) {
assert.projectId(projectId, 'bad projectId')
assert.chunkId(chunkId, 'bad chunkId')
assert.object(rawHistory, 'bad rawHistory')
const key = getKey(projectId, chunkId)
logger.debug({ projectId, chunkId }, 'storeRaw started')
const buf = await gzip(JSON.stringify(rawHistory))
try {
await this.#persistor.sendStream(
this.#bucket,
key,
Stream.Readable.from([buf]),
{
contentType: 'application/json',
contentEncoding: 'gzip',
contentLength: buf.byteLength,
}
)
} catch (err) {
throw new StoreError(projectId, chunkId, err)
} finally {
logger.debug({ projectId, chunkId }, 'storeRaw finished')
}
}
/**
* Delete multiple chunks from the bucket. Expects an array of objects with
* projectId and chunkId properties
* @param {Array<{projectId: string,chunkId:string}>} chunks
*/
async deleteChunks(chunks) {
logger.debug({ chunks }, 'deleteChunks started')
try {
await Promise.all(
chunks.map(chunk => {
const key = getKey(chunk.projectId, chunk.chunkId)
return this.#persistor.deleteObject(this.#bucket, key)
})
)
} finally {
logger.debug({ chunks }, 'deleteChunks finished')
}
}
}
module.exports = {
HistoryStore,
historyStore: new HistoryStore(persistor, config.get('chunkStore.bucket')),
}
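// Usage sketch (hypothetical caller): store a chunk's raw history and read it
// back. `rawHistory` is assumed to be the raw history object produced when a
// chunk is uploaded (see the chunk store); the ids are illustrative only.
async function exampleRoundTrip(projectId, chunkId, rawHistory) {
  await historyStore.storeRaw(projectId, chunkId, rawHistory)
  return await historyStore.loadRaw(projectId, chunkId)
}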

View File

@@ -0,0 +1,8 @@
// @ts-check
'use strict'
const env = process.env.NODE_ENV || 'development'
const knexfile = require('../../knexfile')
module.exports = require('knex').default(knexfile[env])

View File

@@ -0,0 +1,19 @@
'use strict'
const config = require('config')
const knexfile = require('../../knexfile')
const env = process.env.NODE_ENV || 'development'
if (config.databaseUrlReadOnly) {
module.exports = require('knex')({
...knexfile[env],
pool: {
...knexfile[env].pool,
min: 0,
},
connection: config.databaseUrlReadOnly,
})
} else {
module.exports = require('./knex')
}

View File

@@ -0,0 +1,30 @@
const Metrics = require('@overleaf/metrics')
const config = require('config')
const { MongoClient } = require('mongodb')
const client = new MongoClient(config.mongo.uri)
const db = client.db()
const chunks = db.collection('projectHistoryChunks')
const blobs = db.collection('projectHistoryBlobs')
const globalBlobs = db.collection('projectHistoryGlobalBlobs')
const shardedBlobs = db.collection('projectHistoryShardedBlobs')
const projects = db.collection('projects')
// Temporary collection for tracking progress of backed up old blobs (without a hash).
// The initial sync process will be able to skip over these.
// Schema: _id: projectId, blobs: [Binary]
const backedUpBlobs = db.collection('projectHistoryBackedUpBlobs')
Metrics.mongodb.monitor(client)
module.exports = {
client,
db,
chunks,
blobs,
globalBlobs,
projects,
shardedBlobs,
backedUpBlobs,
}

View File

@@ -0,0 +1,261 @@
// @ts-check
'use strict'
const _ = require('lodash')
const logger = require('@overleaf/logger')
const core = require('overleaf-editor-core')
const Chunk = core.Chunk
const History = core.History
const assert = require('./assert')
const chunkStore = require('./chunk_store')
const { BlobStore } = require('./blob_store')
const { InvalidChangeError } = require('./errors')
const { getContentHash } = require('./content_hash')
function countChangeBytes(change) {
// Note: This is not quite accurate, because the raw change may contain raw
// file info (or conceivably even content) that will not be included in the
// actual stored object.
return Buffer.byteLength(JSON.stringify(change.toRaw()))
}
function totalChangeBytes(changes) {
return changes.length ? _(changes).map(countChangeBytes).sum() : 0
}
// provide a simple timer function
function Timer() {
this.t0 = process.hrtime()
}
Timer.prototype.elapsed = function () {
const dt = process.hrtime(this.t0)
const timeInMilliseconds = (dt[0] + dt[1] * 1e-9) * 1e3
return timeInMilliseconds
}
/**
* Break the given set of changes into zero or more Chunks according to the
* provided limits and store them.
*
* Some other possible improvements:
* 1. This does a lot more JSON serialization than it has to. We may know the
* JSON for the changes before we call this function, so we could in that
* case get the byte size of each change without doing any work. Even if we
* don't know it initially, we could save some computation by caching this
* info rather than recomputing it many times. TBD whether it is worthwhile.
* 2. We don't necessarily have to fetch the latest chunk in order to determine
* that it is full. We could store this in the chunk metadata record. It may
* be worth distinguishing between a Chunk and its metadata record. The
* endVersion may be better suited to the metadata record.
*
* @param {string} projectId
* @param {core.Change[]} allChanges
* @param {Object} limits
* @param {number} clientEndVersion
* @return {Promise.<Object?>}
*/
async function persistChanges(projectId, allChanges, limits, clientEndVersion) {
assert.projectId(projectId)
assert.array(allChanges)
assert.maybe.object(limits)
assert.integer(clientEndVersion)
const blobStore = new BlobStore(projectId)
const earliestChangeTimestamp =
allChanges.length > 0 ? allChanges[0].getTimestamp() : null
let currentChunk
/**
   * currentSnapshot tracks the state of the project as changes are applied; we
   * use it to check that the changes we are persisting are valid.
*
* @type {core.Snapshot}
*/
let currentSnapshot
let originalEndVersion
let changesToPersist
limits = limits || {}
_.defaults(limits, {
changeBucketMinutes: 60,
maxChanges: 2500,
maxChangeBytes: 5 * 1024 * 1024,
maxChunkChanges: 2000,
maxChunkChangeBytes: 5 * 1024 * 1024,
maxChunkChangeTime: 5000, // warn if total time for changes in a chunk takes longer than this
})
function checkElapsedTime(timer) {
const timeTaken = timer.elapsed()
if (timeTaken > limits.maxChunkChangeTime) {
      logger.warn({ projectId, timeTaken }, 'slow chunk')
}
}
/**
* Add changes to a chunk until the chunk is full
*
* The chunk is full if it reaches a certain number of changes or a certain
* size in bytes
*
* @param {core.Chunk} chunk
* @param {core.Change[]} changes
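   * @return {Promise<boolean>} true if at least one change was added to the chunk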
*/
async function fillChunk(chunk, changes) {
let totalBytes = totalChangeBytes(chunk.getChanges())
let changesPushed = false
while (changes.length > 0) {
if (chunk.getChanges().length >= limits.maxChunkChanges) {
break
}
const change = changes[0]
const changeBytes = countChangeBytes(change)
if (totalBytes + changeBytes > limits.maxChunkChangeBytes) {
break
}
for (const operation of change.iterativelyApplyTo(currentSnapshot, {
strict: true,
})) {
await validateContentHash(operation)
}
chunk.pushChanges([change])
changes.shift()
totalBytes += changeBytes
changesPushed = true
}
return changesPushed
}
/**
* Check that the operation is valid and can be incorporated to the history.
*
* For now, this checks content hashes when they are provided.
*
* @param {core.Operation} operation
*/
async function validateContentHash(operation) {
if (operation instanceof core.EditFileOperation) {
const editOperation = operation.getOperation()
if (
editOperation instanceof core.TextOperation &&
editOperation.contentHash != null
) {
const path = operation.getPathname()
const file = currentSnapshot.getFile(path)
if (file == null) {
throw new InvalidChangeError('file not found for hash validation', {
projectId,
path,
})
}
await file.load('eager', blobStore)
const content = file.getContent({ filterTrackedDeletes: true })
const expectedHash = editOperation.contentHash
const actualHash = content != null ? getContentHash(content) : null
logger.debug({ expectedHash, actualHash }, 'validating content hash')
if (actualHash !== expectedHash) {
throw new InvalidChangeError('content hash mismatch', {
projectId,
path,
expectedHash,
actualHash,
})
}
// Remove the content hash from the change before storing it in the chunk.
// It was only useful for validation.
editOperation.contentHash = null
}
}
}
async function extendLastChunkIfPossible() {
const latestChunk = await chunkStore.loadLatest(projectId)
currentChunk = latestChunk
originalEndVersion = latestChunk.getEndVersion()
if (originalEndVersion !== clientEndVersion) {
throw new Chunk.ConflictingEndVersion(
clientEndVersion,
originalEndVersion
)
}
currentSnapshot = latestChunk.getSnapshot().clone()
const timer = new Timer()
currentSnapshot.applyAll(latestChunk.getChanges())
const changesPushed = await fillChunk(currentChunk, changesToPersist)
if (!changesPushed) {
return
}
checkElapsedTime(timer)
await chunkStore.update(
projectId,
originalEndVersion,
currentChunk,
earliestChangeTimestamp
)
}
async function createNewChunksAsNeeded() {
while (changesToPersist.length > 0) {
const endVersion = currentChunk.getEndVersion()
const history = new History(currentSnapshot.clone(), [])
const chunk = new Chunk(history, endVersion)
const timer = new Timer()
const changesPushed = await fillChunk(chunk, changesToPersist)
if (changesPushed) {
checkElapsedTime(timer)
currentChunk = chunk
await chunkStore.create(projectId, chunk, earliestChangeTimestamp)
} else {
throw new Error('failed to fill empty chunk')
}
}
}
function isOlderThanMinChangeTimestamp(change) {
return change.getTimestamp().getTime() < limits.minChangeTimestamp
}
function isOlderThanMaxChangeTimestamp(change) {
return change.getTimestamp().getTime() < limits.maxChangeTimestamp
}
const oldChanges = _.filter(allChanges, isOlderThanMinChangeTimestamp)
const anyTooOld = _.some(oldChanges, isOlderThanMaxChangeTimestamp)
const tooManyChanges = oldChanges.length > limits.maxChanges
const tooManyBytes = totalChangeBytes(oldChanges) > limits.maxChangeBytes
if (anyTooOld || tooManyChanges || tooManyBytes) {
changesToPersist = oldChanges
const numberOfChangesToPersist = oldChanges.length
await extendLastChunkIfPossible()
await createNewChunksAsNeeded()
return {
numberOfChangesPersisted: numberOfChangesToPersist,
originalEndVersion,
currentChunk,
}
} else {
return null
}
}
module.exports = persistChanges
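// Usage sketch (comments only). The timestamp limits are interpreted from the
// filters above: changes older than minChangeTimestamp are eligible for
// persistence, and any change older than maxChangeTimestamp forces a persist.
// Both are millisecond epoch values; everything else comes from the caller.
//
//   const persistChanges = require('./persist_changes')
//
//   const result = await persistChanges(projectId, changes, {
//     minChangeTimestamp: Date.now() - 60 * 1000,
//     maxChangeTimestamp: Date.now() - 5 * 60 * 1000,
//   }, clientEndVersion)
//   // result is null when nothing needed persisting, otherwise
//   // { numberOfChangesPersisted, originalEndVersion, currentChunk }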

View File

@@ -0,0 +1,27 @@
const _ = require('lodash')
const config = require('config')
const metrics = require('@overleaf/metrics')
const objectPersistor = require('@overleaf/object-persistor')
const persistorConfig = _.cloneDeep(config.get('persistor'))
function convertKey(key, convertFn) {
if (_.has(persistorConfig, key)) {
_.update(persistorConfig, key, convertFn)
}
}
convertKey('s3.signedUrlExpiryInMs', s => parseInt(s, 10))
convertKey('s3.httpOptions.timeout', s => parseInt(s, 10))
convertKey('s3.maxRetries', s => parseInt(s, 10))
convertKey('s3.pathStyle', s => s === 'true')
convertKey('gcs.unlockBeforeDelete', s => s === 'true')
convertKey('gcs.unsignedUrls', s => s === 'true')
convertKey('gcs.signedUrlExpiryInMs', s => parseInt(s, 10))
convertKey('gcs.deleteConcurrency', s => parseInt(s, 10))
convertKey('gcs.retryOptions.maxRetries', s => parseInt(s, 10))
convertKey('fallback.buckets', s => JSON.parse(s || '{}'))
persistorConfig.Metrics = metrics
module.exports = objectPersistor(persistorConfig)
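// For example (values hypothetical): if the environment provides
// persistor.s3.maxRetries as the string '3', convertKey('s3.maxRetries',
// s => parseInt(s, 10)) rewrites it to the number 3 before the persistor is
// constructed; keys missing from the config are left untouched.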

View File

@@ -0,0 +1,140 @@
// @ts-check
'use strict'
/**
* @import { Snapshot } from 'overleaf-editor-core'
* @import { BlobStore } from '../../storage/lib/blob_store/index'
*/
const Archive = require('archiver')
const BPromise = require('bluebird')
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const core = require('overleaf-editor-core')
const Snapshot = core.Snapshot
const OError = require('@overleaf/o-error')
const assert = require('./assert')
// The maximum safe concurrency appears to be 1.
// https://github.com/overleaf/issues/issues/1909
const FETCH_CONCURRENCY = 1 // number of files to fetch at once
const DEFAULT_ZIP_TIMEOUT = 25000 // ms
class DownloadError extends OError {
constructor(hash) {
super(`ProjectArchive: blob download failed: ${hash}`, { hash })
}
}
class ArchiveTimeout extends OError {
constructor() {
super('ProjectArchive timed out')
}
}
class MissingfileError extends OError {
constructor() {
super('ProjectArchive: attempting to look up a file that does not exist')
}
}
class ProjectArchive {
static ArchiveTimeout = ArchiveTimeout
static MissingfileError = MissingfileError
static DownloadError = DownloadError
/**
* @constructor
* @param {Snapshot} snapshot
* @param {number} [timeout] in ms
* @classdesc
* Writes the project snapshot to a zip file.
*/
constructor(snapshot, timeout) {
assert.instance(snapshot, Snapshot)
this.snapshot = snapshot
this.timeout = timeout || DEFAULT_ZIP_TIMEOUT
}
/**
* Write zip archive to the given file path.
*
* @param {BlobStore} blobStore
* @param {string} zipFilePath
*/
writeZip(blobStore, zipFilePath) {
const snapshot = this.snapshot
const timeout = this.timeout
const startTime = process.hrtime()
const archive = new Archive('zip')
// Convert elapsed seconds and nanoseconds to milliseconds.
function findElapsedMilliseconds() {
const elapsed = process.hrtime(startTime)
return elapsed[0] * 1e3 + elapsed[1] * 1e-6
}
function addFileToArchive(pathname) {
if (findElapsedMilliseconds() > timeout) {
throw new ProjectArchive.ArchiveTimeout()
}
const file = snapshot.getFile(pathname)
if (!file) {
throw new ProjectArchive.MissingfileError()
}
return file.load('eager', blobStore).then(function () {
const content = file.getContent({ filterTrackedDeletes: true })
if (content === null) {
return streamFileToArchive(pathname, file).catch(function (err) {
throw new ProjectArchive.DownloadError(file.getHash()).withCause(
err
)
})
} else {
archive.append(content, { name: pathname })
}
})
}
function streamFileToArchive(pathname, file) {
return new BPromise(function (resolve, reject) {
blobStore
.getStream(file.getHash())
.then(stream => {
stream.on('error', reject)
stream.on('end', resolve)
archive.append(stream, { name: pathname })
})
.catch(reject)
})
}
const addFilesToArchiveAndFinalize = BPromise.map(
snapshot.getFilePathnames(),
addFileToArchive,
{ concurrency: FETCH_CONCURRENCY }
).then(function () {
archive.finalize()
})
const streamArchiveToFile = new BPromise(function (resolve, reject) {
const stream = fs.createWriteStream(zipFilePath)
pipeline(archive, stream, function (err) {
if (err) {
reject(err)
} else {
resolve()
}
})
})
return BPromise.join(streamArchiveToFile, addFilesToArchiveAndFinalize)
}
}
module.exports = ProjectArchive
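// Usage sketch (comments only; require paths are assumptions):
//
//   const ProjectArchive = require('./project_archive')
//   const { BlobStore } = require('./blob_store')
//
//   const archive = new ProjectArchive(snapshot, 30000) // 30s timeout
//   await archive.writeZip(new BlobStore(projectId), '/tmp/project.zip')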

View File

@@ -0,0 +1,24 @@
// Keep in sync with services/web/app/src/Features/History/project_key.js
const _ = require('lodash')
const path = require('node:path')
//
// The advice in http://docs.aws.amazon.com/AmazonS3/latest/dev/
// request-rate-perf-considerations.html is to avoid sequential key prefixes,
// so we reverse the project ID part of the key as they suggest.
//
function format(projectId) {
const prefix = naiveReverse(pad(projectId))
return path.join(prefix.slice(0, 3), prefix.slice(3, 6), prefix.slice(6))
}
function pad(number) {
return _.padStart(number, 9, '0')
}
function naiveReverse(string) {
return string.split('').reverse().join('')
}
exports.format = format
exports.pad = pad
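// Worked example: format('123') pads to '000000123', reverses it to
// '321000000' and splits it into '321/000/000', so consecutive project ids
// end up under different key prefixes.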

View File

@@ -0,0 +1,19 @@
const config = require('config')
const redis = require('@overleaf/redis-wrapper')
const historyRedisOptions = config.get('redis.history')
const rclientHistory = redis.createClient(historyRedisOptions)
const lockRedisOptions = config.get('redis.history')
const rclientLock = redis.createClient(lockRedisOptions)
async function disconnect() {
await Promise.all([rclientHistory.disconnect(), rclientLock.disconnect()])
}
module.exports = {
rclientHistory,
rclientLock,
redis,
disconnect,
}

View File

@@ -0,0 +1,40 @@
// @ts-check
/**
* Promises are promises and streams are streams, and ne'er the twain shall
* meet.
* @module
*/
'use strict'
const Stream = require('node:stream')
const zlib = require('node:zlib')
const { WritableBuffer } = require('@overleaf/stream-utils')
/**
* Create a promise for the result of reading a stream to a buffer.
*
* @param {Stream.Readable} readStream
* @return {Promise<Buffer>}
*/
async function readStreamToBuffer(readStream) {
const bufferStream = new WritableBuffer()
await Stream.promises.pipeline(readStream, bufferStream)
return bufferStream.contents()
}
exports.readStreamToBuffer = readStreamToBuffer
/**
* Create a promise for the result of un-gzipping a stream to a buffer.
*
* @param {NodeJS.ReadableStream} readStream
* @return {Promise<Buffer>}
*/
async function gunzipStreamToBuffer(readStream) {
const gunzip = zlib.createGunzip()
const bufferStream = new WritableBuffer()
await Stream.promises.pipeline(readStream, gunzip, bufferStream)
return bufferStream.contents()
}
exports.gunzipStreamToBuffer = gunzipStreamToBuffer
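// Usage sketch (comments only; the file path is hypothetical):
//
//   const fs = require('node:fs')
//   const streams = require('./streams')
//
//   const raw = await streams.gunzipStreamToBuffer(
//     fs.createReadStream('/tmp/chunk.json.gz')
//   )
//   const history = JSON.parse(raw.toString('utf-8'))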

View File

@@ -0,0 +1,25 @@
/*
* Taken from renderer/app/helpers/temp.js with minor cosmetic changes.
* Promisify the temp package. The temp package provides a 'track' feature
* that automatically cleans up temp files at process exit, but that is not
* very useful. They also provide a method to trigger cleanup, but that is not
* safe for concurrent use. So, we use a disposer to unlink the file.
*/
const BPromise = require('bluebird')
const fs = BPromise.promisifyAll(require('node:fs'))
const temp = BPromise.promisifyAll(require('temp'))
exports.open = function (affixes) {
return temp.openAsync(affixes).disposer(function (fileInfo) {
fs.closeAsync(fileInfo.fd)
.then(() => {
return fs.unlinkAsync(fileInfo.path)
})
.catch(function (err) {
if (err.code !== 'ENOENT') {
throw err
}
})
})
}
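// The disposer returned by exports.open is intended to be consumed with
// BPromise.using, which runs the cleanup even if the work inside throws.
// Sketch (comments only):
//
//   const BPromise = require('bluebird')
//   const temp = require('./temp')
//
//   await BPromise.using(temp.open('zip'), async fileInfo => {
//     // write to fileInfo.path (or fileInfo.fd) here
//   })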

View File

@@ -0,0 +1,134 @@
'use strict'
const BPromise = require('bluebird')
const config = require('config')
const fs = require('node:fs')
const path = require('node:path')
const OError = require('@overleaf/o-error')
const objectPersistor = require('@overleaf/object-persistor')
const assert = require('./assert')
const { BlobStore } = require('./blob_store')
const persistor = require('./persistor')
const ProjectArchive = require('./project_archive')
const projectKey = require('./project_key')
const temp = require('./temp')
const BUCKET = config.get('zipStore.bucket')
function getZipKey(projectId, version) {
return path.join(
projectKey.format(projectId),
version.toString(),
'project.zip'
)
}
/**
 * Store a zip of a given version of a project in the bucket.
*
* @class
*/
class ZipStore {
/**
   * Generate a signed link to access the zip file.
   *
   * @param {number | string} projectId
   * @param {number} version
   * @return {Promise<string>}
*/
async getSignedUrl(projectId, version) {
assert.projectId(projectId, 'bad projectId')
assert.integer(version, 'bad version')
const key = getZipKey(projectId, version)
return await persistor.getRedirectUrl(BUCKET, key)
}
/**
* Generate a zip of the given snapshot.
*
* @param {number | string} projectId
* @param {number} version
* @param {Snapshot} snapshot
*/
async storeZip(projectId, version, snapshot) {
assert.projectId(projectId, 'bad projectId')
assert.integer(version, 'bad version')
assert.object(snapshot, 'bad snapshot')
const zipKey = getZipKey(projectId, version)
if (await isZipPresent()) return
await BPromise.using(temp.open('zip'), async tempFileInfo => {
await zipSnapshot(tempFileInfo.path, snapshot)
await uploadZip(tempFileInfo.path)
})
// If the file is already there, we don't need to build the zip again. If we
// just HEAD the file, there's a race condition, because the zip files
// automatically expire. So, we try to copy the file from itself to itself,
// and if it fails, we know the file didn't exist. If it succeeds, this has
// the effect of re-extending its lifetime.
async function isZipPresent() {
try {
await persistor.copyObject(BUCKET, zipKey, zipKey)
return true
} catch (error) {
if (!(error instanceof objectPersistor.Errors.NotFoundError)) {
console.error(
'storeZip: isZipPresent: unexpected error (except in dev): %s',
error
)
}
return false
}
}
async function zipSnapshot(tempPathname, snapshot) {
const blobStore = new BlobStore(projectId)
const zipTimeoutMs = parseInt(config.get('zipStore.zipTimeoutMs'), 10)
const archive = new ProjectArchive(snapshot, zipTimeoutMs)
try {
await archive.writeZip(blobStore, tempPathname)
} catch (err) {
throw new ZipStore.CreationError(projectId, version).withCause(err)
}
}
    async function uploadZip(tempPathname) {
const stream = fs.createReadStream(tempPathname)
try {
await persistor.sendStream(BUCKET, zipKey, stream, {
contentType: 'application/zip',
})
} catch (err) {
throw new ZipStore.UploadError(projectId, version).withCause(err)
}
}
}
}
class CreationError extends OError {
constructor(projectId, version) {
super(`Zip creation failed for ${projectId} version ${version}`, {
projectId,
version,
})
}
}
ZipStore.CreationError = CreationError
class UploadError extends OError {
constructor(projectId, version) {
super(`Zip upload failed for ${projectId} version ${version}`, {
projectId,
version,
})
}
}
ZipStore.UploadError = UploadError
module.exports = new ZipStore()
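// Usage sketch (comments only; projectId, version and snapshot come from the
// caller):
//
//   const zipStore = require('./zip_store')
//
//   await zipStore.storeZip(projectId, version, snapshot)
//   const url = await zipStore.getSignedUrl(projectId, version)
//   // url is a time-limited link to the stored project.zip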