前言

最近在看 NSQ 源码,兴致上来,俺也想设计一个简单的 Message Queue (消息队列) 玩一玩。使用典型的 PUB/SUB 模型。

目前代码还不是很完善,只是实现了基本发布、订阅功能。后续有时间再优化吧。

TCP 协议设计

第一步设计协议,参考了 NSQ 的协议设计,我们这里做了精简,只使用了 PUB/SUB 指令。

指令

SUB

订阅某个 Topic 消息

SUB <topic_name>\n

<topic_name> 主题名称

成功响应

OK

失败响应

E_INVALID

消息数据异步响应。

PUB

向某个 Topic 发送消息

PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]


4-byte 数据长度
<topic_name> 主题名称

成功响应

OK

失败响应

E_INVALID

响应数据结构

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size     frame type     data

帧类型

// 响应类型
FrameTypeResponse int32 = 0 
// 错误类型
FrameTypeError    int32 = 1
// 消息类型
FrameTypeMessage  int32 = 2

消息数据结构

[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
|       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
|       8-byte         ||    ||                 16-byte                      || N-byte
------------------------------------------------------------------------------------------...
  nanosecond timestamp    ^^                   message ID                       message body
                       (uint16)
                        2-byte
                       attempts

流程图

Simple-MQ-Diagram

具体实现

https://github.com/mangoim/simple-mq