// RPC pattern, responder (server) side: consumes Fibonacci requests from a
// queue, computes the answer, and publishes it to each request's reply queue.
//
// NOTE(review): the reviewed source had its import paths and its string, bool
// and integer literals stripped. The values used below (AMQP client import
// path, broker URL, queue name, flag values, log messages) follow the RabbitMQ
// RPC tutorial defaults -- confirm them against the real deployment.
package main

import (
	"log"
	"strconv"

	"github.com/streadway/amqp"
)

const (
	// amqpURL is the broker address. NOTE(review): assumed local default -- confirm.
	amqpURL = "amqp://guest:guest@localhost:5672/"

	// rpcQueue is the queue requests are consumed from. NOTE(review): assumed name -- confirm.
	rpcQueue = "rpc_queue"

	// maxFibN is the largest index whose Fibonacci number fits in a signed
	// 64-bit integer; larger requests are rejected instead of overflowing.
	maxFibN = 92
)

// failOnError logs msg together with err and panics when err is non-nil.
//
// log.Panicf replaces the previous log.Fatalf followed by panic: Fatalf exits
// the process immediately, so the panic was unreachable and deferred cleanup
// in the calling goroutine never ran.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

// fib returns the n-th Fibonacci number (fib(0) = 0, fib(1) = 1).
//
// It runs iteratively in O(n) and returns 0 for n <= 0, so a negative input
// can no longer recurse without bound.
func fib(n int) int {
	if n <= 0 {
		return 0
	}
	prev, cur := 0, 1
	for i := 1; i < n; i++ {
		prev, cur = cur, prev+cur
	}
	return cur
}

// handleRequest processes one RPC delivery: it parses the request body as an
// integer, computes fib of it, publishes the result to d.ReplyTo tagged with
// the request's correlation id, and acknowledges the delivery.
//
// A body that is not an integer in [0, maxFibN] is rejected without requeue
// so that one bad message cannot take the server down or be redelivered
// forever. No reply is sent for a rejected request.
func handleRequest(ch *amqp.Channel, d amqp.Delivery) {
	n, err := strconv.Atoi(string(d.Body))
	if err != nil || n < 0 || n > maxFibN {
		log.Printf(" [!] Rejecting malformed request %q", d.Body)
		if rejErr := d.Reject(false); rejErr != nil {
			log.Printf("Failed to reject message: %s", rejErr)
		}
		return
	}
	log.Printf(" [.] fib(%d)", n)

	// Compute.
	response := fib(n)

	// Reply.
	err = ch.Publish(
		"",        // exchange
		d.ReplyTo, // routing key
		false,     // mandatory
		false,     // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: d.CorrelationId, // lets the requester match reply to request
			Body:          []byte(strconv.Itoa(response)),
		})
	failOnError(err, "Failed to publish a message")

	// Acknowledge only after the reply has been published.
	if ackErr := d.Ack(false); ackErr != nil {
		log.Printf("Failed to ack message: %s", ackErr)
	}
}

// main connects to the broker, declares the request queue, and serves RPC
// requests until the process is terminated.
func main() {
	conn, err := amqp.Dial(amqpURL)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		rpcQueue, // name
		false,    // durable
		false,    // delete when unused
		false,    // exclusive
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// Fair dispatch: without this, messages are handed out round-robin
	// regardless of how busy each consumer is.
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")

	// Consume and wait for requests.
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			handleRequest(ch, d)
		}
	}()

	log.Printf(" [*] Awaiting RPC requests")
	<-forever
}
RPC 请求方:
// RPC pattern, requester (client) side: publishes a Fibonacci request to the
// RPC queue and waits on a private reply queue for the matching answer.
//
// NOTE(review): the reviewed source had its import paths and its string, bool
// and integer literals stripped. The values used below (AMQP client import
// path, broker URL, queue name, flag values, default argument, log messages)
// follow the RabbitMQ RPC tutorial defaults -- confirm them against the real
// deployment.
package main

import (
	"errors"
	"fmt"
	"log"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/streadway/amqp"
)

