fix: disable opentelemetry tracing on async worker jobs

This commit is contained in:
Karol Sójko
2023-10-12 09:41:15 +02:00
parent 63201934a5
commit e0b19ef011
21 changed files with 7 additions and 117 deletions

View File

@@ -16,8 +16,6 @@ import TYPES from './Types'
import { AppDataSource } from './DataSource'
import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
import {
OpenTelemetryPropagation,
OpenTelemetryPropagationInterface,
SNSOpenTelemetryDomainEventPublisher,
SQSDomainEventSubscriberFactory,
SQSOpenTelemetryEventMessageHandler,
@@ -89,10 +87,6 @@ export class ContainerConfigLoader {
})
container.bind<winston.Logger>(TYPES.Logger).toConstantValue(logger)
container
.bind<OpenTelemetryPropagationInterface>(TYPES.OTEL_PROPAGATOR)
.toConstantValue(new OpenTelemetryPropagation())
const snsConfig: SNSClientConfig = {
apiVersion: 'latest',
region: env.get('SNS_AWS_REGION', true),
@@ -146,11 +140,7 @@ export class ContainerConfigLoader {
container
.bind<DomainEventPublisherInterface>(TYPES.DomainEventPublisher)
.toConstantValue(
new SNSOpenTelemetryDomainEventPublisher(
container.get<OpenTelemetryPropagationInterface>(TYPES.OTEL_PROPAGATOR),
container.get(TYPES.SNS),
container.get(TYPES.SNS_TOPIC_ARN),
),
new SNSOpenTelemetryDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN)),
)
if (env.get('MIXPANEL_TOKEN', true)) {
container.bind<Mixpanel>(TYPES.MixpanelClient).toConstantValue(Mixpanel.init(env.get('MIXPANEL_TOKEN', true)))
@@ -251,7 +241,6 @@ export class ContainerConfigLoader {
.toConstantValue(
new SQSOpenTelemetryEventMessageHandler(
ServiceIdentifier.NAMES.AnalyticsWorker,
container.get<OpenTelemetryPropagationInterface>(TYPES.OTEL_PROPAGATOR),
eventHandlers,
container.get(TYPES.Logger),
),

View File

@@ -3,7 +3,6 @@ const TYPES = {
Redis: Symbol.for('Redis'),
SNS: Symbol.for('SNS'),
SQS: Symbol.for('SQS'),
OTEL_PROPAGATOR: Symbol.for('OTEL_PROPAGATOR'),
// env vars
REDIS_URL: Symbol.for('REDIS_URL'),
SNS_TOPIC_ARN: Symbol.for('SNS_TOPIC_ARN'),

View File

@@ -89,8 +89,6 @@ import { ExtensionKeyGrantedEventHandler } from '../Domain/Handler/ExtensionKeyG
import {
DirectCallDomainEventPublisher,
DirectCallEventMessageHandler,
OpenTelemetryPropagation,
OpenTelemetryPropagationInterface,
SNSOpenTelemetryDomainEventPublisher,
SQSDomainEventSubscriberFactory,
SQSEventMessageHandler,
@@ -310,10 +308,6 @@ export class ContainerConfigLoader {
}
container.bind<winston.Logger>(TYPES.Auth_Logger).toConstantValue(logger)
container
.bind<OpenTelemetryPropagationInterface>(TYPES.Auth_OTEL_PROPAGATOR)
.toConstantValue(new OpenTelemetryPropagation())
const appDataSource = new AppDataSource({ env, runMigrations: this.mode === 'server' })
await appDataSource.initialize()
@@ -732,7 +726,6 @@ export class ContainerConfigLoader {
isConfiguredForHomeServer
? directCallDomainEventPublisher
: new SNSOpenTelemetryDomainEventPublisher(
container.get<OpenTelemetryPropagationInterface>(TYPES.Auth_OTEL_PROPAGATOR),
container.get(TYPES.Auth_SNS),
container.get(TYPES.Auth_SNS_TOPIC_ARN),
),
@@ -1238,7 +1231,6 @@ export class ContainerConfigLoader {
? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Auth_Logger))
: new SQSOpenTelemetryEventMessageHandler(
ServiceIdentifier.NAMES.AuthWorker,
container.get<OpenTelemetryPropagationInterface>(TYPES.Auth_OTEL_PROPAGATOR),
eventHandlers,
container.get(TYPES.Auth_Logger),
),

View File

@@ -3,7 +3,6 @@ const TYPES = {
Auth_Redis: Symbol.for('Auth_Redis'),
Auth_SNS: Symbol.for('Auth_SNS'),
Auth_SQS: Symbol.for('Auth_SQS'),
Auth_OTEL_PROPAGATOR: Symbol.for('Auth_OTEL_PROPAGATOR'),
// Mapping
Auth_SessionTracePersistenceMapper: Symbol.for('Auth_SessionTracePersistenceMapper'),
Auth_AuthenticatorChallengePersistenceMapper: Symbol.for('Auth_AuthenticatorChallengePersistenceMapper'),

View File

@@ -1,19 +0,0 @@
import * as OpenTelemetryApi from '@opentelemetry/api'
import { OpenTelemetryPropagationInterface } from './OpenTelemetryPropagationInterface'
export class OpenTelemetryPropagation implements OpenTelemetryPropagationInterface {
inject(): Record<string, string> {
const output = {}
OpenTelemetryApi.propagation.inject(OpenTelemetryApi.context.active(), output)
return output
}
extract(input: Record<string, string>): OpenTelemetryApi.Context {
const activeContext = OpenTelemetryApi.propagation.extract(OpenTelemetryApi.context.active(), input)
return activeContext
}
}

View File

@@ -1,6 +0,0 @@
import { Context } from '@opentelemetry/api'
export interface OpenTelemetryPropagationInterface {
inject(): Record<string, string>
extract(input: Record<string, string>): Context
}

View File

@@ -6,11 +6,11 @@ export class OpenTelemetryTracer implements OpenTelemetryTracerInterface {
private parentSpan: OpenTelemetryApi.Span | undefined
private internalSpan: OpenTelemetryApi.Span | undefined
startSpan(parentSpanName: string, internalSpanName: string, activeContext?: OpenTelemetryApi.Context): void {
startSpan(parentSpanName: string, internalSpanName: string): void {
const tracer = OpenTelemetryApi.trace.getTracer(`${parentSpanName}-handler`)
this.parentSpan = tracer.startSpan(parentSpanName, { kind: OpenTelemetryApi.SpanKind.CONSUMER }, activeContext)
const ctx = OpenTelemetryApi.trace.setSpan(activeContext ?? OpenTelemetryApi.context.active(), this.parentSpan)
this.parentSpan = tracer.startSpan(parentSpanName, { kind: OpenTelemetryApi.SpanKind.CONSUMER })
const ctx = OpenTelemetryApi.trace.setSpan(OpenTelemetryApi.context.active(), this.parentSpan)
this.internalSpan = tracer.startSpan(internalSpanName, { kind: OpenTelemetryApi.SpanKind.INTERNAL }, ctx)
}

View File

@@ -1,7 +1,5 @@
import { Context } from '@opentelemetry/api'
export interface OpenTelemetryTracerInterface {
startSpan(parentSpanName: string, internalSpanName: string, activeContext?: Context): void
startSpan(parentSpanName: string, internalSpanName: string): void
stopSpan(): void
stopSpanWithError(error: Error): void
}

View File

@@ -2,21 +2,14 @@ import * as zlib from 'zlib'
import { MessageAttributeValue, PublishCommand, PublishCommandInput, SNSClient } from '@aws-sdk/client-sns'
import { DomainEventInterface, DomainEventPublisherInterface } from '@standardnotes/domain-events'
import { OpenTelemetryPropagationInterface } from '../OpenTelemetry/OpenTelemetryPropagationInterface'
export class SNSOpenTelemetryDomainEventPublisher implements DomainEventPublisherInterface {
constructor(
private propagator: OpenTelemetryPropagationInterface,
private snsClient: SNSClient,
private topicArn: string,
) {}
async publish(event: DomainEventInterface): Promise<void> {
const trace = this.propagator.inject()
if (Object.keys(trace).length > 0) {
event.meta.trace = trace
}
const message: PublishCommandInput = {
TopicArn: this.topicArn,
MessageAttributes: {

View File

@@ -7,14 +7,12 @@ import {
} from '@standardnotes/domain-events'
import { OpenTelemetryTracer } from '../OpenTelemetry/OpenTelemetryTracer'
import { OpenTelemetryTracerInterface } from '../OpenTelemetry/OpenTelemetryTracerInterface'
import { OpenTelemetryPropagationInterface } from '../OpenTelemetry/OpenTelemetryPropagationInterface'
export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHandlerInterface {
private tracer: OpenTelemetryTracerInterface | undefined
constructor(
private serviceName: string,
private propagator: OpenTelemetryPropagationInterface,
private handlers: Map<string, DomainEventHandlerInterface>,
private logger: Logger,
) {}
@@ -39,16 +37,7 @@ export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHa
this.tracer = new OpenTelemetryTracer()
let activeContext = undefined
if (domainEvent.meta.trace) {
this.logger.debug(`Event has trace: ${JSON.stringify(domainEvent.meta.trace)}`)
activeContext = this.propagator.extract(domainEvent.meta.trace)
this.logger.info(`Injecting pre-existing active context into trace: ${JSON.stringify(activeContext)}`)
}
this.tracer.startSpan(this.serviceName, domainEvent.type, activeContext)
this.tracer.startSpan(this.serviceName, domainEvent.type)
try {
await handler.handle(domainEvent)

View File

@@ -1,8 +1,6 @@
export * from './DirectCall/DirectCallDomainEventPublisher'
export * from './DirectCall/DirectCallEventMessageHandler'
export * from './OpenTelemetry/OpenTelemetryPropagation'
export * from './OpenTelemetry/OpenTelemetryPropagationInterface'
export * from './OpenTelemetry/OpenTelemetrySDK'
export * from './OpenTelemetry/OpenTelemetrySDKInterface'
export * from './OpenTelemetry/OpenTelemetryTracer'

View File

@@ -9,7 +9,6 @@ export interface DomainEventInterface {
userIdentifier: string
userIdentifierType: 'uuid' | 'email' | 'shared-vault-uuid'
}
trace?: Record<string, string>
origin: DomainEventService
target?: DomainEventService
}

View File

@@ -16,8 +16,6 @@ import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
import {
DirectCallDomainEventPublisher,
DirectCallEventMessageHandler,
OpenTelemetryPropagation,
OpenTelemetryPropagationInterface,
SNSOpenTelemetryDomainEventPublisher,
SQSDomainEventSubscriberFactory,
SQSEventMessageHandler,
@@ -100,10 +98,6 @@ export class ContainerConfigLoader {
container.bind<TimerInterface>(TYPES.Files_Timer).toConstantValue(new Timer())
container
.bind<OpenTelemetryPropagationInterface>(TYPES.Files_OTEL_PROPAGATOR)
.toConstantValue(new OpenTelemetryPropagation())
// services
container
.bind<TokenDecoderInterface<ValetTokenData>>(TYPES.Files_ValetTokenDecoder)
@@ -183,7 +177,6 @@ export class ContainerConfigLoader {
.bind<DomainEventPublisherInterface>(TYPES.Files_DomainEventPublisher)
.toConstantValue(
new SNSOpenTelemetryDomainEventPublisher(
container.get<OpenTelemetryPropagationInterface>(TYPES.Files_OTEL_PROPAGATOR),
container.get(TYPES.Files_SNS),
container.get(TYPES.Files_SNS_TOPIC_ARN),
),
@@ -310,7 +303,6 @@ export class ContainerConfigLoader {
? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Files_Logger))
: new SQSOpenTelemetryEventMessageHandler(
ServiceIdentifier.NAMES.FilesWorker,
container.get<OpenTelemetryPropagationInterface>(TYPES.Files_OTEL_PROPAGATOR),
eventHandlers,
container.get(TYPES.Files_Logger),
),

View File

@@ -5,7 +5,6 @@ const TYPES = {
Files_S3: Symbol.for('Files_S3'),
Files_SNS: Symbol.for('Files_SNS'),
Files_SQS: Symbol.for('Files_SQS'),
Files_OTEL_PROPAGATOR: Symbol.for('Files_OTEL_PROPAGATOR'),
// use cases
Files_UploadFileChunk: Symbol.for('Files_UploadFileChunk'),

View File

@@ -40,9 +40,7 @@ import {
DirectCallEventMessageHandler,
DirectCallDomainEventPublisher,
SQSOpenTelemetryEventMessageHandler,
OpenTelemetryPropagation,
SNSOpenTelemetryDomainEventPublisher,
OpenTelemetryPropagationInterface,
} from '@standardnotes/domain-events-infra'
import { DumpRepositoryInterface } from '../Domain/Dump/DumpRepositoryInterface'
import { AccountDeletionRequestedEventHandler } from '../Domain/Handler/AccountDeletionRequestedEventHandler'
@@ -142,10 +140,6 @@ export class ContainerConfigLoader {
container.bind<TimerInterface>(TYPES.Revisions_Timer).toDynamicValue(() => new Timer())
container
.bind<OpenTelemetryPropagationInterface>(TYPES.Revisions_OTEL_PROPAGATOR)
.toConstantValue(new OpenTelemetryPropagation())
const appDataSource = new AppDataSource({ env, runMigrations: this.mode === 'server' })
await appDataSource.initialize()
@@ -187,7 +181,6 @@ export class ContainerConfigLoader {
.bind<DomainEventPublisherInterface>(TYPES.Revisions_DomainEventPublisher)
.toDynamicValue((context: interfaces.Context) => {
return new SNSOpenTelemetryDomainEventPublisher(
context.container.get<OpenTelemetryPropagationInterface>(TYPES.Revisions_OTEL_PROPAGATOR),
context.container.get(TYPES.Revisions_SNS),
context.container.get(TYPES.Revisions_SNS_TOPIC_ARN),
)
@@ -522,7 +515,6 @@ export class ContainerConfigLoader {
? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Revisions_Logger))
: new SQSOpenTelemetryEventMessageHandler(
ServiceIdentifier.NAMES.RevisionsWorker,
container.get<OpenTelemetryPropagationInterface>(TYPES.Revisions_OTEL_PROPAGATOR),
eventHandlers,
container.get(TYPES.Revisions_Logger),
),

View File

@@ -6,7 +6,6 @@ const TYPES = {
Revisions_SNS: Symbol.for('Revisions_SNS'),
Revisions_S3: Symbol.for('Revisions_S3'),
Revisions_Env: Symbol.for('Revisions_Env'),
Revisions_OTEL_PROPAGATOR: Symbol.for('Revisions_OTEL_PROPAGATOR'),
// Map
Revisions_SQLLegacyRevisionMetadataPersistenceMapper: Symbol.for(
'Revisions_SQLLegacyRevisionMetadataPersistenceMapper',

View File

@@ -15,8 +15,6 @@ import TYPES from './Types'
import { AppDataSource } from './DataSource'
import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
import {
OpenTelemetryPropagation,
OpenTelemetryPropagationInterface,
SNSOpenTelemetryDomainEventPublisher,
SQSDomainEventSubscriberFactory,
SQSOpenTelemetryEventMessageHandler,
@@ -68,10 +66,6 @@ export class ContainerConfigLoader {
})
container.bind<winston.Logger>(TYPES.Logger).toConstantValue(logger)
container
.bind<OpenTelemetryPropagationInterface>(TYPES.Scheduler_OTEL_PROPAGATOR)
.toConstantValue(new OpenTelemetryPropagation())
if (env.get('SNS_TOPIC_ARN', true)) {
const snsConfig: SNSClientConfig = {
apiVersion: 'latest',
@@ -143,11 +137,7 @@ export class ContainerConfigLoader {
container
.bind<DomainEventPublisherInterface>(TYPES.DomainEventPublisher)
.toConstantValue(
new SNSOpenTelemetryDomainEventPublisher(
container.get<OpenTelemetryPropagationInterface>(TYPES.Scheduler_OTEL_PROPAGATOR),
container.get(TYPES.SNS),
container.get(TYPES.SNS_TOPIC_ARN),
),
new SNSOpenTelemetryDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN)),
)
const eventHandlers: Map<string, DomainEventHandlerInterface> = new Map([
@@ -162,7 +152,6 @@ export class ContainerConfigLoader {
.toConstantValue(
new SQSOpenTelemetryEventMessageHandler(
ServiceIdentifier.NAMES.SchedulerWorker,
container.get<OpenTelemetryPropagationInterface>(TYPES.Scheduler_OTEL_PROPAGATOR),
eventHandlers,
container.get(TYPES.Logger),
),

View File

@@ -3,7 +3,6 @@ const TYPES = {
Redis: Symbol.for('Redis'),
SNS: Symbol.for('SNS'),
SQS: Symbol.for('SQS'),
Scheduler_OTEL_PROPAGATOR: Symbol.for('Scheduler_OTEL_PROPAGATOR'),
// env vars
REDIS_URL: Symbol.for('REDIS_URL'),
SNS_TOPIC_ARN: Symbol.for('SNS_TOPIC_ARN'),

View File

@@ -13,8 +13,6 @@ import { Item } from '../Domain/Item/Item'
import {
DirectCallDomainEventPublisher,
DirectCallEventMessageHandler,
OpenTelemetryPropagation,
OpenTelemetryPropagationInterface,
SNSOpenTelemetryDomainEventPublisher,
SQSDomainEventSubscriberFactory,
SQSEventMessageHandler,
@@ -218,10 +216,6 @@ export class ContainerConfigLoader {
}
container.bind<winston.Logger>(TYPES.Sync_Logger).toConstantValue(logger)
container
.bind<OpenTelemetryPropagationInterface>(TYPES.Sync_OTEL_PROPAGATOR)
.toConstantValue(new OpenTelemetryPropagation())
const appDataSource = new AppDataSource({ env, runMigrations: this.mode === 'server' })
await appDataSource.initialize()
@@ -293,7 +287,6 @@ export class ContainerConfigLoader {
.bind<DomainEventPublisherInterface>(TYPES.Sync_DomainEventPublisher)
.toDynamicValue((context: interfaces.Context) => {
return new SNSOpenTelemetryDomainEventPublisher(
context.container.get<OpenTelemetryPropagationInterface>(TYPES.Sync_OTEL_PROPAGATOR),
context.container.get(TYPES.Sync_SNS),
context.container.get(TYPES.Sync_SNS_TOPIC_ARN),
)
@@ -1164,7 +1157,6 @@ export class ContainerConfigLoader {
? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Sync_Logger))
: new SQSOpenTelemetryEventMessageHandler(
ServiceIdentifier.NAMES.SyncingServerWorker,
container.get<OpenTelemetryPropagationInterface>(TYPES.Sync_OTEL_PROPAGATOR),
eventHandlers,
container.get(TYPES.Sync_Logger),
),

View File

@@ -6,7 +6,6 @@ const TYPES = {
Sync_SQS: Symbol.for('Sync_SQS'),
Sync_S3: Symbol.for('Sync_S3'),
Sync_Env: Symbol.for('Sync_Env'),
Sync_OTEL_PROPAGATOR: Symbol.for('Sync_OTEL_PROPAGATOR'),
// Repositories
Sync_ItemRepositoryResolver: Symbol.for('Sync_ItemRepositoryResolver'),
Sync_SQLItemRepository: Symbol.for('Sync_SQLItemRepository'),

View File

@@ -19,7 +19,6 @@ import { AddWebSocketsConnection } from '../Domain/UseCase/AddWebSocketsConnecti
import { RemoveWebSocketsConnection } from '../Domain/UseCase/RemoveWebSocketsConnection/RemoveWebSocketsConnection'
import { WebSocketsClientMessenger } from '../Infra/WebSockets/WebSocketsClientMessenger'
import {
OpenTelemetryPropagation,
SQSDomainEventSubscriberFactory,
SQSOpenTelemetryEventMessageHandler,
} from '@standardnotes/domain-events-infra'
@@ -148,7 +147,6 @@ export class ContainerConfigLoader {
.toConstantValue(
new SQSOpenTelemetryEventMessageHandler(
ServiceIdentifier.NAMES.WebsocketsWorker,
new OpenTelemetryPropagation(),
eventHandlers,
container.get(TYPES.Logger),
),