server
Server class is used to receive messages from the RabbitMQ.
Factory
getServer()
Creates a singleton server instance. r4bbit makes validation for URLs, and throws an error for the invalid ones.
Parameters
connectionUrls
: ConnectionUrl | ConnectionUrl[] - One or an array of RabbitMQ connection URLs/objects.options
?: InitRabbitOptions - Options for the connection with RabbitMQ instance.
Example usage
Methods
registerRoute()
Receives messages from a specific routing key and exchange name. It doesn't reply. For replying servers, take a look at registerRPCRoute
Parameters
connection
: ServerConnection - Message to be publishedhandlerFunction
: Handler | AckHandler - A handler function that is executed when a message is received.options
?: ServerOptions - Options related to received message's content Example usage
Example in which the server receives a message from the exchange my-exchange
with the routing key my.*
Because of r4bbit uses topic exchange, it means any topic name starts with my.<any-string>
const handlerFunc: ServerTypes.AckHandler =
({ ack }) =>
(msg: string | object) => {
console.log("Received message is ->", msg); // our
ack(); // Manual acknowledgement (execute after your operation ends)
};
await server.registerRoute(
{
queueName: "my-queue",
exchangeName: "my-exchange",
routingKey: "my.*",
},
handlerFunc,
{
consumeOptions: {
noAck: false, // default is false
},
loggerOptions: {
isDataHidden: true, // default is false
},
responseContains: {
content: true, // default is true
headers: true, // default is false
},
}
);
registerRPCRoute()
Registers an RPC route. It receives messages from a specific routing key and exchange name and returns a response.
Parameters
connection
: ServerConnection - connection to RabbitMQhandlerFunction
: RpcHandler - function that will be executed when a message is receivedoptions
?: ServerRPCOptions - server options for receiving rpc messages
Example usage
This example demonstrates the usage of the registerRPCRoute method to create a Remote Procedure Call (RPC) route. It shows setting up a server, defining an RPC route, and creating a handler function to process incoming requests and respond back to the client with processed data.
const handler: ServerTypes.RpcHandler =
(reply: ServerTypes.Reply) => (msg: Record<string, unknown> | string) => {
if (!msg) {
return;
}
reply((msg as { content: string }).content);
};
await server.registerRPCRoute(
{
queueName: serverQueueName,
routingKey: "*.routing-key",
exchangeName: "my-exchange",
},
handler,
{
replySignature: "rpc-server-signature",
responseContains: {
content: true, // default is true
headers: true, // default is false
signature: false, // default is false
},
consumeOptions: {
noAck: false, // default is false
},
loggerOptions: {
isConsumeDataHidden: false, // default is false
isSendDataHidden: false, // default is false
},
publishOptions: {
persistent: true, // default is true
// ...
},
sendType: "json", // default is 'json'
}
);
await server.registerRPCRoute(
{
exchangeName: "test-exchange",
queueName: "test-queue",
routingKey: "test-routing-key",
},
handler,
{
responseContains: { signature: true }, // optional, default: undefined - no signature in the response
loggerOptions: {
isSendDataHidden: true, // optional, default: false
isConsumeDataHidden: true, // optional, default: false
},
consumeOptions: { noAck: false }, // optional, default: false, i.e., acknowledgments are expected
publishOptions: { persistent: true }, // optional, default: false
replySignature: "reply-sig", // optional, default: undefined - no reply signature
}
);
getWrapper()
r4bbit is built over amqplib and node-amqp-connection-manager, wrapper is an api exposed by node-amqp-connection-manager that allows us to do all those crazy stuff like
- deleting exchanges
- cancelling all the processes
- ...
Parameters
None
Example usage
In this example, we're closing all the consumer connections and eventually the connection with the MQ itself.
await server.getWrapper().cancelAll();
await server.getWrapper().close();
close()
Gracefully closes the connection with RabbitMQ.
Parameters
None
Example usage
await server.close();
Types
ConnectionUrl
type ConnectionUrl =
| string
| amqp.Options.Connect // Linked below 👇
| {
url: string;
connectionOptions?: AmqpConnectionOptions; // Linked below 👇
};
See:
InitRabbitOptions
export type InitRabbitOptions = {
connectOptions?: AmqpConnectionManagerOptions; // Linked below 👇
createChannelOptions?: CreateChannelOpts; // Linked below 👇
};
ServerConnection
type ServerConnection = {
queueName: string;
routingKey: string;
exchangeName: string;
};
ServerOptions
Link for the custom types used as property of ClientMultipleRPC type
consumeOptions?: Options.Consume; // Linked below 👇
responseContains?: ServerResponseContains;
loggerOptions?: {
isDataHidden?: boolean;
};
ServerRPCOptions
type ServerRPCOptions = {
publishOptions?: Options.Publish; // Linked below 👇
consumeOptions?: Options.Consume; // Linked below 👇
sendType?: MessageType;
correlationId?: string;
replySignature?: string;
responseContains?: ServerResponseContains;
loggerOptions?: {
isSendDataHidden?: boolean;
isConsumeDataHidden?: boolean;
};
};
Handler
The Handler function is a function that executes when a message is received, it should be only used when noAck option specified in the ServerOptions.consumeOptions
type Handler = (msg: string | Record<string, unknown>) => void;
AckHandler
AckHandler is a function that returns handlerFunction, the advantage of using ackHandler is, you can manually acknowledge your messages. It should be only used when ack option specified in the ServerOptions.consumeOptions
export type AckHandler = (ackObj: AckObj) => Handler; // Referenced above 👆
RpcHandler
type RpcHandler = (
reply: Reply
) => (msg: string | Record<string, unknown>) => void;
MessageType
type MessageType = "json" | "string" | "object";
ResponseContains
type ResponseContains = {
signature?: boolean;
headers?: boolean;
content?: boolean;
};
ServerResponseContains
type ServerResponseContains = {
headers?: boolean;
content?: boolean;
};