Class TopicSagaConsumer<Payload, Context>

Type parameters

  • Payload

  • Context: Record<string, any> = Record<string, any>

Hierarchy

  • TopicSagaConsumer

Constructors

constructor

  • Parameters

    • __namedParameters: { kafka: Kafka; loggerConfig: undefined | ILoggerConfig; middlewares: Middleware<IEffectDescription, SagaContext<Context>, any>[]; saga: Saga<Payload, SagaContext<Context>>; topic: string; topicAdministrator: undefined | TopicAdministrator; getContext: any; consumerConfig: object; producerConfig: object }
      • kafka: Kafka
      • loggerConfig: undefined | ILoggerConfig
      • middlewares: Middleware<IEffectDescription, SagaContext<Context>, any>[]
      • saga: Saga<Payload, SagaContext<Context>>
      • topic: string
      • topicAdministrator: undefined | TopicAdministrator
      • getContext: function
        • getContext(): Promise<Context>
      • consumerConfig: object
        • allowAutoTopicCreation: false

          Allows main consumer and action channel consumers to create new topics.

        • consumptionTimeoutMs: number

          How much time should be given to a saga to complete before a consumer is considered unhealthy and killed?

          Providing -1 will allow a saga to run indefinitely.

        • groupId: string

          Is this a special consumer group? Use case: provide a custom consumer group ID if this saga is not the primary consumer of an event. For instance, you may want additional reactions to an event, such as kicking off notifications, alongside the primary work.

        • heartbeatInterval: number

          How often should heartbeats be sent back to the broker?

        • maxWaitTimeInMs: number

          How long should the broker wait before responding when too few records are available to return?

      • producerConfig: object
        • allowAutoTopicCreation: false

          Allows producer to create new topics.

        • flushIntervalMs: number

          How often should produced message batches be sent out?

        • maxOutgoingBatchSize: number

          When batching produced messages (with the PUT effect), how many should be flushed at a time?

    Returns TopicSagaConsumer
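
The two option groups above can be pictured as plain object literals. The following is a minimal sketch, not the library's own type definitions: the interface names `ConsumerOptionsSketch` and `ProducerOptionsSketch` and the helper `allowsIndefiniteRun` are hypothetical, every value is illustrative rather than a documented default, and `allowAutoTopicCreation` is typed as `boolean` on the assumption that the `false` in the listing is a default.

```typescript
// Hypothetical local mirrors of the documented option shapes.
interface ConsumerOptionsSketch {
  allowAutoTopicCreation: boolean; // assumption: `false` in the listing is a default
  consumptionTimeoutMs: number;
  groupId: string;
  heartbeatInterval: number;
  maxWaitTimeInMs: number;
}

interface ProducerOptionsSketch {
  allowAutoTopicCreation: boolean; // same assumption as above
  flushIntervalMs: number;
  maxOutgoingBatchSize: number;
}

// Illustrative values only; none of these numbers come from the documentation.
const consumerConfig: ConsumerOptionsSketch = {
  allowAutoTopicCreation: false,
  consumptionTimeoutMs: 30_000,
  groupId: "notifications-reaction", // a custom group for a non-primary reaction
  heartbeatInterval: 3_000,
  maxWaitTimeInMs: 100,
};

const producerConfig: ProducerOptionsSketch = {
  allowAutoTopicCreation: false,
  flushIntervalMs: 100,
  maxOutgoingBatchSize: 1_000,
};

// -1 is the documented value that lets a saga run indefinitely.
function allowsIndefiniteRun(
  config: Pick<ConsumerOptionsSketch, "consumptionTimeoutMs">
): boolean {
  return config.consumptionTimeoutMs === -1;
}
```

In real code these objects would be passed as the `consumerConfig` and `producerConfig` named parameters of the constructor, alongside `kafka`, `saga`, `topic`, and the other parameters listed above.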

Properties

Protected Optional backgroundHeartbeat

backgroundHeartbeat: NodeJS.Timeout

Protected consumer

consumer: Consumer

Protected consumerConfig

consumerConfig: ConsumerConfig

Protected consumerPool

consumerPool: ConsumerPool

Protected consumptionTimeoutMs

consumptionTimeoutMs: number

eventEmitter

eventEmitter: TypedEventEmitter<{ comitted_offsets: (...args: any[]) => void; completed_saga: (...args: any[]) => void; consumed_message: (consumptionEvent: IConsumptionEvent<Payload>) => void; started_saga: (...args: any[]) => void }> = new EventEmitter() as TypedEmitter<{ comitted_offsets: (...args: any[]) => void; started_saga: (...args: any[]) => void; completed_saga: (...args: any[]) => void; consumed_message: (consumptionEvent: IConsumptionEvent<Payload>) => void }>
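
Because `eventEmitter` is initialized from a Node `EventEmitter`, listeners can presumably be attached with the usual `on` method. The sketch below uses a plain Node emitter as a stand-in for `consumer.eventEmitter` so that it runs on its own. The `seen` array and the emitted payload are hypothetical, and no assumption is made about the fields of `IConsumptionEvent` or about the order in which the real class emits events.

```typescript
import { EventEmitter } from "events";

// Stand-in for `consumer.eventEmitter`; a real TopicSagaConsumer is not constructed here.
const eventEmitter = new EventEmitter();

// Hypothetical record of which events were observed.
const seen: string[] = [];

eventEmitter.on("started_saga", () => {
  seen.push("started_saga");
});
eventEmitter.on("completed_saga", () => {
  seen.push("completed_saga");
});
// The payload is kept as `unknown` because the fields of IConsumptionEvent are not listed here.
eventEmitter.on("consumed_message", (consumptionEvent: unknown) => {
  seen.push("consumed_message");
});
// The event name is spelled exactly as in the type above.
eventEmitter.on("comitted_offsets", () => {
  seen.push("comitted_offsets");
});

// Simulated emission so the sketch does something when run by itself.
eventEmitter.emit("consumed_message", { note: "hypothetical payload" });
```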

Protected getContext

getContext: (message: KafkaMessage) => Promise<Context>

Type declaration

    • (message: KafkaMessage): Promise<Context>
    • Parameters

      • message: KafkaMessage

      Returns Promise<Context>
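
A `getContext` function with this signature might derive per-message context from the message itself. The following is a sketch under stated assumptions: `KafkaMessageLike` is a local stand-in that models only an optional `headers` map, and the `x-tenant-id` header, the `TenantContext` type, and the `contextFromHeaders` helper are all hypothetical.

```typescript
// Local stand-in for the one part of KafkaMessage this sketch reads.
interface KafkaMessageLike {
  headers?: Record<string, unknown>;
}

// Hypothetical context shape; any record type would do.
type TenantContext = { tenantId: string };

// Pure helper so the header-to-context mapping can be checked synchronously.
function contextFromHeaders(headers: Record<string, unknown> = {}): TenantContext {
  const raw = headers["x-tenant-id"]; // hypothetical header name
  // String() covers both plain strings and Buffers (decoded as UTF-8).
  return { tenantId: raw == null ? "unknown" : String(raw) };
}

// Matches the documented shape: (message) => Promise<Context>.
const getContext = async (message: KafkaMessageLike): Promise<TenantContext> =>
  contextFromHeaders(message.headers);
```

Keeping the mapping in a synchronous helper is a design choice of this sketch: the async wrapper exists only to satisfy the documented `Promise<Context>` return type.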

Protected logger

logger: ReturnType<typeof pino>

Protected middlewares

middlewares: Array<Middleware<IEffectDescription, SagaContext<Context>>>

Protected producerConfig

producerConfig: SagaProducerConfig

Protected saga

saga: Saga<Payload, SagaContext<Context>>

Protected throttledProducer

throttledProducer: ThrottledProducer

Protected topic

topic: string

Protected topicAdminstrator

topicAdminstrator: TopicAdministrator

Methods

disconnect

  • disconnect(): Promise<void>

Private eachMessage

  • eachMessage(runner: SagaRunner<Payload, SagaContext<Context>>, __namedParameters: { message: KafkaMessage; partition: number }): Promise<void>

run

  • run(): Promise<void>
  • Catching errors and crashing are left to callers of this class so that they can log as they see fit.

    Returns Promise<void>
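
Since error handling around `run` is left to the caller, a wrapper along the following lines is one way to do it. This is a minimal sketch, not a prescribed pattern: `RunnableConsumer` is a structural stand-in for the two public methods documented above, `runAndReport` and its `log` callback are hypothetical, and calling `disconnect` after a failed `run` is an assumption rather than documented behavior.

```typescript
// Structural stand-in for the public methods of TopicSagaConsumer used here.
interface RunnableConsumer {
  run(): Promise<void>;
  disconnect(): Promise<void>;
}

// Resolves to true if run() resolves, false if it rejects.
async function runAndReport(
  consumer: RunnableConsumer,
  log: (line: string) => void
): Promise<boolean> {
  try {
    await consumer.run();
    return true;
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    log(`saga consumer failed: ${reason}`);
    // Assumption: it is safe to disconnect after a failed run.
    // An error thrown by disconnect() itself propagates to the caller.
    await consumer.disconnect();
    return false;
  }
}
```

A caller that wants to crash on failure could check the returned boolean and exit the process with a non-zero status after logging.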

Generated using TypeDoc