feat(revisions): add procedure for transitioning data from primary to secondary database (#787)

* feat(revisions): add procedure for transitioning data from primary to secondary database

* fix: transition status updating for both items and revisions

* fix: dependabot
This commit is contained in:
Karol Sójko
2023-08-30 11:20:01 +02:00
committed by GitHub
parent 06170d8d7d
commit fe273a9107
28 changed files with 733 additions and 35 deletions

View File

@@ -30,6 +30,11 @@ updates:
schedule:
interval: "daily"
- package-ecosystem: "npm"
directory: "/packages/domain-core"
schedule:
interval: "daily"
- package-ecosystem: "npm"
directory: "/packages/domain-events"
schedule:
@@ -50,6 +55,11 @@ updates:
schedule:
interval: "daily"
- package-ecosystem: "npm"
directory: "/packages/home-server"
schedule:
interval: "daily"
- package-ecosystem: "npm"
directory: "/packages/predicates"
schedule:

View File

@@ -122,6 +122,10 @@ echo "linking topic $FILES_TOPIC_ARN to queue $AUTH_QUEUE_ARN"
LINKING_RESULT=$(link_queue_and_topic $FILES_TOPIC_ARN $AUTH_QUEUE_ARN)
echo "linking done:"
echo "$LINKING_RESULT"
echo "linking topic $REVISIONS_TOPIC_ARN to queue $AUTH_QUEUE_ARN"
LINKING_RESULT=$(link_queue_and_topic $REVISIONS_TOPIC_ARN $AUTH_QUEUE_ARN)
echo "linking done:"
echo "$LINKING_RESULT"
QUEUE_NAME="files-local-queue"

View File

@@ -9,6 +9,7 @@ export class TransitionStatusUpdatedEventHandler implements DomainEventHandlerIn
const result = await this.updateTransitionStatusUseCase.execute({
status: event.payload.status,
userUuid: event.payload.userUuid,
transitionType: event.payload.transitionType,
})
if (result.isFailed()) {

View File

@@ -1,5 +1,5 @@
export interface TransitionStatusRepositoryInterface {
updateStatus(userUuid: string, status: 'STARTED' | 'FAILED'): Promise<void>
removeStatus(userUuid: string): Promise<void>
getStatus(userUuid: string): Promise<'STARTED' | 'FAILED' | null>
updateStatus(userUuid: string, transitionType: 'items' | 'revisions', status: 'STARTED' | 'FAILED'): Promise<void>
removeStatus(userUuid: string, transitionType: 'items' | 'revisions'): Promise<void>
getStatus(userUuid: string, transitionType: 'items' | 'revisions'): Promise<'STARTED' | 'FAILED' | null>
}

View File

@@ -45,7 +45,7 @@ export class CreateCrossServiceToken implements UseCaseInterface<string> {
return Result.fail(`Could not find user with uuid ${dto.userUuid}`)
}
const transitionStatus = await this.transitionStatusRepository.getStatus(user.uuid)
const transitionStatus = await this.transitionStatusRepository.getStatus(user.uuid, 'items')
const roles = await user.roles

View File

@@ -39,6 +39,7 @@ describe('GetTransitionStatus', () => {
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
transitionType: 'items',
})
expect(result.isFailed()).toBeFalsy()
@@ -52,6 +53,7 @@ describe('GetTransitionStatus', () => {
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
transitionType: 'items',
})
expect(result.isFailed()).toBeFalsy()
@@ -63,6 +65,7 @@ describe('GetTransitionStatus', () => {
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
transitionType: 'items',
})
expect(result.isFailed()).toBeFalsy()
@@ -76,6 +79,7 @@ describe('GetTransitionStatus', () => {
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
transitionType: 'items',
})
expect(result.isFailed()).toBeFalsy()
@@ -87,6 +91,7 @@ describe('GetTransitionStatus', () => {
const result = await useCase.execute({
userUuid: 'invalid',
transitionType: 'items',
})
expect(result.isFailed()).toBeTruthy()
@@ -100,6 +105,7 @@ describe('GetTransitionStatus', () => {
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
transitionType: 'items',
})
expect(result.isFailed()).toBeTruthy()

View File

@@ -29,7 +29,7 @@ export class GetTransitionStatus implements UseCaseInterface<'TO-DO' | 'STARTED'
}
}
const transitionStatus = await this.transitionStatusRepository.getStatus(userUuid.value)
const transitionStatus = await this.transitionStatusRepository.getStatus(userUuid.value, dto.transitionType)
if (transitionStatus === null) {
return Result.ok('TO-DO')
}

View File

@@ -1,3 +1,4 @@
export interface GetTransitionStatusDTO {
userUuid: string
transitionType: 'items' | 'revisions'
}

View File

@@ -25,10 +25,14 @@ describe('UpdateTransitionStatus', () => {
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
status: 'FINISHED',
transitionType: 'items',
})
expect(result.isFailed()).toBeFalsy()
expect(transitionStatusRepository.removeStatus).toHaveBeenCalledWith('00000000-0000-0000-0000-000000000000')
expect(transitionStatusRepository.removeStatus).toHaveBeenCalledWith(
'00000000-0000-0000-0000-000000000000',
'items',
)
expect(roleService.addRoleToUser).toHaveBeenCalledWith(
Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
RoleName.create(RoleName.NAMES.TransitionUser).getValue(),
@@ -41,11 +45,13 @@ describe('UpdateTransitionStatus', () => {
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
status: 'STARTED',
transitionType: 'items',
})
expect(result.isFailed()).toBeFalsy()
expect(transitionStatusRepository.updateStatus).toHaveBeenCalledWith(
'00000000-0000-0000-0000-000000000000',
'items',
'STARTED',
)
})
@@ -56,6 +62,7 @@ describe('UpdateTransitionStatus', () => {
const result = await useCase.execute({
userUuid: 'invalid',
status: 'STARTED',
transitionType: 'items',
})
expect(result.isFailed()).toBeTruthy()

View File

@@ -17,14 +17,14 @@ export class UpdateTransitionStatus implements UseCaseInterface<void> {
const userUuid = userUuidOrError.getValue()
if (dto.status === 'FINISHED') {
await this.transitionStatusRepository.removeStatus(dto.userUuid)
await this.transitionStatusRepository.removeStatus(dto.userUuid, dto.transitionType)
await this.roleService.addRoleToUser(userUuid, RoleName.create(RoleName.NAMES.TransitionUser).getValue())
return Result.ok()
}
await this.transitionStatusRepository.updateStatus(dto.userUuid, dto.status)
await this.transitionStatusRepository.updateStatus(dto.userUuid, dto.transitionType, dto.status)
return Result.ok()
}

View File

@@ -1,4 +1,5 @@
export interface UpdateTransitionStatusDTO {
userUuid: string
transitionType: 'items' | 'revisions'
status: 'STARTED' | 'FINISHED' | 'FAILED'
}

View File

@@ -1,18 +1,37 @@
import { TransitionStatusRepositoryInterface } from '../../Domain/Transition/TransitionStatusRepositoryInterface'
export class InMemoryTransitionStatusRepository implements TransitionStatusRepositoryInterface {
private statuses: Map<string, 'STARTED' | 'FAILED'> = new Map()
private itemStatuses: Map<string, 'STARTED' | 'FAILED'> = new Map()
private revisionStatuses: Map<string, 'STARTED' | 'FAILED'> = new Map()
async updateStatus(userUuid: string, status: 'STARTED' | 'FAILED'): Promise<void> {
this.statuses.set(userUuid, status)
async updateStatus(
userUuid: string,
transitionType: 'items' | 'revisions',
status: 'STARTED' | 'FAILED',
): Promise<void> {
if (transitionType === 'items') {
this.itemStatuses.set(userUuid, status)
} else {
this.revisionStatuses.set(userUuid, status)
}
}
async removeStatus(userUuid: string): Promise<void> {
this.statuses.delete(userUuid)
async removeStatus(userUuid: string, transitionType: 'items' | 'revisions'): Promise<void> {
if (transitionType === 'items') {
this.itemStatuses.delete(userUuid)
} else {
this.revisionStatuses.delete(userUuid)
}
}
async getStatus(userUuid: string): Promise<'STARTED' | 'FAILED' | null> {
const status = this.statuses.get(userUuid) || null
async getStatus(userUuid: string, transitionType: 'items' | 'revisions'): Promise<'STARTED' | 'FAILED' | null> {
let status: 'STARTED' | 'FAILED' | null = null
if (transitionType === 'items') {
status = this.itemStatuses.get(userUuid) ?? null
} else {
status = this.revisionStatuses.get(userUuid) ?? null
}
return status
}

View File

@@ -109,6 +109,7 @@ export class BaseUsersController extends BaseHttpController {
async transitionStatus(_request: Request, response: Response): Promise<results.JsonResult> {
const result = await this.getTransitionStatusUseCase.execute({
userUuid: response.locals.user.uuid,
transitionType: 'items',
})
if (result.isFailed()) {

View File

@@ -7,16 +7,23 @@ export class RedisTransitionStatusRepository implements TransitionStatusReposito
constructor(private redisClient: IORedis.Redis) {}
async updateStatus(userUuid: string, status: 'STARTED' | 'FAILED'): Promise<void> {
await this.redisClient.set(`${this.PREFIX}:${userUuid}`, status)
async updateStatus(
userUuid: string,
transitionType: 'items' | 'revisions',
status: 'STARTED' | 'FAILED',
): Promise<void> {
await this.redisClient.set(`${this.PREFIX}:${transitionType}:${userUuid}`, status)
}
async removeStatus(userUuid: string): Promise<void> {
await this.redisClient.del(`${this.PREFIX}:${userUuid}`)
async removeStatus(userUuid: string, transitionType: 'items' | 'revisions'): Promise<void> {
await this.redisClient.del(`${this.PREFIX}:${transitionType}:${userUuid}`)
}
async getStatus(userUuid: string): Promise<'STARTED' | 'FAILED' | null> {
const status = (await this.redisClient.get(`${this.PREFIX}:${userUuid}`)) as 'STARTED' | 'FAILED' | null
async getStatus(userUuid: string, transitionType: 'items' | 'revisions'): Promise<'STARTED' | 'FAILED' | null> {
const status = (await this.redisClient.get(`${this.PREFIX}:${transitionType}:${userUuid}`)) as
| 'STARTED'
| 'FAILED'
| null
return status
}

View File

@@ -1,4 +1,5 @@
export interface TransitionStatusUpdatedEventPayload {
userUuid: string
transitionType: 'items' | 'revisions'
status: 'STARTED' | 'FINISHED' | 'FAILED'
}

View File

@@ -54,6 +54,7 @@ import { RevisionRepositoryResolverInterface } from '../Domain/Revision/Revision
import { TypeORMRevisionRepositoryResolver } from '../Infra/TypeORM/TypeORMRevisionRepositoryResolver'
import { RevisionMetadataHttpRepresentation } from '../Mapping/Http/RevisionMetadataHttpRepresentation'
import { RevisionHttpRepresentation } from '../Mapping/Http/RevisionHttpRepresentation'
import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser } from '../Domain/UseCase/Transition/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser'
export class ContainerConfigLoader {
async load(configuration?: {
@@ -219,6 +220,20 @@ export class ContainerConfigLoader {
container.get<RevisionRepositoryResolverInterface>(TYPES.Revisions_RevisionRepositoryResolver),
),
)
container
.bind<TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser>(
TYPES.Revisions_TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser,
)
.toConstantValue(
new TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser(
container.get<RevisionRepositoryInterface>(TYPES.Revisions_SQLRevisionRepository),
isSecondaryDatabaseEnabled
? container.get<RevisionRepositoryInterface>(TYPES.Revisions_MongoDBRevisionRepository)
: null,
container.get<TimerInterface>(TYPES.Revisions_Timer),
container.get<winston.Logger>(TYPES.Revisions_Logger),
),
)
// env vars
container.bind(TYPES.Revisions_AUTH_JWT_SECRET).toConstantValue(env.get('AUTH_JWT_SECRET'))

View File

@@ -35,6 +35,9 @@ const TYPES = {
Revisions_DeleteRevision: Symbol.for('Revisions_DeleteRevision'),
Revisions_CopyRevisions: Symbol.for('Revisions_CopyRevisions'),
Revisions_GetRequiredRoleToViewRevision: Symbol.for('Revisions_GetRequiredRoleToViewRevision'),
Revisions_TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser: Symbol.for(
'Revisions_TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser',
),
// Controller
Revisions_ControllerContainer: Symbol.for('Revisions_ControllerContainer'),
Revisions_RevisionsController: Symbol.for('Revisions_RevisionsController'),

View File

@@ -10,4 +10,18 @@ export class Revision extends Entity<RevisionProps> {
static create(props: RevisionProps, id?: UniqueEntityId): Result<Revision> {
return Result.ok<Revision>(new Revision(props, id))
}
isIdenticalTo(revision: Revision): boolean {
if (this._id.toString() !== revision._id.toString()) {
return false
}
const stringifiedThis = JSON.stringify(this.props)
const stringifiedRevision = JSON.stringify(revision.props)
const base64This = Buffer.from(stringifiedThis).toString('base64')
const base64Item = Buffer.from(stringifiedRevision).toString('base64')
return base64This === base64Item
}
}

View File

@@ -4,11 +4,13 @@ import { Revision } from './Revision'
import { RevisionMetadata } from './RevisionMetadata'
export interface RevisionRepositoryInterface {
countByUserUuid(userUuid: Uuid): Promise<number>
removeByUserUuid(userUuid: Uuid): Promise<void>
removeOneByUuid(revisionUuid: Uuid, userUuid: Uuid): Promise<void>
findOneByUuid(revisionUuid: Uuid, userUuid: Uuid): Promise<Revision | null>
findByItemUuid(itemUuid: Uuid): Promise<Array<Revision>>
findMetadataByItemId(itemUuid: Uuid, userUuid: Uuid): Promise<Array<RevisionMetadata>>
updateUserUuid(itemUuid: Uuid, userUuid: Uuid): Promise<void>
findByUserUuid(dto: { userUuid: Uuid; offset?: number; limit?: number }): Promise<Array<Revision>>
save(revision: Revision): Promise<Revision>
}

View File

@@ -0,0 +1,376 @@
import { Logger } from 'winston'
import { RevisionRepositoryInterface } from '../../../Revision/RevisionRepositoryInterface'
import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser } from './TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser'
import { Revision } from '../../../Revision/Revision'
import { ContentType, Dates, UniqueEntityId, Uuid } from '@standardnotes/domain-core'
import { TimerInterface } from '@standardnotes/time'
describe('TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser', () => {
let primaryRevisionRepository: RevisionRepositoryInterface
let secondaryRevisionRepository: RevisionRepositoryInterface | null
let logger: Logger
let primaryRevision1: Revision
let primaryRevision2: Revision
let secondaryRevision1: Revision
let secondaryRevision2: Revision
let timer: TimerInterface
const createUseCase = () =>
new TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser(
primaryRevisionRepository,
secondaryRevisionRepository,
timer,
logger,
)
beforeEach(() => {
primaryRevision1 = Revision.create(
{
itemUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc1d').getValue(),
userUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc1d').getValue(),
content: 'test',
contentType: ContentType.create('Note').getValue(),
itemsKeyId: 'test',
encItemKey: 'test',
authHash: 'test',
creationDate: new Date(1),
dates: Dates.create(new Date(1), new Date(2)).getValue(),
},
new UniqueEntityId('00000000-0000-0000-0000-000000000000'),
).getValue()
primaryRevision2 = Revision.create(
{
itemUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc2d').getValue(),
userUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc1d').getValue(),
content: 'test',
contentType: ContentType.create('Note').getValue(),
itemsKeyId: 'test',
encItemKey: 'test',
authHash: 'test',
creationDate: new Date(1),
dates: Dates.create(new Date(1), new Date(2)).getValue(),
},
new UniqueEntityId('00000000-0000-0000-0000-000000000001'),
).getValue()
secondaryRevision1 = Revision.create(
{
itemUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc1d').getValue(),
userUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc1d').getValue(),
content: 'test',
contentType: ContentType.create('Note').getValue(),
itemsKeyId: 'test',
encItemKey: 'test',
authHash: 'test',
creationDate: new Date(1),
dates: Dates.create(new Date(1), new Date(2)).getValue(),
},
new UniqueEntityId('00000000-0000-0000-0000-000000000000'),
).getValue()
secondaryRevision2 = Revision.create(
{
itemUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc2d').getValue(),
userUuid: Uuid.create('84c0f8e8-544a-4c7e-9adf-26209303bc1d').getValue(),
content: 'test',
contentType: ContentType.create('Note').getValue(),
itemsKeyId: 'test',
encItemKey: 'test',
authHash: 'test',
creationDate: new Date(1),
dates: Dates.create(new Date(1), new Date(2)).getValue(),
},
new UniqueEntityId('00000000-0000-0000-0000-000000000001'),
).getValue()
primaryRevisionRepository = {} as jest.Mocked<RevisionRepositoryInterface>
primaryRevisionRepository.countByUserUuid = jest.fn().mockResolvedValue(2)
primaryRevisionRepository.findByUserUuid = jest
.fn()
.mockResolvedValueOnce([primaryRevision1])
.mockResolvedValueOnce([primaryRevision2])
.mockResolvedValueOnce([primaryRevision1])
.mockResolvedValueOnce([primaryRevision2])
primaryRevisionRepository.removeByUserUuid = jest.fn().mockResolvedValue(undefined)
secondaryRevisionRepository = {} as jest.Mocked<RevisionRepositoryInterface>
secondaryRevisionRepository.save = jest.fn().mockResolvedValue(undefined)
secondaryRevisionRepository.removeByUserUuid = jest.fn().mockResolvedValue(undefined)
secondaryRevisionRepository.countByUserUuid = jest.fn().mockResolvedValue(2)
secondaryRevisionRepository.findOneByUuid = jest
.fn()
.mockResolvedValueOnce(secondaryRevision1)
.mockResolvedValueOnce(secondaryRevision2)
logger = {} as jest.Mocked<Logger>
logger.error = jest.fn()
logger.info = jest.fn()
timer = {} as jest.Mocked<TimerInterface>
timer.getTimestampInMicroseconds = jest.fn().mockReturnValue(123)
timer.convertMicrosecondsToTimeStructure = jest.fn().mockReturnValue({
days: 0,
hours: 0,
minutes: 0,
seconds: 0,
milliseconds: 0,
})
})
describe('successfull transition', () => {
it('should transition Revisions from primary to secondary database', async () => {
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeFalsy()
expect(primaryRevisionRepository.countByUserUuid).toHaveBeenCalledTimes(2)
expect(primaryRevisionRepository.countByUserUuid).toHaveBeenCalledWith(
Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
)
expect(primaryRevisionRepository.findByUserUuid).toHaveBeenCalledTimes(4)
expect(primaryRevisionRepository.findByUserUuid).toHaveBeenNthCalledWith(1, {
userUuid: Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
limit: 1,
offset: 0,
})
expect(primaryRevisionRepository.findByUserUuid).toHaveBeenNthCalledWith(2, {
userUuid: Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
limit: 1,
offset: 1,
})
expect(primaryRevisionRepository.findByUserUuid).toHaveBeenNthCalledWith(3, {
userUuid: Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
limit: 1,
offset: 0,
})
expect(primaryRevisionRepository.findByUserUuid).toHaveBeenNthCalledWith(4, {
userUuid: Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
limit: 1,
offset: 1,
})
expect((secondaryRevisionRepository as RevisionRepositoryInterface).save).toHaveBeenCalledTimes(2)
expect((secondaryRevisionRepository as RevisionRepositoryInterface).save).toHaveBeenCalledWith(primaryRevision1)
expect((secondaryRevisionRepository as RevisionRepositoryInterface).save).toHaveBeenCalledWith(primaryRevision2)
expect((secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid).not.toHaveBeenCalled()
expect(primaryRevisionRepository.removeByUserUuid).toHaveBeenCalledTimes(1)
})
it('should log an error if deleting Revisions from primary database fails', async () => {
primaryRevisionRepository.removeByUserUuid = jest.fn().mockRejectedValue(new Error('error'))
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeFalsy()
expect(logger.error).toHaveBeenCalledTimes(1)
expect(logger.error).toHaveBeenCalledWith(
'Failed to clean up primary database revisions for user 00000000-0000-0000-0000-000000000000: error',
)
})
})
describe('failed transition', () => {
it('should remove Revisions from secondary database if integrity check fails', async () => {
const secondaryRevision2WithDifferentContent = Revision.create({
...secondaryRevision2.props,
content: 'different-content',
}).getValue()
;(secondaryRevisionRepository as RevisionRepositoryInterface).findOneByUuid = jest
.fn()
.mockResolvedValueOnce(secondaryRevision1)
.mockResolvedValueOnce(secondaryRevision2WithDifferentContent)
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(result.getError()).toEqual(
'Revision 00000000-0000-0000-0000-000000000001 is not identical in primary and secondary database',
)
expect((secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid).toHaveBeenCalledTimes(1)
expect(primaryRevisionRepository.removeByUserUuid).not.toHaveBeenCalled()
})
it('should remove Revisions from secondary database if migrating Revisions fails', async () => {
primaryRevisionRepository.findByUserUuid = jest
.fn()
.mockResolvedValueOnce([primaryRevision1])
.mockRejectedValueOnce(new Error('error'))
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(result.getError()).toEqual('error')
expect((secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid).toHaveBeenCalledTimes(1)
expect(primaryRevisionRepository.removeByUserUuid).not.toHaveBeenCalled()
})
it('should log an error if deleting Revisions from secondary database fails upon migration failure', async () => {
primaryRevisionRepository.findByUserUuid = jest
.fn()
.mockResolvedValueOnce([primaryRevision1])
.mockRejectedValueOnce(new Error('error'))
;(secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid = jest
.fn()
.mockRejectedValue(new Error('error'))
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(logger.error).toHaveBeenCalledTimes(1)
expect(logger.error).toHaveBeenCalledWith(
'Failed to clean up secondary database revisions for user 00000000-0000-0000-0000-000000000000: error',
)
})
it('should log an error if deleting Revisions from secondary database fails upon integrity check failure', async () => {
const secondaryRevision2WithDifferentContent = Revision.create({
...secondaryRevision2.props,
content: 'different-content',
}).getValue()
;(secondaryRevisionRepository as RevisionRepositoryInterface).findOneByUuid = jest
.fn()
.mockResolvedValueOnce(secondaryRevision1)
.mockResolvedValueOnce(secondaryRevision2WithDifferentContent)
;(secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid = jest
.fn()
.mockRejectedValue(new Error('error'))
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(logger.error).toHaveBeenCalledTimes(1)
expect(logger.error).toHaveBeenCalledWith(
'Failed to clean up secondary database revisions for user 00000000-0000-0000-0000-000000000000: error',
)
})
it('should not perform the transition if secondary Revision repository is not set', async () => {
secondaryRevisionRepository = null
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(result.getError()).toEqual('Secondary revision repository is not set')
expect(primaryRevisionRepository.countByUserUuid).not.toHaveBeenCalled()
expect(primaryRevisionRepository.findByUserUuid).not.toHaveBeenCalled()
expect(primaryRevisionRepository.removeByUserUuid).not.toHaveBeenCalled()
})
it('should not perform the transition if the user uuid is invalid', async () => {
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: 'invalid-uuid',
})
expect(result.isFailed()).toBeTruthy()
expect(result.getError()).toEqual('Given value is not a valid uuid: invalid-uuid')
expect(primaryRevisionRepository.countByUserUuid).not.toHaveBeenCalled()
expect(primaryRevisionRepository.findByUserUuid).not.toHaveBeenCalled()
expect(primaryRevisionRepository.removeByUserUuid).not.toHaveBeenCalled()
})
it('should fail integrity check if the Revision count is not the same in both databases', async () => {
;(secondaryRevisionRepository as RevisionRepositoryInterface).countByUserUuid = jest.fn().mockResolvedValue(1)
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(result.getError()).toEqual(
'Total revisions count for user 00000000-0000-0000-0000-000000000000 in primary database (2) does not match total revisions count in secondary database (1)',
)
expect(primaryRevisionRepository.countByUserUuid).toHaveBeenCalledTimes(2)
expect(primaryRevisionRepository.countByUserUuid).toHaveBeenCalledWith(
Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
)
expect((secondaryRevisionRepository as RevisionRepositoryInterface).countByUserUuid).toHaveBeenCalledTimes(1)
expect(primaryRevisionRepository.removeByUserUuid).not.toHaveBeenCalled()
expect((secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid).toHaveBeenCalledTimes(1)
})
it('should fail if one Revision is not found in the secondary database', async () => {
;(secondaryRevisionRepository as RevisionRepositoryInterface).findOneByUuid = jest
.fn()
.mockResolvedValueOnce(secondaryRevision1)
.mockResolvedValueOnce(null)
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(result.getError()).toEqual('Revision 00000000-0000-0000-0000-000000000001 not found in secondary database')
expect(primaryRevisionRepository.countByUserUuid).toHaveBeenCalledTimes(2)
expect(primaryRevisionRepository.countByUserUuid).toHaveBeenCalledWith(
Uuid.create('00000000-0000-0000-0000-000000000000').getValue(),
)
expect((secondaryRevisionRepository as RevisionRepositoryInterface).countByUserUuid).toHaveBeenCalledTimes(1)
expect(primaryRevisionRepository.removeByUserUuid).not.toHaveBeenCalled()
expect((secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid).toHaveBeenCalledTimes(1)
})
it('should fail if an error is thrown during integrity check between primary and secondary database', async () => {
;(secondaryRevisionRepository as RevisionRepositoryInterface).countByUserUuid = jest
.fn()
.mockRejectedValue(new Error('error'))
const useCase = createUseCase()
const result = await useCase.execute({
userUuid: '00000000-0000-0000-0000-000000000000',
})
expect(result.isFailed()).toBeTruthy()
expect(result.getError()).toEqual('error')
expect(primaryRevisionRepository.removeByUserUuid).not.toHaveBeenCalled()
expect((secondaryRevisionRepository as RevisionRepositoryInterface).removeByUserUuid).toHaveBeenCalledTimes(1)
})
})
})

View File

@@ -0,0 +1,160 @@
import { Result, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
import { TimerInterface } from '@standardnotes/time'
import { Logger } from 'winston'
import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO'
import { RevisionRepositoryInterface } from '../../../Revision/RevisionRepositoryInterface'
export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
constructor(
private primaryRevisionsRepository: RevisionRepositoryInterface,
private secondRevisionsRepository: RevisionRepositoryInterface | null,
private timer: TimerInterface,
private logger: Logger,
) {}
async execute(dto: TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
if (this.secondRevisionsRepository === null) {
return Result.fail('Secondary revision repository is not set')
}
const userUuidOrError = Uuid.create(dto.userUuid)
if (userUuidOrError.isFailed()) {
return Result.fail(userUuidOrError.getError())
}
const userUuid = userUuidOrError.getValue()
const migrationTimeStart = this.timer.getTimestampInMicroseconds()
const migrationResult = await this.migrateRevisionsForUser(userUuid)
if (migrationResult.isFailed()) {
const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
if (cleanupResult.isFailed()) {
this.logger.error(
`Failed to clean up secondary database revisions for user ${userUuid.value}: ${cleanupResult.getError()}`,
)
}
return Result.fail(migrationResult.getError())
}
const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid)
if (integrityCheckResult.isFailed()) {
const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
if (cleanupResult.isFailed()) {
this.logger.error(
`Failed to clean up secondary database revisions for user ${userUuid.value}: ${cleanupResult.getError()}`,
)
}
return Result.fail(integrityCheckResult.getError())
}
const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.primaryRevisionsRepository)
if (cleanupResult.isFailed()) {
this.logger.error(
`Failed to clean up primary database revisions for user ${userUuid.value}: ${cleanupResult.getError()}`,
)
}
const migrationTimeEnd = this.timer.getTimestampInMicroseconds()
const migrationDuration = migrationTimeEnd - migrationTimeStart
const migrationDurationTimeStructure = this.timer.convertMicrosecondsToTimeStructure(migrationDuration)
this.logger.info(
`Transitioned revisions for user ${userUuid.value} in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
)
return Result.ok()
}
private async migrateRevisionsForUser(userUuid: Uuid): Promise<Result<void>> {
try {
const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
const pageSize = 1
const totalPages = Math.ceil(totalRevisionsCountForUser / pageSize)
for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
const query = {
userUuid: userUuid,
offset: (currentPage - 1) * pageSize,
limit: pageSize,
}
const revisions = await this.primaryRevisionsRepository.findByUserUuid(query)
for (const revision of revisions) {
await (this.secondRevisionsRepository as RevisionRepositoryInterface).save(revision)
}
}
return Result.ok()
} catch (error) {
return Result.fail((error as Error).message)
}
}
private async deleteRevisionsForUser(
userUuid: Uuid,
revisionRepository: RevisionRepositoryInterface,
): Promise<Result<void>> {
try {
await revisionRepository.removeByUserUuid(userUuid)
return Result.ok()
} catch (error) {
return Result.fail((error as Error).message)
}
}
private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid: Uuid): Promise<Result<boolean>> {
try {
const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
const totalRevisionsCountForUserInSecondary = await (
this.secondRevisionsRepository as RevisionRepositoryInterface
).countByUserUuid(userUuid)
if (totalRevisionsCountForUserInPrimary !== totalRevisionsCountForUserInSecondary) {
return Result.fail(
`Total revisions count for user ${userUuid.value} in primary database (${totalRevisionsCountForUserInPrimary}) does not match total revisions count in secondary database (${totalRevisionsCountForUserInSecondary})`,
)
}
const pageSize = 1
const totalPages = Math.ceil(totalRevisionsCountForUserInPrimary / pageSize)
for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
const query = {
userUuid: userUuid,
offset: (currentPage - 1) * pageSize,
limit: pageSize,
}
const revisions = await this.primaryRevisionsRepository.findByUserUuid(query)
for (const revision of revisions) {
const revisionUuidOrError = Uuid.create(revision.id.toString())
/* istanbul ignore if */
if (revisionUuidOrError.isFailed()) {
return Result.fail(revisionUuidOrError.getError())
}
const revisionUuid = revisionUuidOrError.getValue()
const revisionInSecondary = await (
this.secondRevisionsRepository as RevisionRepositoryInterface
).findOneByUuid(revisionUuid, userUuid)
if (!revisionInSecondary) {
return Result.fail(`Revision ${revision.id.toString()} not found in secondary database`)
}
if (!revision.isIdenticalTo(revisionInSecondary)) {
return Result.fail(`Revision ${revision.id.toString()} is not identical in primary and secondary database`)
}
}
}
return Result.ok()
} catch (error) {
return Result.fail((error as Error).message)
}
}
}

View File

@@ -0,0 +1,3 @@
export interface TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO {
userUuid: string
}

View File

@@ -16,6 +16,28 @@ export class MongoDBRevisionRepository implements RevisionRepositoryInterface {
private logger: Logger,
) {}
async countByUserUuid(userUuid: Uuid): Promise<number> {
return this.mongoRepository.count({ where: { userUuid: { $eq: userUuid.value } } })
}
async findByUserUuid(dto: { userUuid: Uuid; offset?: number; limit?: number }): Promise<Revision[]> {
const mongoRevisions = await this.mongoRepository.find({
where: { userUuid: { $eq: dto.userUuid.value } },
order: {
createdAt: 'ASC',
},
skip: dto.offset,
take: dto.limit,
})
const revisions = []
for (const mongoRevision of mongoRevisions) {
revisions.push(this.revisionMapper.toDomain(mongoRevision))
}
return revisions
}
async removeByUserUuid(userUuid: Uuid): Promise<void> {
await this.mongoRepository.deleteMany({ where: { userUuid: { $eq: userUuid.value } } })
}

View File

@@ -15,6 +15,37 @@ export class SQLRevisionRepository implements RevisionRepositoryInterface {
private logger: Logger,
) {}
async countByUserUuid(userUuid: Uuid): Promise<number> {
return this.ormRepository
.createQueryBuilder()
.where('user_uuid = :userUuid', { userUuid: userUuid.value })
.getCount()
}
async findByUserUuid(dto: { userUuid: Uuid; offset?: number; limit?: number }): Promise<Revision[]> {
const queryBuilder = this.ormRepository
.createQueryBuilder()
.where('user_uuid = :userUuid', { userUuid: dto.userUuid.value })
.orderBy('created_at', 'ASC')
if (dto.offset !== undefined) {
queryBuilder.skip(dto.offset)
}
if (dto.limit !== undefined) {
queryBuilder.take(dto.limit)
}
const sqlRevisions = await queryBuilder.getMany()
const revisions = []
for (const sqlRevision of sqlRevisions) {
revisions.push(this.revisionMapper.toDomain(sqlRevision))
}
return revisions
}
async updateUserUuid(itemUuid: Uuid, userUuid: Uuid): Promise<void> {
await this.ormRepository
.createQueryBuilder()

View File

@@ -14,21 +14,22 @@ import { DomainEventFactoryInterface } from './DomainEventFactoryInterface'
export class DomainEventFactory implements DomainEventFactoryInterface {
constructor(private timer: TimerInterface) {}
createTransitionStatusUpdatedEvent(userUuid: string, status: 'FINISHED' | 'FAILED'): TransitionStatusUpdatedEvent {
createTransitionStatusUpdatedEvent(dto: {
userUuid: string
transitionType: 'items' | 'revisions'
status: 'STARTED' | 'FAILED' | 'FINISHED'
}): TransitionStatusUpdatedEvent {
return {
type: 'TRANSITION_STATUS_UPDATED',
createdAt: this.timer.getUTCDate(),
meta: {
correlation: {
userIdentifier: userUuid,
userIdentifier: dto.userUuid,
userIdentifierType: 'uuid',
},
origin: DomainEventService.SyncingServer,
},
payload: {
userUuid,
status,
},
payload: dto,
}
}

View File

@@ -8,10 +8,11 @@ import {
} from '@standardnotes/domain-events'
export interface DomainEventFactoryInterface {
createTransitionStatusUpdatedEvent(
userUuid: string,
status: 'STARTED' | 'FAILED' | 'FINISHED',
): TransitionStatusUpdatedEvent
createTransitionStatusUpdatedEvent(dto: {
userUuid: string
transitionType: 'items' | 'revisions'
status: 'STARTED' | 'FAILED' | 'FINISHED'
}): TransitionStatusUpdatedEvent
createEmailRequestedEvent(dto: {
userEmail: string
messageIdentifier: string

View File

@@ -16,7 +16,7 @@ export class TransitionStatusUpdatedEventHandler implements DomainEventHandlerIn
) {}
async handle(event: TransitionStatusUpdatedEvent): Promise<void> {
if (event.payload.status === 'STARTED') {
if (event.payload.status === 'STARTED' && event.payload.transitionType === 'items') {
const result = await this.transitionItemsFromPrimaryToSecondaryDatabaseForUser.execute({
userUuid: event.payload.userUuid,
})
@@ -25,14 +25,22 @@ export class TransitionStatusUpdatedEventHandler implements DomainEventHandlerIn
this.logger.error(`Failed to transition items for user ${event.payload.userUuid}: ${result.getError()}`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent(event.payload.userUuid, 'FAILED'),
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: 'FAILED',
transitionType: 'items',
}),
)
return
}
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent(event.payload.userUuid, 'FINISHED'),
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: 'FINISHED',
transitionType: 'items',
}),
)
}
}

View File

@@ -11,7 +11,11 @@ export class TriggerTransitionFromPrimaryToSecondaryDatabaseForUser implements U
) {}
async execute(dto: TriggerTransitionFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
const event = this.domainEventFactory.createTransitionStatusUpdatedEvent(dto.userUuid, 'STARTED')
const event = this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: dto.userUuid,
status: 'STARTED',
transitionType: 'items',
})
await this.domainEventPubliser.publish(event)