Using RabbitMQ for RPC (Remote Procedure Call) and Pub/Sub (Publish/Subscribe) in Go
RabbitMQ is a powerful message broker that can be used for various messaging patterns. One of the most interesting and useful patterns is RPC (Remote Procedure Call), which allows a client to execute a procedure on a remote server and receive the result. Another common pattern is Pub/Sub (Publish/Subscribe), which allows messages to be broadcast to multiple consumers. In this blog post, we will explore how to implement both RPC and Pub/Sub using RabbitMQ in Go (Golang).
Introduction
RPC is a protocol that one program can use to request a service from a program located on another computer in a network. RabbitMQ provides a robust mechanism to implement RPC by using request and reply queues. We will set up a simple RPC system where a client sends a number to the server, and the server returns the Fibonacci value of that number.
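The request/reply flow described above can be sketched without a broker, using Go channels in place of queues. This is only an illustration under stated assumptions: the `request` and `response` types and the `serve` and `call` functions are hypothetical names invented for this sketch, not part of RabbitMQ or the amqp library, and the handler squares its input to keep the example short.

```go
package main

import (
	"fmt"
	"strconv"
)

// request is a hypothetical in-memory analogue of an RPC request message.
// ReplyTo plays the role of the reply queue name and CorrelationID ties
// a response back to the request that caused it.
type request struct {
	Body          string
	CorrelationID string
	ReplyTo       chan<- response
}

// response is a hypothetical analogue of the reply message.
type response struct {
	Body          string
	CorrelationID string
}

// serve reads requests, applies handler to the parsed number, and sends
// the result to the reply channel named in each request.
func serve(requests <-chan request, handler func(int) int) {
	for req := range requests {
		n, err := strconv.Atoi(req.Body)
		if err != nil {
			continue // this sketch simply skips malformed requests
		}
		req.ReplyTo <- response{
			Body:          strconv.Itoa(handler(n)),
			CorrelationID: req.CorrelationID,
		}
	}
}

// call sends one request and waits for the reply on its own reply channel.
func call(requests chan<- request, corrID string, n int) (int, error) {
	replies := make(chan response, 1)
	requests <- request{Body: strconv.Itoa(n), CorrelationID: corrID, ReplyTo: replies}
	r := <-replies
	if r.CorrelationID != corrID {
		return 0, fmt.Errorf("unexpected correlation id %q", r.CorrelationID)
	}
	return strconv.Atoi(r.Body)
}

func main() {
	requests := make(chan request)
	go serve(requests, func(n int) int { return n * n })

	got, err := call(requests, "req-1", 7)
	fmt.Println(got, err) // prints "49 <nil>"
}
```

With RabbitMQ, the channels become a named request queue and an exclusive reply queue, but the roles of the reply address and the correlation ID stay the same.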
Pub/Sub is a messaging pattern where messages are published to an exchange and then broadcast to multiple queues. This allows multiple consumers to receive the same message simultaneously.
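To make the broadcast behaviour concrete, here is a minimal in-memory sketch of a fanout: every message published is copied to every bound queue. The `fanout` type and its `bind` and `publish` methods are hypothetical stand-ins written for this illustration; they are not the RabbitMQ API, and real exchanges handle routing inside the broker.

```go
package main

import "fmt"

// fanout is a hypothetical in-memory stand-in for a fanout exchange.
type fanout struct {
	queues []chan string
}

// bind attaches a new buffered queue to the exchange and returns it.
func (f *fanout) bind(size int) <-chan string {
	q := make(chan string, size)
	f.queues = append(f.queues, q)
	return q
}

// publish copies msg to every bound queue.
// It blocks if any queue's buffer is full.
func (f *fanout) publish(msg string) {
	for _, q := range f.queues {
		q <- msg
	}
}

func main() {
	ex := &fanout{}
	a := ex.bind(1)
	b := ex.bind(1)

	ex.publish("hello")
	fmt.Println(<-a, <-b) // prints "hello hello"
}
```

Each consumer reads from its own queue, so one slow consumer does not take messages away from another.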
Prerequisites
- A basic understanding of Go programming.
- RabbitMQ installed and running. If you haven't installed it yet, follow the official RabbitMQ installation guide.
- Go installed on your machine. You can download it from the official Go website.
Setting Up the Project
First, let's set up a new Go project:
mkdir rabbitmq-patterns
cd rabbitmq-patterns
go mod init rabbitmq-patterns
go get github.com/streadway/amqp
Writing the RPC Server
The server will receive requests from the client, process them, and send back the results.
Create a file named rpc_server.go:
package main

import (
	"log"
	"strconv"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func fibonacci(n int) int {
	if n == 0 {
		return 0
	} else if n == 1 {
		return 1
	} else {
		return fibonacci(n-1) + fibonacci(n-2)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"rpc_queue", // name
		false,       // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			n, err := strconv.Atoi(string(d.Body))
			failOnError(err, "Failed to convert body to integer")

			log.Printf(" [.] fib(%d)", n)
			response := fibonacci(n)

			err = ch.Publish(
				"",        // exchange
				d.ReplyTo, // routing key
				false,     // mandatory
				false,     // immediate
				amqp.Publishing{
					ContentType:   "text/plain",
					CorrelationId: d.CorrelationId,
					Body:          []byte(strconv.Itoa(response)),
				})
			failOnError(err, "Failed to publish a message")

			d.Ack(false)
		}
	}()

	log.Printf(" [x] Awaiting RPC requests")
	<-forever
}
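The recursive fibonacci function in the server takes exponential time and never terminates for negative input, which matters once the input comes from a remote client. One possible alternative is an iterative version like the sketch below. The name `fibIter` is hypothetical, and returning 0 for negative input is an assumption made for this example rather than behaviour taken from the server above.

```go
package main

import "fmt"

// fibIter returns the n-th Fibonacci number using a simple loop.
// It runs in linear time and returns 0 for n <= 0 (an assumption
// made for this sketch).
func fibIter(n int) int {
	a, b := 0, 1
	for i := 0; i < n; i++ {
		a, b = b, a+b
	}
	return a
}

func main() {
	fmt.Println(fibIter(30)) // prints 832040
}
```

Swapping it into the server would only change the line that computes the response; the messaging code stays the same.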
Writing the RPC Client
The client will send a number to the server and wait for the result.
Create a file named rpc_client.go:
package main

import (
	"log"
	"math/rand"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	corrId := randomString(32)
	n := 30 // Number to send to the server

	err = ch.Publish(
		"",          // exchange
		"rpc_queue", // routing key
		false,       // mandatory
		false,       // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: corrId,
			ReplyTo:       q.Name,
			Body:          []byte(strconv.Itoa(n)),
		})
	failOnError(err, "Failed to publish a message")

	for d := range msgs {
		if corrId == d.CorrelationId {
			response, err := strconv.Atoi(string(d.Body))
			failOnError(err, "Failed to convert body to integer")
			log.Printf(" [.] Got %d", response)
			break
		}
	}
}

func randomString(l int) string {
	rand.Seed(time.Now().UnixNano())
	bytes := make([]byte, l)
	for i := range bytes {
		bytes[i] = byte(randInt(65, 90))
	}
	return string(bytes)
}

func randInt(min int, max int) int {
	return min + rand.Intn(max-min)
}
Writing the Pub/Sub Publisher
The publisher will send messages to an exchange, which will then broadcast the messages to all bound queues.
Create a file named publisher.go:
package main

import (
	"log"
	"strconv" // needed for strconv.Itoa below
	"time"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// Publish ten messages, one per second.
	for i := 0; i < 10; i++ {
		body := "Log message " + strconv.Itoa(i)
		err = ch.Publish(
			"logs", // exchange
			"",     // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(body),
			})
		failOnError(err, "Failed to publish a message")

		log.Printf(" [x] Sent %s", body)
		time.Sleep(1 * time.Second)
	}
}
Writing the Pub/Sub Consumer
The consumer will receive messages from the exchange and process them.
Create a file named consumer.go:
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

err