server

The Server class is used to receive messages from RabbitMQ.

Factory

getServer()

Creates a singleton server instance. r4bbit validates the connection URLs and throws an error for invalid ones.

Parameters

Example usage

Methods

registerRoute()

Receives messages from a specific routing key and exchange name. It doesn't reply. For servers that reply, see registerRPCRoute.

Parameters

  • connection: ServerConnection - Queue, exchange, and routing key to receive messages from
  • handlerFunction: Handler | AckHandler - A handler function that is executed when a message is received.
  • options?: ServerOptions - Options related to the received message's content

Example usage

In this example, the server receives messages from the exchange my-exchange with the routing key my.*.

Because r4bbit uses a topic exchange, this pattern matches any routing key of the form my.<any-word>, where * stands for exactly one dot-separated word.

const handlerFunc: ServerTypes.AckHandler =
  ({ ack }) =>
  (msg: string | object) => {
    console.log("Received message is ->", msg);
    ack(); // Manual acknowledgement (execute after your operation ends)
  };

await server.registerRoute(
  {
    queueName: "my-queue",
    exchangeName: "my-exchange",
    routingKey: "my.*",
  },
  handlerFunc,
  {
    consumeOptions: {
      noAck: false, // default is false
    },
    loggerOptions: {
      isDataHidden: true, // default is false
    },
    responseContains: {
      content: true, // default is true
      headers: true, // default is false
    },
  }
);
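
The my.* pattern above follows RabbitMQ topic-exchange rules: * matches exactly one dot-separated word, and # matches zero or more words. The sketch below reimplements that matching locally, purely to illustrate which routing keys a pattern would accept. topicMatches is a hypothetical helper, not part of r4bbit; the broker performs the real matching.

```typescript
// Hypothetical helper (not part of r4bbit): mimics topic-exchange matching.
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  const match = (i: number, j: number): boolean => {
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // "#" may consume zero words, or one word while staying on "#".
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    if (j === k.length) return false;
    return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

console.log(topicMatches("my.*", "my.orders"));         // true
console.log(topicMatches("my.*", "my.orders.created")); // false
console.log(topicMatches("my.*", "my"));                // false
```

To receive keys with more than one trailing word, a pattern such as my.# would be the usual choice.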


registerRPCRoute()

Registers an RPC route. It receives messages from a specific routing key and exchange name and returns a response.

Parameters

  • connection: ServerConnection - connection to RabbitMQ
  • handlerFunction: RpcHandler - function that will be executed when a message is received
  • options?: ServerRPCOptions - server options for receiving rpc messages

Example usage

This example demonstrates the usage of the registerRPCRoute method to create a Remote Procedure Call (RPC) route. It shows setting up a server, defining an RPC route, and creating a handler function to process incoming requests and respond back to the client with processed data.

const handler: ServerTypes.RpcHandler =
  (reply: ServerTypes.Reply) => (msg: Record<string, unknown> | string) => {
    if (!msg) {
      return;
    }
    reply((msg as { content: string }).content);
  };

await server.registerRPCRoute(
  {
    queueName: serverQueueName,
    routingKey: "*.routing-key",
    exchangeName: "my-exchange",
  },
  handler,
  {
    replySignature: "rpc-server-signature",
    responseContains: {
      content: true, // default is true
      headers: true, // default is false
      signature: false, // default is false
    },
    consumeOptions: {
      noAck: false, // default is false
    },
    loggerOptions: {
      isConsumeDataHidden: false, // default is false
      isSendDataHidden: false, // default is false
    },
    publishOptions: {
      persistent: true, // default is true
      // ...
    },
    sendType: "json", // default is 'json'
  }
);

await server.registerRPCRoute(
  {
    exchangeName: "test-exchange",
    queueName: "test-queue",
    routingKey: "test-routing-key",
  },
  handler,
  {
    responseContains: { signature: true }, // optional, default: undefined - no signature in the response
    loggerOptions: {
      isSendDataHidden: true, // optional, default: false
      isConsumeDataHidden: true, // optional, default: false
    },
    consumeOptions: { noAck: false }, // optional, default: false, i.e., acknowledgments are expected
    publishOptions: { persistent: true }, // optional, default: false
    replySignature: "reply-sig", // optional, default: undefined - no reply signature
  }
);
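
Because an RpcHandler is a curried function, it can be exercised without a broker by passing in a stub reply. The sketch below assumes Reply is a function that accepts the response payload; the Reply type is not listed on this page, so that shape is a guess. The types are local stand-ins rather than imports from r4bbit, and stubReply is a hypothetical test double.

```typescript
// Local stand-ins; the real types come from r4bbit's ServerTypes.
// Assumption: Reply takes the response payload as its only argument.
type Reply = (response: unknown) => void;
type RpcHandler = (
  reply: Reply
) => (msg: string | Record<string, unknown>) => void;

// Same logic as the handler above: echo back the message content.
const handler: RpcHandler = (reply) => (msg) => {
  if (!msg) {
    return;
  }
  reply((msg as { content: string }).content);
};

// Stub reply that records what the handler would send to the client.
const sent: unknown[] = [];
const stubReply: Reply = (response) => {
  sent.push(response);
};

handler(stubReply)({ content: "ping" });
handler(stubReply)(""); // falsy message: the handler returns without replying

console.log(sent); // [ 'ping' ]
```

Note that the early return on a falsy message means the client receives no reply in that case, which may or may not be the desired behaviour for a given route.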

getWrapper()

r4bbit is built on amqplib and node-amqp-connection-manager. The wrapper is an API exposed by node-amqp-connection-manager that allows lower-level operations such as:

  • deleting exchanges
  • cancelling all the processes
  • ...

Parameters

None

Example usage

In this example, we cancel all consumers and then close the connection with RabbitMQ itself.

await server.getWrapper().cancelAll();
await server.getWrapper().close();

close()

Gracefully closes the connection with RabbitMQ.

Parameters

None

Example usage

await server.close();
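
A common place to call close() is a shutdown hook. The sketch below wraps it so that repeated shutdown requests share a single close() call. It assumes only that close() returns a promise, as the await above suggests; Closable and makeShutdown are hypothetical names introduced for illustration, not part of r4bbit.

```typescript
// Minimal shape this sketch needs (assumption: close() returns a promise).
type Closable = { close: () => Promise<void> };

// Returns a function that closes the server at most once, so repeated
// shutdown requests (e.g. SIGINT followed by SIGTERM) share one close().
function makeShutdown(server: Closable): () => Promise<void> {
  let closing: Promise<void> | undefined;
  return () => {
    if (!closing) {
      closing = server.close();
    }
    return closing;
  };
}

// In a Node.js process this could be wired up as:
//   const shutdown = makeShutdown(server);
//   process.once("SIGINT", () => void shutdown());
//   process.once("SIGTERM", () => void shutdown());
```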

Types

ConnectionUrl

type ConnectionUrl =
  | string
  | amqp.Options.Connect // Linked below 👇
  | {
      url: string;
      connectionOptions?: AmqpConnectionOptions; // Linked below 👇
    };
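
The three accepted forms can be illustrated with local stand-in types. This is only a sketch: ConnectOptions lists a few fields of amqplib's Options.Connect, AmqpConnectionOptions is left opaque, the host and credentials are placeholders, and formOf is a hypothetical helper that shows how the union can be narrowed.

```typescript
// Local stand-ins for the amqplib / amqp-connection-manager types.
type ConnectOptions = {
  protocol?: string;
  hostname?: string;
  port?: number;
  username?: string;
  password?: string;
  vhost?: string;
};
type AmqpConnectionOptions = Record<string, unknown>;

type ConnectionUrl =
  | string
  | ConnectOptions
  | { url: string; connectionOptions?: AmqpConnectionOptions };

// One value per form (placeholder host and credentials).
const asString: ConnectionUrl = "amqp://guest:guest@localhost:5672";
const asObject: ConnectionUrl = { protocol: "amqp", hostname: "localhost", port: 5672 };
const asUrlWithOptions: ConnectionUrl = { url: "amqp://localhost:5672", connectionOptions: {} };

// Hypothetical helper: tells the three forms apart.
function formOf(conn: ConnectionUrl): "string" | "url-object" | "connect-options" {
  if (typeof conn === "string") return "string";
  return "url" in conn ? "url-object" : "connect-options";
}

console.log([asString, asObject, asUrlWithOptions].map(formOf));
// [ 'string', 'connect-options', 'url-object' ]
```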

InitRabbitOptions

export type InitRabbitOptions = {
  connectOptions?: AmqpConnectionManagerOptions; // Linked below 👇
  createChannelOptions?: CreateChannelOpts; // Linked below 👇
};

ServerConnection

type ServerConnection = {
  queueName: string;
  routingKey: string;
  exchangeName: string;
};

ServerOptions

Links to the custom types used as properties of the ServerOptions type

type ServerOptions = {
  consumeOptions?: Options.Consume; // Linked below 👇
  responseContains?: ServerResponseContains;
  loggerOptions?: {
    isDataHidden?: boolean;
  };
};

ServerRPCOptions

type ServerRPCOptions = {
  publishOptions?: Options.Publish; // Linked below 👇
  consumeOptions?: Options.Consume; // Linked below 👇
  sendType?: MessageType;
  correlationId?: string;
  replySignature?: string;
  responseContains?: ServerResponseContains;
  loggerOptions?: {
    isSendDataHidden?: boolean;
    isConsumeDataHidden?: boolean;
  };
};

Handler

Handler is a function that executes when a message is received. It should only be used when the noAck option is enabled in ServerOptions.consumeOptions.

type Handler = (msg: string | Record<string, unknown>) => void;

AckHandler

AckHandler is a function that returns a Handler. Its advantage is that you can acknowledge messages manually. It should only be used when acknowledgements are expected (noAck: false) in ServerOptions.consumeOptions.

export type AckHandler = (ackObj: AckObj) => Handler; // Referenced above 👆
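
The relationship between the two handler shapes can be sketched without a broker. AckObj is not defined on this page; the sketch assumes it carries at least an ack function, as the destructuring in the registerRoute example suggests. The types are local stand-ins, and fakeAck is a hypothetical test double.

```typescript
type Handler = (msg: string | Record<string, unknown>) => void;
// Assumption: AckObj exposes at least `ack`.
type AckObj = { ack: () => void };
type AckHandler = (ackObj: AckObj) => Handler;

const received: Array<string | Record<string, unknown>> = [];

// Plain handler: processes the message, nothing to acknowledge.
const plain: Handler = (msg) => {
  received.push(msg);
};

// Ack handler: acknowledges only after the work is done.
const withAck: AckHandler = ({ ack }) => (msg) => {
  plain(msg);
  ack();
};

// Fake ack that counts how many times it was called.
let ackCount = 0;
const fakeAck: AckObj = {
  ack: () => {
    ackCount += 1;
  },
};

withAck(fakeAck)({ id: 1 });
withAck(fakeAck)("hello");

console.log(received.length, ackCount); // 2 2
```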

RpcHandler

type RpcHandler = (
  reply: Reply
) => (msg: string | Record<string, unknown>) => void;

MessageType

type MessageType = "json" | "string" | "object";

ResponseContains

type ResponseContains = {
  signature?: boolean;
  headers?: boolean;
  content?: boolean;
};

ServerResponseContains

type ServerResponseContains = {
  headers?: boolean;
  content?: boolean;
};