前言
趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了。自己是个javer,这次实现,特意换用了go。没错,是零基础上手go,顺便可以学学go。
前置知识:
- go基本语法
- 消息队列概念,也就三个:生产者、消费者、队列
目的
- 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢
- 使用双向链表数据结构作为队列
- 有多个topic可供生产者生成消息和消费者消费消息
- 支持生产者并发写
- 支持消费者读,且ok后,从队列删除
- 消息不丢失(持久化)
- 高性能(先这样想)
设计
整体架构
协议
通讯协议底层使用tcp,mq是基于tcp自定义了一个协议,协议如下
type Msg struct { Id int64 TopicLen int64 Topic string // 1-consumer 2-producer 3-comsumer-ack 4-error MsgType int64 // 消息类型 Len int64 // 消息长度 Payload []byte // 消息 }
Payload使用字节数组,是因为不管数据是什么,只当做字节数组来处理即可。Msg承载着生产者生产的消息,消费者消费的消息,ACK、和错误消息,前两者会有负载,而后两者负载和长度都为空
协议的编解码处理,就是对字节的处理,接下来有从字节转为Msg,和从Msg转为字节两个函数
func BytesToMsg(reader io.Reader) Msg { m := Msg{} var buf [128]byte n, err := reader.Read(buf[:]) if err != nil { fmt.Println("read failed, err:", err) } fmt.Println("read bytes:", n) // id buff := bytes.NewBuffer(buf[0:8]) binary.Read(buff, binary.LittleEndian, &m.Id) // topiclen buff = bytes.NewBuffer(buf[8:16]) binary.Read(buff, binary.LittleEndian, &m.TopicLen) // topic msgLastIndex := 16 + m.TopicLen m.Topic = string(buf[16: msgLastIndex]) // msgtype buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8]) binary.Read(buff, binary.LittleEndian, &m.MsgType) buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16]) binary.Read(buff, binary.LittleEndian, &m.Len) if m.Len <= 0 { return m } m.Payload = buf[msgLastIndex + 16:] return m } func MsgToBytes(msg Msg) []byte { msg.TopicLen = int64(len([]byte(msg.Topic))) msg.Len = int64(len([]byte(msg.Payload))) var data []byte buf := bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.Id) data = append(data, buf.Bytes()...) buf = bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.TopicLen) data = append(data, buf.Bytes()...) data = append(data, []byte(msg.Topic)...) buf = bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.MsgType) data = append(data, buf.Bytes()...) buf = bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.Len) data = append(data, buf.Bytes()...) data = append(data, []byte(msg.Payload)...) return data }
队列
使用container/list,实现先入先出,生产者在队尾写,消费者在队头读取
package broker import ( "container/list" "sync" ) type Queue struct { len int data list.List } var lock sync.Mutex func (queue *Queue) offer(msg Msg) { queue.data.PushBack(msg) queue.len = queue.data.Len() } func (queue *Queue) poll() Msg{ if queue.len == 0 { return Msg{} } msg := queue.data.Front() return msg.Value.(Msg) } func (queue *Queue) delete(id int64) { lock.Lock() for msg := queue.data.Front(); msg != nil; msg = msg.Next() { if msg.Value.(Msg).Id == id { queue.data.Remove(msg) queue.len = queue.data.Len() break } } lock.Unlock() }
方法offer往队列里插入数据,poll从队列头读取数据素,delete根据消息ID从队列删除数据。这里使用Queue结构体对List进行封装,其实是有必要的,List作为底层的数据结构,我们希望隐藏更多的底层操作,只给客户提供基本的操作
delete操作是在消费者消费成功且发送ACK后,对消息从队列里移除的,因为消费者可以多个同时消费,所以这里进入临界区时加锁(em,加锁是否就一定会影响对性能有较大的影响呢)
broker
broker作为服务器角色,负责接收连接,接收和响应请求
package broker import ( "bufio" "net" "os" "sync" "time" ) var topics = sync.Map{} func handleErr(conn net.Conn) { defer func() { if err := recover(); err != nil { println(err.(string)) conn.Write(MsgToBytes(Msg{MsgType: 4})) } }() } func Process(conn net.Conn) { handleErr(conn) reader := bufio.NewReader(conn) msg := BytesToMsg(reader) queue, ok := topics.Load(msg.Topic) var res Msg if msg.MsgType == 1 { // comsumer if queue == nil || queue.(*Queue).len == 0{ return } msg = queue.(*Queue).poll() msg.MsgType = 1 res = msg } else if msg.MsgType == 2 { // producer if ! ok { queue = &Queue{} queue.(*Queue).data.Init() topics.Store(msg.Topic, queue) } queue.(*Queue).offer(msg) res = Msg{Id: msg.Id, MsgType: 2} } else if msg.MsgType == 3 { // consumer ack if queue == nil { return } queue.(*Queue).delete(msg.Id) } conn.Write(MsgToBytes(res)) }
MsgType等于1时,直接消费消息;MsgType等于2时是生产者生产消息,如果队列为空,那么还需创建一个新的队列,放在对应的topic下;MsgType等于3时,代表消费者成功消费,可以
删除消息
我们说消息不丢失,这里实现不完全,我就实现了持久化(持久化也没全部实现)。思路就是该topic对应的队列里的消息,按协议格式进行序列化,当broker启动时,从文件恢复
持久化需要考虑的是增量还是全量,需要保存多久,这些都会影响实现的难度和性能(想想Kafka和Redis的持久化),这里表示简单实现就好:定时器定时保存
func Save() { ticker := time.NewTicker(60) for { select { case <-ticker.C: topics.Range(func(key, value interface{}) bool { if value == nil { return false } file, _ := os.Open(key.(string)) if file == nil { file, _ = os.Create(key.(string)) } for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() { file.Write(MsgToBytes(msg.Value.(Msg))) } _ := file.Close() return false }) default: time.Sleep(1) } } }
有一个问题是,当上面的delete操作时,这里的file文件需不需要跟着delete掉对应的消息?答案是需要删除的,如果不删除,只能等下一次的全量持久化来覆盖了,中间就有脏数据问题
下面是启动逻辑
package main import ( "awesomeProject/broker" "fmt" "net" ) func main() { listen, err := net.Listen("tcp", "127.0.0.1:12345") if err != nil { fmt.Print("listen failed, err:", err) return } go broker.Save() for { conn, err := listen.Accept() if err != nil { fmt.Print("accept failed, err:", err) continue } go broker.Process(conn) } }
生产者
package main import ( "awesomeProject/broker" "fmt" "net" ) func produce() { conn, err := net.Dial("tcp", "127.0.0.1:12345") if err != nil { fmt.Print("connect failed, err:", err) } defer conn.Close() msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("我")} n, err := conn.Write(broker.MsgToBytes(msg)) if err != nil { fmt.Print("write failed, err:", err) } fmt.Print(n) }
消费者
package main import ( "awesomeProject/broker" "bytes" "fmt" "net" ) func comsume() { conn, err := net.Dial("tcp", "127.0.0.1:12345") if err != nil { fmt.Print("connect failed, err:", err) } defer conn.Close() msg := broker.Msg{Topic: "topic-test", MsgType: 1} n, err := conn.Write(broker.MsgToBytes(msg)) if err != nil { fmt.Println("write failed, err:", err) } fmt.Println("n", n) var res [128]byte conn.Read(res[:]) buf := bytes.NewBuffer(res[:]) receMsg := broker.BytesToMsg(buf) fmt.Print(receMsg) // ack conn, _ = net.Dial("tcp", "127.0.0.1:12345") l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3})) if e != nil { fmt.Println("write failed, err:", err) } fmt.Println("l:", l) }
消费者这里ack时重新创建了连接,如果不创建连接的话,那服务端那里就需要一直从conn读取数据,直到结束。思考一下,像RabbitMQ的ack就有自动和手工的ack,如果是手工的ack,必然需要一个新的连接,因为不知道客户端什么时候发送ack,自动的话,当然可以使用同一个连接,but这里就简单创建一条新连接吧
启动
先启动broker,再启动producer,然后启动comsumer,OK,能跑,能实现发送消息到队列,从队列消费消息
总结
整体虽然简单,但毕竟是使用go实现的,就是看似一顿操作猛如虎,实质慌如狗。第一时间就被go的gopath和go mod困扰住,后面语法的使用,比如指针,传值传引用等,最头疼的就是类型转换,作为一个javer,使用go进行类型转换,着实被狠狠得虐了一番。
到此这篇关于使用go实现一个超级mini的消息队列的示例代码的文章就介绍到这了,更多相关go mini消息队列内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://juejin.cn/post/7041085344481017887