
client

The Client class is used to publish messages to the RabbitMQ server.

Factory

getClient()

The getClient function creates and returns an instance of the Client class defined in this module.

Parameters

  • connectionUrls: ConnectionUrl | ConnectionUrl[] - One or many RabbitMQ connection URLs (if many, the additional URLs are treated as fallbacks)

  • options?: InitRabbitOptions - Additional options for initializing RabbitMQ

Example usage

const client = await getClient([
  "amqp://guest:guest@localhost:5672/",
  "amqp://fallback@localhost:5672/",
]);
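
The fallback behaviour described above can be pictured as splitting the argument into a primary URL and an ordered list of fallbacks. The sketch below is not r4bbit's implementation: `splitConnectionUrls` is a hypothetical helper, it handles only string URLs, and it assumes the first entry is the primary one.

```typescript
// Hypothetical helper, not part of r4bbit: reads a single URL or a list of
// URLs as "primary + fallbacks". Assumes the first entry is the primary.
type UrlInput = string | string[];

function splitConnectionUrls(input: UrlInput): { primary: string; fallbacks: string[] } {
  const urls = Array.isArray(input) ? input : [input];
  const primary = urls[0];
  if (primary === undefined) {
    throw new Error("At least one connection URL is required");
  }
  // Everything after the first entry is kept in order as a fallback.
  return { primary, fallbacks: urls.slice(1) };
}
```

A single string yields an empty fallback list, so both call styles of getClient map onto the same shape.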

Methods

publishMessage()

Publishes a message to the given exchange

Parameters

  • message: Buffer | string | unknown - Message to be published
  • options: ClientOptions - Options for publishing the message

Example usage

Example in which the client publishes a message to the exchange my-exchange with the routing key my.routing-key

await client.publishMessage(
  { content: "hello world!!!" },
  {
    exchangeName: "my-exchange",
    routingKey: "my.routing-key",
    loggerOptions: {
      isDataHidden: false, // default is true
    },
    sendType: "json", // default is 'json'
  }
);
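
The comments in the example mention two defaults (sendType is 'json', isDataHidden is true). As a rough illustration of what those defaults mean for a caller, the sketch below fills them in explicitly. `withPublishDefaults` and `PublishOptionsSketch` are hypothetical names, and the type is a trimmed local copy of ClientOptions, not the library's own definition.

```typescript
// Trimmed local copies of the documented types, for illustration only.
type MessageType = "json" | "string" | "object";
type PublishOptionsSketch = {
  exchangeName: string;
  routingKey: string;
  sendType?: MessageType;
  loggerOptions?: { isDataHidden?: boolean };
};

// Hypothetical helper: makes the documented defaults explicit.
function withPublishDefaults(options: PublishOptionsSketch) {
  return {
    ...options,
    sendType: options.sendType ?? ("json" as MessageType),
    loggerOptions: {
      isDataHidden: options.loggerOptions?.isDataHidden ?? true,
    },
  };
}
```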


publishRPCMessage()

Publishes an RPC message to the given exchange and waits for a single response

Parameters

  • message: Buffer | string | unknown - Message to be published
  • options: ClientRPCOptions - Options for publishing the message

Example usage

Example in which the client publishes a message to the exchange rpc-exchange-name with the routing key rpc.routing.key and waits for the response with a timeout of 5 seconds

const rpcMessage = { message: "OurMessage", nested: { value: 15 } };

await client.publishRPCMessage(rpcMessage, {
  exchangeName: "rpc-exchange-name",
  routingKey: "rpc.routing.key",
  replyQueueName: "rpc-reply-queue", // a reply queue will be created for listening to RPC answers
  timeout: 5_000, // optional; after 5 seconds, returns what it has received
  responseContains: {
    // optional; which data the response object will contain
    content: true,
    headers: true,
    signature: true,
  },
  correlationId: "some-random-nanoid", // optional; r4bbit provides a default random value
  loggerOptions: {
    // optional
    isConsumeDataHidden: false, // default is true
    isSendDataHidden: false, // default is true
  },
  sendType: "json", // optional; default is 'json'
  receiveType: "json", // optional; default is 'json'
  replySignature: "rpc-server-signature", // optional
});
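
RPC over RabbitMQ conventionally pairs a reply with its request through the correlation id, which is why the options above carry both a replyQueueName and a correlationId. The in-memory sketch below illustrates that matching step only. `PendingRequests` is a hypothetical class and says nothing about how r4bbit implements it internally.

```typescript
type Reply = { correlationId: string; content: unknown };

// Hypothetical registry: maps a correlation id to the callback awaiting it.
class PendingRequests {
  private readonly waiting = new Map<string, (reply: Reply) => void>();

  register(correlationId: string, onReply: (reply: Reply) => void): void {
    this.waiting.set(correlationId, onReply);
  }

  // Returns true when the reply matched a pending request.
  // A matched request is removed, so it receives a single response.
  deliver(reply: Reply): boolean {
    const onReply = this.waiting.get(reply.correlationId);
    if (!onReply) return false;
    this.waiting.delete(reply.correlationId);
    onReply(reply);
    return true;
  }
}
```

Replies with an unknown correlation id are reported as unmatched, which is what lets several callers share one reply queue in this kind of design.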

publishMultipleRPC()

Publishes a message to the given exchange and waits for multiple responses

It supports two strategies:

  • Collecting all replies and returning them once the timeout elapses or the expected number of replies has been received.
  • Passing a handler function that is called for each reply as soon as it is received, so that the necessary actions can be taken immediately.

Examples of both strategies are given below.

Parameters

  • message: Buffer | string | unknown - Message to be published
  • options: ClientMultipleRPC - Options for publishing multiple rpc messages.

Example usage

Example with specified number of expected replies (waitedReplies: 2) within timeout (timeout: 5_000):

const objectMessage = { message: "OurMessage" }; // illustrative payload

await client.publishMultipleRPC(objectMessage, {
  exchangeName: "multiple-rpc-exchange-name",
  routingKey: "multiple-rpc.routing.key",
  replyQueueName: "multiple-rpc-reply-queue",
  timeout: 5_000,
  waitedReplies: 2,
  responseContains: {
    content: true,
    headers: true,
    signature: true,
  },
  correlationId: "some-random-nanoid", // optional; r4bbit provides a default random value
  loggerOptions: {
    // optional
    isConsumeDataHidden: false,
    isSendDataHidden: false,
  },
  sendType: "json", // optional; default is 'json'
  receiveType: "json", // optional; default is 'json'
  replySignature: "server-1", // optional
});
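
To make the interaction between timeout and waitedReplies concrete, the sketch below simulates the first strategy over a list of replies with known arrival times. It is a model of the documented behaviour, not library code: `collectReplies`, `TimedReply`, and `arrivesAtMs` are hypothetical names introduced for this illustration.

```typescript
type TimedReply = { arrivesAtMs: number; content: unknown };

// Simulates the first strategy: keep replies that arrive before the timeout,
// and stop early once `waitedReplies` replies have been collected.
function collectReplies(
  incoming: TimedReply[],
  options: { timeout: number; waitedReplies?: number },
): TimedReply[] {
  // Process replies in arrival order without mutating the input.
  const ordered = [...incoming].sort((a, b) => a.arrivesAtMs - b.arrivesAtMs);
  const collected: TimedReply[] = [];
  for (const reply of ordered) {
    const limitReached =
      options.waitedReplies !== undefined && collected.length >= options.waitedReplies;
    if (limitReached || reply.arrivesAtMs > options.timeout) break;
    collected.push(reply);
  }
  return collected;
}
```

Without waitedReplies the call is bounded only by the timeout. With it, the call can finish as soon as enough replies have arrived.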

Example with an unlimited number of replies within the specified timeout (timeout: 5_000), each processed by a handler:

await client.publishMultipleRPC(objectMessage, {
  exchangeName: "multiple-rpc-exchange-name",
  routingKey: "multiple-rpc.routing.key",
  replyQueueName: "multiple-rpc-reply-queue",
  timeout: 5_000,
  responseContains: {
    content: true,
    headers: true,
    signature: true,
  },
  handler: async (msg) => {
    // The handler acts immediately when a reply is received.
    switch (msg.signature) {
      case "server-1":
        console.log("Server-1 Received:", msg);
        break;
      case "server-2":
        console.log("Server-2 Received:", msg);
        break;
      default:
        console.log("Unknown resource Received", msg);
    }
  },
});
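
When the number of reply signatures grows, the switch in the handler can be replaced by a lookup table. The sketch below shows one possible shape. `bySignature` and `SignedReply` are hypothetical, and `SignedReply` is a narrowed local stand-in for the `Record<string, unknown>` message that the documented handler type receives.

```typescript
type SignedReply = { signature?: string; content?: unknown };
type ReplyHandler = (msg: SignedReply) => void;

// Hypothetical helper: builds a single handler from a signature -> handler
// table, falling back when the signature is missing or unknown.
function bySignature(routes: Map<string, ReplyHandler>, fallback: ReplyHandler): ReplyHandler {
  return (msg) => {
    const route = msg.signature === undefined ? undefined : routes.get(msg.signature);
    (route ?? fallback)(msg);
  };
}

// Usage: the table mirrors the cases of the switch statement above.
const routeReply = bySignature(
  new Map<string, ReplyHandler>([
    ["server-1", (msg) => console.log("Server-1 Received:", msg)],
    ["server-2", (msg) => console.log("Server-2 Received:", msg)],
  ]),
  (msg) => console.log("Unknown resource Received", msg),
);
```

A Map is used instead of a plain object so that signatures such as "toString" cannot accidentally resolve to inherited properties.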

close()

Closes the connection with RabbitMQ.

Parameters

None

Example usage

const client = await getClient("amqp://localhost");
await client.close();
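
In longer programs it is easy to skip close() when publishing throws. One way to guard against that is a try/finally wrapper, sketched below. `withClient` and `Closable` are hypothetical, and the sketch assumes only that close() returns a promise, as the awaited call above suggests.

```typescript
// Minimal shape needed for this sketch; the real Client has more methods.
interface Closable {
  close(): Promise<void>;
}

// Hypothetical helper: runs `work` and always closes the client afterwards,
// whether `work` resolves or throws.
async function withClient<C extends Closable, T>(
  client: C,
  work: (client: C) => Promise<T>,
): Promise<T> {
  try {
    return await work(client);
  } finally {
    await client.close();
  }
}
```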

Types

InitRabbitOptions

export type InitRabbitOptions = {
  connectOptions?: AmqpConnectionManagerOptions; // Linked below 👇
  createChannelOptions?: CreateChannelOpts; // Linked below 👇
};

ConnectionUrl

type ConnectionUrl =
  | string
  | amqp.Options.Connect
  | {
      url: string;
      connectionOptions?: AmqpConnectionOptions;
    };
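
Because ConnectionUrl is a union of three shapes, code that inspects an entry has to narrow it first. The sketch below shows one way to do that while producing a log-safe label without credentials. `describeConnection`, `ConnectObject`, and `ConnectionUrlSketch` are hypothetical. `ConnectObject` is a reduced stand-in for amqp.Options.Connect, and the localhost and 5672 fallbacks are assumed to match the usual AMQP defaults.

```typescript
// Simplified stand-ins for the documented union, for illustration only.
type ConnectObject = { hostname?: string; port?: number; username?: string; password?: string };
type ConnectionUrlSketch = string | ConnectObject | { url: string };

// Extracts "host[:port]" from a URL string, dropping any user info.
function hostOf(url: string): string {
  return new URL(url).host;
}

// Hypothetical helper: narrows the union and returns a credential-free label.
function describeConnection(entry: ConnectionUrlSketch): string {
  if (typeof entry === "string") return hostOf(entry);
  if ("url" in entry) return hostOf(entry.url);
  // Remaining case: the object form with separate fields.
  return `${entry.hostname ?? "localhost"}:${entry.port ?? 5672}`;
}
```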

ClientOptions

type ClientOptions = {
  exchangeName: string;
  routingKey: string;
  sendType?: MessageType;
  publishOptions?: Options.Publish; // Linked below 👇
  loggerOptions?: {
    isDataHidden?: boolean;
  };
};

ServerRPCOptions

Link for the custom types used as property of ServerRPCOptions type

type ServerRPCOptions = {
  publishOptions?: Options.Publish; // Linked below 👇
  consumeOptions?: Options.Consume; // Linked below 👇
  sendType?: MessageType;
  correlationId?: string;
  replySignature?: string;
  responseContains?: ServerResponseContains;
  loggerOptions?: {
    isSendDataHidden?: boolean;
    isConsumeDataHidden?: boolean;
  };
};

ClientRPCOptions

Link for the custom types used as property of ClientRPCOptions type

type ClientRPCOptions = {
  exchangeName: string;
  routingKey: string;
  replyQueueName: string;
  receiveType?: MessageType;
  timeout?: number;
  responseContains?: ResponseContains;
} & ServerRPCOptions;

ClientMultipleRPC

Link for the custom types used as property of ClientMultipleRPC type

export type ClientMultipleRPC = {
  exchangeName: string;
  routingKey: string;
  replyQueueName: string;
  receiveType?: MessageType;
  timeout?: number;
  responseContains?: ResponseContains;
  waitedReplies?: number;
  handler?: (msg: Record<string, unknown>) => void;
} & ServerRPCOptions;

MessageType

type MessageType = "json" | "string" | "object";

ResponseContains

type ResponseContains = {
  signature?: boolean;
  headers?: boolean;
  content?: boolean;
};
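
The three flags can be read as a field selector over the reply. The sketch below illustrates that reading. `pickResponse` and `FullReply` are hypothetical, and the actual shape of the object r4bbit returns is not specified here, so treat the output shape as an assumption.

```typescript
type ResponseContains = { signature?: boolean; headers?: boolean; content?: boolean };
type FullReply = { signature: string; headers: Record<string, unknown>; content: unknown };

// Hypothetical helper: keeps only the reply fields whose flag is true.
function pickResponse(reply: FullReply, contains: ResponseContains): Partial<FullReply> {
  const picked: Partial<FullReply> = {};
  if (contains.signature) picked.signature = reply.signature;
  if (contains.headers) picked.headers = reply.headers;
  if (contains.content) picked.content = reply.content;
  return picked;
}
```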

ServerResponseContains

type ServerResponseContains = {
  headers?: boolean;
  content?: boolean;
};