PgPubSub
Implements a LISTEN/NOTIFY client for PostgreSQL connections.
This class is the main public interface of the library, so end users work with it directly.
Importing:
import { AnyJson, PgPubSub } from '@imqueue/pg-pubsub';
Instantiation:
const pubSub = new PgPubSub(options);
See PgPubSubOptions.
Connecting and listening:
pubSub.on('connect', async () => {
  await pubSub.listen('ChannelOne');
  await pubSub.listen('ChannelTwo');
});
// or, even better:
pubSub.on('connect', async () => {
  await Promise.all(
    ['ChannelOne', 'ChannelTwo'].map(channel => pubSub.listen(channel)),
  );
});
// or, less reliably:
await pubSub.connect();
await Promise.all(
  ['ChannelOne', 'ChannelTwo'].map(channel => pubSub.listen(channel)),
);
Handle messages:
pubSub.on('message', (channel: string, payload: AnyJson) =>
  console.log(channel, payload),
);
// or, using channels:
pubSub.channels.on('ChannelOne', (payload: AnyJson) =>
  console.log(1, payload),
);
pubSub.channels.on('ChannelTwo', (payload: AnyJson) =>
  console.log(2, payload),
);
Destroying:
await pubSub.destroy();
Closing and re-using connection:
await pubSub.close();
await pubSub.connect();
This close/connect technique may be useful during heavy message handling: while this instance is closed, another running copy can handle the next messages.
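The close/connect technique could be wrapped in a small helper. The sketch below is an illustration only, not part of the library: `Reconnectable` and `withConnectionPaused` are hypothetical names, and the interface assumes nothing beyond the `close()` and `connect()` methods documented on this page.

```typescript
// Hypothetical structural type: only the two methods this page documents.
interface Reconnectable {
    close(): Promise<void>;
    connect(): Promise<void>;
}

/**
 * Closes the connection, runs the heavy job, then reconnects.
 * The reconnect happens in `finally`, so it also runs when the job throws.
 */
async function withConnectionPaused<T>(
    pubSub: Reconnectable,
    heavyJob: () => Promise<T>,
): Promise<T> {
    await pubSub.close();
    try {
        return await heavyJob();
    } finally {
        await pubSub.connect();
    }
}
```

A `PgPubSub` instance should satisfy `Reconnectable` structurally. Whether channels have to be re-subscribed after `connect()` depends on how your 'connect' handler is written.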
-
EventEmitter
↳ PgPubSub
- activeChannels
- addListener
- allChannels
- close
- connect
- destroy
- emit
- eventNames
- getMaxListeners
- inactiveChannels
- isActive
- listen
- listenerCount
- listeners
- notify
- off
- on
- once
- prependListener
- prependOnceListener
- rawListeners
- removeAllListeners
- removeListener
- setMaxListeners
- unlisten
- unlistenAll
- listenerCount
+ new PgPubSub(options: Partial<PgPubSubOptions>, logger?: AnyLogger): PgPubSub
Overrides PgChannelEmitter.constructor
| Name | Type | Default value | Description |
|---|---|---|---|
| options | Partial<PgPubSubOptions> | - | options |
| logger | AnyLogger | console | logger |
Returns: PgPubSub
• close(): void
The 'close' event occurs each time the connection is closed. It differs from the
'end' event: 'end' may occur many times while a re-connectable connection is
being re-established, whereas 'close' means the connection was closed safely
and programmatically, and no further re-connections will happen.
Returns: void
• connect(): void
The 'connect' event occurs each time a database connection is established.
Returns: void
• end(): void
The 'end' event occurs whenever the pg connection ends. It is simply a
proxy for the 'end' event of pg.Client.
Returns: void
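The difference between 'connect', 'end' and 'close' can be made concrete with a small state tracker. This is a sketch under assumptions: `trackConnectionState` and `ConnectionState` are hypothetical names, and a plain Node.js `EventEmitter` stands in for a `PgPubSub` instance, which emits the same event names.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical state names, chosen for this illustration only.
type ConnectionState = 'idle' | 'connected' | 'interrupted' | 'closed';

/** Subscribes to the connection events and returns a getter for the current state. */
function trackConnectionState(pubSub: EventEmitter): () => ConnectionState {
    let state: ConnectionState = 'idle';
    pubSub.on('connect', () => { state = 'connected'; });
    // 'end' may repeat while re-connecting, so it is treated as transient
    // and never overrides a final 'closed' state.
    pubSub.on('end', () => {
        if (state !== 'closed') state = 'interrupted';
    });
    // 'close' is final: no further re-connections will happen.
    pubSub.on('close', () => { state = 'closed'; });
    return () => state;
}
```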
• error(err: Error): void
The 'error' event occurs each time a connection error happens.
| Name | Type | Description |
|---|---|---|
| err | Error | error that occurred during connection |
Returns: void
• listen(channels: string[]): void
The 'listen' event occurs each time the client starts listening to channels.
| Name | Type | Description |
|---|---|---|
| channels | string[] | list of channels that started being listened to |
Returns: void
• message(chan: string, payload: AnyJson): void
The 'message' event occurs each time the database connection receives a notification
on any listened channel. It is fired before the corresponding channel event is emitted.
| Name | Type | Description |
|---|---|---|
| chan | string | channel the notification belongs to |
| payload | AnyJson | notification message payload |
Returns: void
• notify(chan: string, payload: AnyJson): void
The 'notify' event occurs each time a new message has been published to a
particular channel, right after the database NOTIFY command succeeds.
| Name | Type | Description |
|---|---|---|
| chan | string | channel to which the notification was sent |
| payload | AnyJson | notification message payload |
Returns: void
• reconnect(retries: number): void
The 'reconnect' event occurs each time the connection is successfully
re-established after a connection retry. It is followed by a corresponding
'connect' event, which is emitted only after all pending channel locks have
finished their attempts to be re-acquired.
| Name | Type | Description |
|---|---|---|
| retries | number | number of retries made before the re-connect succeeded |
Returns: void
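As an illustration of consuming the `retries` argument, the sketch below accumulates simple reconnect statistics. `trackReconnects` and the shape of the returned object are hypothetical; the function accepts any Node.js `EventEmitter`, which is what `PgPubSub` extends according to the hierarchy on this page.

```typescript
import { EventEmitter } from 'node:events';

/** Counts 'reconnect' events and sums the retries each of them reports. */
function trackReconnects(pubSub: EventEmitter): { reconnects: number; totalRetries: number } {
    const stats = { reconnects: 0, totalRetries: 0 };
    pubSub.on('reconnect', (retries: number) => {
        stats.reconnects += 1;
        stats.totalRetries += retries;
    });
    return stats;
}
```

The returned object is updated in place, so it can be read at any time, for example from a periodic health check.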
• unlisten(channels: string[]): void
The 'unlisten' event occurs each time the client stops listening to channels.
| Name | Type | Description |
|---|---|---|
| channels | string[] | list of channels that stopped being listened to |
Returns: void
• Readonly channels: PgChannelEmitter = new PgChannelEmitter()
• Readonly logger: AnyLogger
logger
• Readonly options: PgPubSubOptions
• Readonly pgClient: PgClient
▪ Static defaultMaxListeners: number
Inherited from PgChannelEmitter.defaultMaxListeners
▪ Static Readonly errorMonitor: unique symbol
Inherited from PgChannelEmitter.errorMonitor
This symbol shall be used to install a listener for only monitoring 'error'
events. Listeners installed using this symbol are called before the regular
'error' listeners are called.
Installing a listener using this symbol does not change the behavior once an
'error' event is emitted, therefore the process will still crash if no
regular 'error' listener is installed.
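The monitoring behaviour comes from Node.js `EventEmitter`, so it can be shown with a plain emitter standing in for a `PgPubSub` instance. A minimal sketch; the `seen` array and the error message are illustrative only.

```typescript
import { EventEmitter } from 'node:events';

const emitter = new EventEmitter();
const seen: string[] = [];

// Monitoring listener: observes the error but does not count as handling it.
emitter.on(EventEmitter.errorMonitor, (err: Error) => {
    seen.push(`monitor: ${err.message}`);
});

// Regular listener: without it, the emit() call below would throw.
emitter.on('error', (err: Error) => {
    seen.push(`handler: ${err.message}`);
});

emitter.emit('error', new Error('connection lost'));
console.log(seen); // monitor entry first, then handler entry
```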
▸ activeChannels(): string[]
Returns a list of all actively subscribed channels.
Returns: string[]
▸ addListener(event: string | symbol, listener: (...args: any[]) => void): this
Inherited from PgClient.addListener
| Name | Type |
|---|---|
| event | string \| symbol |
| listener | (...args: any[]) => void |
Returns: this
▸ allChannels(): string[]
Returns a list of all known channels, regardless of whether they are currently being listened to (active) or not (inactive).
Returns: string[]
▸ close(): Promise<void>
Safely closes this database connection
Returns: Promise<void>
▸ connect(): Promise<void>
Establishes a re-connectable database connection.
Returns: Promise<void>
▸ destroy(): Promise<void>
Destroys this object properly: releases all locks, closes all connections, and removes all event listeners to avoid memory leaks. Use this method whenever you need to dispose of the object programmatically. Note that after destroy() the object is no longer usable and should be dereferenced so that it can be garbage-collected.
Returns: Promise<void>
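Because a destroyed object is no longer usable, it may be convenient to make sure `destroy()` runs at most once, for example when several shutdown paths can trigger it. The sketch below is an assumption-labelled illustration: `Destroyable` and `makeShutdown` are hypothetical names, and the interface relies only on the `destroy()` signature documented above.

```typescript
// Hypothetical structural type: just the destroy() method documented here.
interface Destroyable {
    destroy(): Promise<void>;
}

/**
 * Returns a shutdown function that calls destroy() on first use and
 * hands back the same promise on every later call.
 */
function makeShutdown(pubSub: Destroyable): () => Promise<void> {
    let pending: Promise<void> | undefined;
    return () => {
        if (!pending) {
            pending = pubSub.destroy();
        }
        return pending;
    };
}
```

The returned function can then be registered for several triggers, such as `process.once('SIGTERM', shutdown)` and `process.once('SIGINT', shutdown)`.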
▸ emit(event: string | symbol, ...args: any[]): boolean
| Name | Type |
|---|---|
| event | string \| symbol |
| ...args | any[] |
Returns: boolean
▸ eventNames(): Array<string | symbol>
Inherited from PgClient.eventNames
Returns: Array<string | symbol>
▸ getMaxListeners(): number
Inherited from PgClient.getMaxListeners
Returns: number
▸ inactiveChannels(): string[]
Returns a list of all inactive channels (those that are known but are not currently being listened to).
Returns: string[]
▸ isActive(channel?: undefined | string): boolean
If the channel argument is passed, returns true if that channel is in the active state (listened to by this pub/sub), false otherwise. If no channel is specified, returns true if at least one channel is actively listened to by this pub/sub, false otherwise.
| Name | Type |
|---|---|
| channel? | undefined \| string |
Returns: boolean
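The channel inspection methods can be combined into a single status report. A minimal sketch, assuming only the `allChannels()` and `isActive()` signatures documented on this page; `ChannelInspector` and `describeChannels` are hypothetical names.

```typescript
// Hypothetical structural type covering the two methods used below.
interface ChannelInspector {
    allChannels(): string[];
    isActive(channel?: string): boolean;
}

/** Maps every known channel name to 'active' or 'inactive'. */
function describeChannels(
    pubSub: ChannelInspector,
): Record<string, 'active' | 'inactive'> {
    const report: Record<string, 'active' | 'inactive'> = {};
    for (const channel of pubSub.allChannels()) {
        report[channel] = pubSub.isActive(channel) ? 'active' : 'inactive';
    }
    return report;
}
```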
▸ listen(channel: string): Promise<void>
Starts listening to the given channel. If the singleListener option is set to true, it guarantees that only one process can listen to this channel at a time.
| Name | Type | Description |
|---|---|---|
| channel | string | channel name to listen to |
Returns: Promise<void>
▸ listenerCount(event: string | symbol): number
Inherited from PgClient.listenerCount
| Name | Type |
|---|---|
| event | string \| symbol |
Returns: number
▸ listeners(event: string | symbol): Function[]
Inherited from PgClient.listeners
| Name | Type |
|---|---|
| event | string \| symbol |
Returns: Function[]
▸ notify(channel: string, payload: AnyJson): Promise<void>
Sends a NOTIFY with the given payload to the given channel, delivering it to all listening subscribers.
| Name | Type | Description |
|---|---|---|
| channel | string | channel to publish to |
| payload | AnyJson | payload to publish for subscribers |
Returns: Promise<void>
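PostgreSQL itself limits the NOTIFY payload: in the default configuration it must be shorter than 8000 bytes. A size check before publishing can therefore be useful. The sketch below assumes the payload travels as its `JSON.stringify` output, which this page does not confirm; `Json`, `MAX_NOTIFY_BYTES` and `fitsInNotify` are hypothetical names, with `Json` standing in for the library's `AnyJson` type.

```typescript
// PostgreSQL default: a NOTIFY payload must be shorter than 8000 bytes.
const MAX_NOTIFY_BYTES = 8000;

// Local stand-in for the library's AnyJson type (assumption).
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

/**
 * Returns true when the JSON-serialized payload is below the limit.
 * The size is measured in UTF-8 bytes, not in characters.
 */
function fitsInNotify(payload: Json): boolean {
    const serialized = JSON.stringify(payload);
    return Buffer.byteLength(serialized, 'utf8') < MAX_NOTIFY_BYTES;
}
```

Measuring bytes matters for non-ASCII text: a string of 4000 two-byte characters already exceeds the limit although it is only 4000 characters long.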
▸ off(event: string | symbol, listener: (...args: any[]) => void): this
| Name | Type |
|---|---|
| event | string \| symbol |
| listener | (...args: any[]) => void |
Returns: this
▸ on(event: "end", listener: typeof end): this
Overrides void
Sets 'end' event handler
| Name | Type |
|---|---|
| event | "end" |
| listener | typeof end |
Returns: this
▸ on(event: "connect", listener: typeof connect): this
Overrides void
Sets 'connect' event handler
| Name | Type |
|---|---|
| event | "connect" |
| listener | typeof connect |
Returns: this
▸ on(event: "close", listener: typeof close): this
Overrides void
Sets 'close' event handler
| Name | Type |
|---|---|
| event | "close" |
| listener | typeof close |
Returns: this
▸ on(event: "listen", listener: typeof listen): this
Overrides void
Sets 'listen' event handler
| Name | Type |
|---|---|
| event | "listen" |
| listener | typeof listen |
Returns: this
▸ on(event: "unlisten", listener: typeof unlisten): this
Overrides void
Sets 'unlisten' event handler
| Name | Type |
|---|---|
| event | "unlisten" |
| listener | typeof unlisten |
Returns: this
▸ on(event: "error", listener: typeof error): this
Overrides void
Sets 'error' event handler
| Name | Type |
|---|---|
| event | "error" |
| listener | typeof error |
Returns: this
▸ on(event: "reconnect", listener: typeof reconnect): this
Overrides void
Sets 'reconnect' event handler
| Name | Type |
|---|---|
| event | "reconnect" |
| listener | typeof reconnect |
Returns: this
▸ on(event: "message", listener: typeof message): this
Overrides void
Sets 'message' event handler
| Name | Type |
|---|---|
| event | "message" |
| listener | typeof message |
Returns: this
▸ on(event: "notify", listener: typeof notify): this
Overrides void
Sets 'notify' event handler
| Name | Type |
|---|---|
| event | "notify" |
| listener | typeof notify |
Returns: this
▸ on(event: string | symbol, listener: (...args: any[]) => void): this
Overrides void
Sets any unknown or user-defined event handler
| Name | Type | Description |
|---|---|---|
| event | string \| symbol | event name |
| listener | (...args: any[]) => void | event handler |
Returns: this
▸ once(event: "end", listener: typeof end): this
Sets an 'end' event handler that fires only once
| Name | Type |
|---|---|
| event | "end" |
| listener | typeof end |
Returns: this
▸ once(event: "connect", listener: typeof connect): this
Sets a 'connect' event handler that fires only once
| Name | Type |
|---|---|
| event | "connect" |
| listener | typeof connect |
Returns: this
▸ once(event: "close", listener: typeof close): this
Sets a 'close' event handler that fires only once
| Name | Type |
|---|---|
| event | "close" |
| listener | typeof close |
Returns: this
▸ once(event: "listen", listener: typeof listen): this
Sets a 'listen' event handler that fires only once
| Name | Type |
|---|---|
| event | "listen" |
| listener | typeof listen |
Returns: this
▸ once(event: "unlisten", listener: typeof unlisten): this
Sets an 'unlisten' event handler that fires only once
| Name | Type |
|---|---|
| event | "unlisten" |
| listener | typeof unlisten |
Returns: this
▸ once(event: "error", listener: typeof error): this
Sets an 'error' event handler that fires only once
| Name | Type |
|---|---|
| event | "error" |
| listener | typeof error |
Returns: this
▸ once(event: "reconnect", listener: typeof reconnect): this
Sets a 'reconnect' event handler that fires only once
| Name | Type |
|---|---|
| event | "reconnect" |
| listener | typeof reconnect |
Returns: this
▸ once(event: "message", listener: typeof message): this
Sets a 'message' event handler that fires only once
| Name | Type |
|---|---|
| event | "message" |
| listener | typeof message |
Returns: this
▸ once(event: "notify", listener: typeof notify): this
Sets a 'notify' event handler that fires only once
| Name | Type |
|---|---|
| event | "notify" |
| listener | typeof notify |
Returns: this
▸ once(event: string | symbol, listener: (...args: any[]) => void): this
Sets a handler for any unknown or user-defined event that fires only once
| Name | Type | Description |
|---|---|---|
| event | string \| symbol | event name |
| listener | (...args: any[]) => void | event handler |
Returns: this
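When a one-time event should be awaited rather than handled in a callback, the Node.js `once` helper from the `events` module offers a promise-based alternative to the `once()` method. The sketch below uses a plain `EventEmitter` in place of a `PgPubSub` instance; `waitForReconnect` is a hypothetical name. Note that the helper's promise rejects if the emitter emits 'error' while waiting.

```typescript
import { EventEmitter, once } from 'node:events';

/**
 * Resolves with the number of retries reported by the next 'reconnect' event.
 * Rejects if the emitter emits 'error' before that event arrives.
 */
async function waitForReconnect(pubSub: EventEmitter): Promise<number> {
    // once() resolves with the array of arguments passed to emit().
    const [retries] = await once(pubSub, 'reconnect');
    return retries as number;
}
```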
▸ prependListener(event: string | symbol, listener: (...args: any[]) => void): this
Inherited from PgClient.prependListener
| Name | Type |
|---|---|
| event | string \| symbol |
| listener | (...args: any[]) => void |
Returns: this
▸ prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this
Inherited from PgClient.prependOnceListener
| Name | Type |
|---|---|
| event | string \| symbol |
| listener | (...args: any[]) => void |
Returns: this
▸ rawListeners(event: string | symbol): Function[]
Inherited from PgClient.rawListeners
| Name | Type |
|---|---|
| event | string \| symbol |
Returns: Function[]
▸ removeAllListeners(event?: string | symbol): this
Inherited from PgClient.removeAllListeners
| Name | Type |
|---|---|
| event? | string \| symbol |
Returns: this
▸ removeListener(event: string | symbol, listener: (...args: any[]) => void): this
Inherited from PgClient.removeListener
| Name | Type |
|---|---|
| event | string \| symbol |
| listener | (...args: any[]) => void |
Returns: this
▸ setMaxListeners(n: number): this
Inherited from PgClient.setMaxListeners
| Name | Type |
|---|---|
| n | number |
Returns: this
▸ unlisten(channel: string): Promise<void>
Stops listening to the given channel and, if the singleListener option is set to true, releases the lock acquired for it (if any).
| Name | Type | Description |
|---|---|---|
| channel | string | channel name to unlisten |
Returns: Promise<void>
▸ unlistenAll(): Promise<void>
Stops listening to all connected channels and, if the singleListener option is set to true, releases all acquired locks (if any).
Returns: Promise<void>
▸ Static listenerCount(emitter: EventEmitter, event: string | symbol): number
Inherited from PgChannelEmitter.listenerCount
deprecated since v4.0.0
| Name | Type |
|---|---|
| emitter | EventEmitter |
| event | string \| symbol |
Returns: number
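Since the static form is deprecated, the instance method `listenerCount(event)` listed earlier on this page is the usual replacement. A minimal sketch with a plain Node.js `EventEmitter` standing in for a `PgPubSub` instance:

```typescript
import { EventEmitter } from 'node:events';

const source = new EventEmitter();
source.on('message', () => { /* first handler */ });
source.on('message', () => { /* second handler */ });

// Instance method, preferred over the deprecated static listenerCount().
const messageListeners = source.listenerCount('message');
console.log(messageListeners); // 2
```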