const (
	// amqpURL is the broker address. NOTE(review): assumed local default -- confirm.
	amqpURL = "amqp://guest:guest@localhost:5672/"

	// rpcQueue is the queue requests are published to. NOTE(review): assumed name -- confirm.
	rpcQueue = "rpc_queue"

	// defaultArg is used when no command-line argument is given.
	// NOTE(review): the original default was missing from the source -- confirm.
	defaultArg = "30"
)

// failOnError logs msg together with err and panics when err is non-nil.
//
// log.Panicf replaces the previous log.Fatalf followed by panic: Fatalf exits
// the process immediately, so the panic was unreachable and deferred cleanup
// in the calling goroutine never ran.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

// randomString returns a string of l random uppercase ASCII letters (A-Z).
func randomString(l int) string {
	bytes := make([]byte, l)
	for i := range bytes {
		// randInt's upper bound is exclusive, so pass 'Z'+1; the previous
		// bounds (65, 90) could never produce 'Z'.
		bytes[i] = byte(randInt('A', 'Z'+1))
	}
	return string(bytes)
}

// randInt returns a pseudo-random integer in the half-open range [min, max).
// It panics if max <= min, because rand.Intn requires a positive argument.
func randInt(min int, max int) int {
	return min + rand.Intn(max-min)
}

// fibonacciRPC asks the RPC server for the n-th Fibonacci number and blocks
// until the reply carrying this call's correlation id arrives.
//
// It returns an error if the broker cannot be reached, the request cannot be
// published, the reply body is not an integer, or the delivery channel closes
// before a reply arrives. Previously these failures terminated the process
// from inside this function and the declared error result was never used.
func fibonacciRPC(n int) (res int, err error) {
	conn, err := amqp.Dial(amqpURL)
	if err != nil {
		return 0, fmt.Errorf("connecting to RabbitMQ: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return 0, fmt.Errorf("opening a channel: %w", err)
	}
	defer ch.Close()

	// Queue declaration: an exclusive, server-named queue used for replies.
	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // noWait
		nil,   // arguments
	)
	if err != nil {
		return 0, fmt.Errorf("declaring the reply queue: %w", err)
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		return 0, fmt.Errorf("registering a consumer: %w", err)
	}

	corrID := randomString(32)

	err = ch.Publish(
		"",       // exchange
		rpcQueue, // routing key
		false,    // mandatory
		false,    // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: corrID,
			ReplyTo:       q.Name,
			Body:          []byte(strconv.Itoa(n)),
		})
	if err != nil {
		return 0, fmt.Errorf("publishing the request: %w", err)
	}

	for d := range msgs {
		// Skip replies that belong to some other request.
		if d.CorrelationId != corrID {
			continue
		}
		res, err = strconv.Atoi(string(d.Body))
		if err != nil {
			return 0, fmt.Errorf("converting reply body to integer: %w", err)
		}
		return res, nil
	}

	// The range loop only ends when the delivery channel is closed.
	return 0, errors.New("delivery channel closed before a reply arrived")
}

// main reads the requested index from the command line, performs the RPC
// call, and logs the result.
func main() {
	// Needed for varied correlation ids on Go < 1.20; newer releases seed
	// the global generator automatically.
	rand.Seed(time.Now().UTC().UnixNano())

	n := bodyFrom(os.Args)

	log.Printf(" [x] Requesting fib(%d)", n)
	res, err := fibonacciRPC(n)
	failOnError(err, "Failed to handle RPC request")

	log.Printf(" [.] Got %d", res)
}

// bodyFrom extracts the integer request argument from args, where args[0] is
// the program name. With no argument, or an empty first argument, defaultArg
// is used. It fails via failOnError if the argument is not an integer.
func bodyFrom(args []string) int {
	var s string
	// Inspect the args parameter, not the global os.Args, so the function
	// honours whatever slice the caller passes in.
	if len(args) < 2 || args[1] == "" {
		s = defaultArg
	} else {
		s = strings.Join(args[1:], " ")
	}
	n, err := strconv.Atoi(s)
	failOnError(err, "Failed to convert arg to integer")
	return n
}