client
Client class is used to publish messages to the RabbitMQ server
Factory
getClient()
The getClient function is used for instantiating and returning an instance of the Client class defined in this module.
Parameters
connectionUrls
: ConnectionUrl | ConnectionUrl[] - One or many RabbitMQ connection URLs (if many, the additional parameters will be treated as fallback ones)options
?: InitRabbitOptions - Additional options for initializing RabbitMQ
Example usage
const client = await getClient([
"amqp://guest:guest@localhost:5672/",
"amqp://fallback@localhost:5672/",
]);
Methods
publishMessage()
Publishes message to the given exchange
Parameters
message
:Buffer | string | unknown
- Message to be publishedoptions
: ClientOptions - Options for publishing the message
Example usage
Example in which the client publishes a message to the exchange my-exchang
with the routing key my.routing-key
await client.publishMessage(
{ content: "hello world!!!" },
{
exchangeName: "my-exchange",
routingKey: "my.routing-key",
loggerOptions: {
isDataHidden: false, // default is true,
},
sendType: "json", // default is 'json'
}
);
publishRPCMessage()
Publishes an RPC message to the given exchange and waits for a single response
Parameters
message
:Buffer | string | unknown
- Message to be publishedoptions
: ClientRPCOptions - Options for publishing the message
Example usage
Example in which the client publishes a message to the exchange test
with the routing key test
and waits for the response with a timeout of 1 second
const rpcMessage = { message: "OurMessage", nested: { value: 15 } };
await client.publishRPCMessage(rpcMessage, {
exchangeName: "rpc-exchange-name",
routingKey: "rpc.routing.key",
replyQueueName: "rpc-reply-queue", // A reply queue will be created for listening rpc answers
timeout: 5_000, // optional After 5 secs, returns what it receives
responseContains: {
// optional What data the response object will contain
content: true,
headers: true,
signature: true,
},
correlationId: "some-random-nanoid", // optional r4bbit provides a default random value,
loggerOptions: {
// optional
isConsumeDataHidden: false, // default is true
isSendDataHidden: false, // default is true
},
sendType: "json", // optional default is 'json'
receiveType: "json", // optional default is 'json'
replySignature: "rpc-server-signature", // optional
});
publishMultipleRPC()
Publishes messages to the given exchange and waits for multiple responses
It has 2 possible strategies.
- Listening for all the messages and returning the replies after if timeout occurs or expected number of replies gets received.
- Using a handler function that listens for all the messages, and currently that the message gets received, executing necessary actions.
We are giving examples for both of those options
Parameters
message
:Buffer | string | unknown
- Message to be publishedoptions
: ClientMultipleRPC - Options for publishing multiple rpc messages.
Example usage
Example with specified number of expected replies (waitedReplies: 2
) within timeout (timeout: 5_000
):
await client.publishMultipleRPC(objectMessage, {
exchangeName: "multiple-rpc-exchange-name",
routingKey: "multiple-rpc.routing.key",
replyQueueName: "multiple-rpc-reply-queue",
timeout: 5_000,
waitedReplies: 2,
responseContains: {
content: true,
headers: true,
signature: true,
},
correlationId: "some-random-nanoid", // optional r4bbit provides a default random value,
loggerOptions: {
// optional
isConsumeDataHidden: false,
isSendDataHidden: false,
},
sendType: "json", // optional default is 'json',
receiveType: "json", // optional default is 'json',
replySignature: "server-1", // optional,
});
Example, with an unlimited number of replies that can occur within a specified timespan timeout: 5_000
:
await client.publishMultipleRPC(objectMessage, {
exchangeName: "multiple-rpc-exchange-name",
routingKey: "multiple-rpc.routing.key",
replyQueueName: "multiple-rpc-reply-queue",
timeout: 5_000,
responseContains: {
content: true,
headers: true,
signature: true,
},
handler: async (msg) => {
// Handler is taking actions immediately when reply is received.
switch (msg.signature) {
case "server-1":
console.log("Server-1 Received:", msg);
break;
case "server-2":
console.log("Server-2 Received:", msg);
break;
default:
console.log("Unknown resource Received", msg);
}
},
});
close()
Closes the connection with RabbitMq.
Parameters
None
Example usage
const client = await getClient("amqp://localhost");
await client.close();
Types
InitRabbitOptions
export type InitRabbitOptions = {
connectOptions?: AmqpConnectionManagerOptions; // Linked below 👇
createChannelOptions?: CreateChannelOpts; // Linked below 👇
};
ConnectionUrl
type ConnectionUrl =
| string
| amqp.Options.Connect
| {
url: string;
connectionOptions?: AmqpConnectionOptions;
};
ClientOptions
type ClientOptions = {
exchangeName: string;
routingKey: string;
sendType?: MessageType;
publishOptions?: Options.Publish; // Linked below 👇
loggerOptions?: {
isDataHidden?: boolean;
};
};
ServerRPCOptions
Link for the custom types used as property of ServerRPCOptions type
type ServerRPCOptions = {
publishOptions?: Options.Publish; // Linked below 👇
consumeOptions?: Options.Consume; // Linked below 👇
sendType?: MessageType;
correlationId?: string;
replySignature?: string;
responseContains?: ServerResponseContains;
loggerOptions?: {
isSendDataHidden?: boolean;
isConsumeDataHidden?: boolean;
};
};
ClientRPCOptions
Link for the custom types used as property of ClientRPCOptions type
type ClientRPCOptions = {
exchangeName: string;
routingKey: string;
replyQueueName: string;
receiveType?: MessageType;
timeout?: number;
responseContains?: ResponseContains;
} & ServerRPCOptions;
ClientMultipleRPC
Link for the custom types used as property of ClientMultipleRPC type
export type ClientMultipleRPC = {
exchangeName: string;
routingKey: string;
replyQueueName: string;
receiveType?: MessageType;
timeout?: number;
responseContains?: ResponseContains;
waitedReplies?: number;
handler?: (msg: Record<string, unknown>) => void;
} & ServerRPCOptions;
MessageType
type MessageType = "json" | "string" | "object";
ResponseContains
type ResponseContains = {
signature?: boolean;
headers?: boolean;
content?: boolean;
};
ServerResponseContains
type ServerResponseContains = {
headers?: boolean;
content?: boolean;
};