RabbitMQ Adapter
The RabbitMQ adapter provides reliable message-queue communication over the AMQP protocol, with support for topic exchanges, durable messaging, and load balancing.

Installation
First, install the required dependency:

    # npm
    npm install amqplib

    # yarn
    yarn add amqplib

    # pnpm
    pnpm add amqplib

Basic Usage
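The producer and consumer examples in this section both import `apiMethods` and `API` from a local `./api` module that this page does not show. A minimal sketch of what such a module might contain, assuming only the two methods the examples call (`add` and `echo`); the file name and exact shape are assumptions, not something kkrpc ships:

```typescript
// api.ts (hypothetical module; only `add` and `echo` are implied by the examples)
export interface API {
  add(a: number, b: number): Promise<number>
  echo(message: string): Promise<string>
}

// Local implementations that each side exposes through its RPCChannel.
export const apiMethods: API = {
  add: async (a, b) => a + b,
  echo: async (message) => message
}
```

Declaring the methods as returning promises keeps the local type identical to what a remote caller sees, since every RPC call is asynchronous.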
Producer

    import { RabbitMQIO, RPCChannel } from "kkrpc"
    import { apiMethods, type API } from "./api"

    const rabbitmqIO = new RabbitMQIO({
      url: "amqp://localhost",
      exchange: "kkrpc-exchange",
      exchangeType: "topic",
      durable: true
    })

    const producerRPC = new RPCChannel<API, API>(rabbitmqIO, {
      expose: apiMethods
    })

    const api = producerRPC.getAPI()

    // Test basic RPC calls
    console.log(await api.add(5, 3)) // 8
    console.log(await api.echo("Hello from RabbitMQ!")) // "Hello from RabbitMQ!"

    rabbitmqIO.destroy()

Consumer

    import { RabbitMQIO, RPCChannel } from "kkrpc"
    import { apiMethods, type API } from "./api"

    const rabbitmqIO = new RabbitMQIO({
      url: "amqp://localhost",
      exchange: "kkrpc-exchange",
      exchangeType: "topic",
      durable: true,
      sessionId: "consumer-session"
    })

    const consumerRPC = new RPCChannel<API, API>(rabbitmqIO, {
      expose: apiMethods
    })

    const api = consumerRPC.getAPI()

    // Call the API exposed by the producer
    console.log(await api.add(10, 20)) // 30
    console.log(await api.echo("Hello from consumer!")) // "Hello from consumer!"

    rabbitmqIO.destroy()

Configuration Options

    interface RabbitMQOptions {
      url?: string                                  // AMQP broker URL (default: "amqp://localhost")
      exchange?: string                             // Exchange name (default: "kkrpc-exchange")
      exchangeType?: "topic" | "direct" | "fanout"  // Exchange type (default: "topic")
      durable?: boolean                             // Durable exchange and queues (default: true)
      sessionId?: string                            // Unique session identifier
      routingKeyPrefix?: string                     // Routing key prefix (default: "kkrpc")
    }

Features
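
As background for the exchange-related features in this section: in AMQP, a topic exchange compares a message's routing key with each queue's binding pattern word by word. Words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The sketch below illustrates that rule in isolation. `topicMatches` is a hypothetical helper written for this illustration; it is not part of kkrpc or amqplib, and in practice the broker performs the matching itself.

```typescript
// Hypothetical helper: returns true when `routingKey` matches `pattern`
// under AMQP topic-exchange rules ("*" = one word, "#" = zero or more words).
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".")
  const k = routingKey.split(".")

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === p.length) return j === k.length
    if (p[i] === "#") {
      // "#" either absorbs nothing (advance the pattern)
      // or absorbs one more word (advance the key).
      return match(i + 1, j) || (j < k.length && match(i, j + 1))
    }
    if (j === k.length) return false
    return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1)
  }

  return match(0, 0)
}

console.log(topicMatches("myapp.rpc.*", "myapp.rpc.messages")) // true
console.log(topicMatches("myapp.#", "myapp.rpc.messages"))     // true
console.log(topicMatches("myapp.rpc.*", "myapp.rpc"))          // false
```
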
Topic Exchange Routing

The RabbitMQ adapter uses a topic exchange with routing keys to separate kkrpc traffic from other consumers:

    const rabbitmqIO = new RabbitMQIO({
      exchange: "my-exchange",
      exchangeType: "topic",         // Use topic exchange for flexible routing
      routingKeyPrefix: "myapp.rpc"  // Custom routing key prefix
    })

    // Get routing information
    const routingKeys = rabbitmqIO.getRoutingKeys()
    console.log(routingKeys) // { inbound: "myapp.rpc.messages", outbound: "myapp.rpc.messages" }

Durable Messaging

Configure durable exchanges and queues so that they survive broker restarts:

    const rabbitmqIO = new RabbitMQIO({
      durable: true,               // Exchange and queues survive broker restarts
      exchange: "durable-exchange"
    })

Session Management

Each adapter instance gets a unique session ID to prevent message conflicts:

    const rabbitmqIO = new RabbitMQIO({
      sessionId: "my-unique-session" // Optional custom session ID
    })

    console.log(rabbitmqIO.getSessionId()) // Get current session ID
    console.log(rabbitmqIO.getExchange())  // Get exchange name

Advanced Usage
Custom Exchange Configuration

    const rabbitmqIO = new RabbitMQIO({
      url: "amqp://guest:guest@localhost:5672",
      exchange: "custom-exchange",
      exchangeType: "direct",        // Direct exchange for point-to-point
      durable: false,                // Non-durable for temporary queues
      routingKeyPrefix: "custom.rpc"
    })

Multiple Consumers

    // Consumer 1
    const consumer1 = new RabbitMQIO({
      sessionId: "consumer-1",
      exchange: "load-balanced-exchange"
    })

    // Consumer 2
    const consumer2 = new RabbitMQIO({
      sessionId: "consumer-2",
      exchange: "load-balanced-exchange"
    })

    // Both consumers will receive all messages (broadcast pattern)

Error Handling
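
    const rabbitmqIO = new RabbitMQIO({
      url: "amqp://localhost"
    })

    // Create the RPC channel that the try block uses
    const rabbitmqRPC = new RPCChannel<API, API>(rabbitmqIO, {
      expose: apiMethods
    })

    try {
      const api = rabbitmqRPC.getAPI()
      await api.add(1, 2)
    } catch (error) {
      // A caught value is not guaranteed to be an Error, so normalize it first
      const message = error instanceof Error ? error.message : String(error)
      if (message.includes("RabbitMQ adapter has been destroyed")) {
        console.log("Adapter was destroyed")
      } else if (message.includes("Failed to create RabbitMQ channel")) {
        console.log("Connection failed - check RabbitMQ server")
      }
    }

Connection Management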

    const rabbitmqIO = new RabbitMQIO()

    // Graceful cleanup
    rabbitmqIO.destroy()

    // Signal destroy to remote parties
    await rabbitmqIO.signalDestroy()

Best Practices

- Use unique session IDs when running multiple instances
- Enable durable messaging for production systems
- Monitor connection health and implement reconnection logic
- Use appropriate exchange types for your use case:
  - topic: Flexible routing with wildcards
  - direct: Point-to-point communication
  - fanout: Broadcast to all queues
- Clean up resources with destroy() when shutting down

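The reconnection advice in the list above can be sketched as a generic retry helper with exponential backoff. `withRetry`, its parameters, and the default values are hypothetical and not part of kkrpc; the `operation` callback stands in for whatever creates your adapter or issues an RPC call.

```typescript
// Hypothetical helper: retry an async operation with exponential backoff.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 5,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      lastError = error
      if (attempt < maxAttempts - 1) {
        // The delay doubles after each failure: baseDelayMs, 2x, 4x, ...
        const delay = baseDelayMs * 2 ** attempt
        await new Promise((resolve) => setTimeout(resolve, delay))
      }
    }
  }
  // All attempts failed: surface the most recent error to the caller.
  throw lastError
}
```

A caller might wrap a call such as `await withRetry(() => api.add(1, 2))`. Whether a failed adapter has to be recreated inside the callback depends on how the adapter behaves after a connection loss, which this page does not specify.
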
Dependencies

- amqplib: AMQP client library for RabbitMQ
- A RabbitMQ server running on an accessible host