个人技术分享

概要说明

循环队列是一种使用固定大小的数组来实现队列的数据结构,它通过循环的方式使用数组空间,具有以下好处:

  1. 空间高效:循环队列避免了使用链表实现队列时可能存在的额外空间开销,因为链表的每个节点都需要额外的存储空间来保存指向下一个节点的指针。
  2. 时间效率:在循环队列中,入队和出队操作通常可以在常数时间内完成,即O(1)时间复杂度。这是因为不需要移动队列中的其他元素。
  3. 减少内存分配:由于循环队列使用固定大小的数组,它避免了动态分配内存的需要,这在某些情况下可以减少内存分配和回收的开销。
  4. 避免内存碎片:固定大小的数组可以减少内存碎片的产生,因为不需要为每个元素单独分配内存。
  5. 实现简单:循环队列的实现相对简单,只需要管理两个指针(或索引)来跟踪队列的头部和尾部。
  6. 动态使用:循环队列可以动态地使用数组空间,即使队列满了,也可以通过循环的方式继续使用数组的未使用部分。
  7. 适用于固定大小数据集:当数据集的大小已知且固定时,循环队列可以非常高效地使用内存和处理数据。
  8. 减少错误:由于循环队列的实现相对简单,它减少了因复杂实现带来的潜在错误。

避免假溢出是循环队列设计中的一个重要问题。假溢出通常发生在循环队列的头部和尾部索引在数组中相遇时,即使数组中还有空间可以存放新元素。出现假溢出的原因,从根本上来说,是因为循环队列的容量被错误地判断为已满。

队列常见问题:假溢出

以下是几种避免假溢出的策略:

  1. 使用标志位:在循环队列结构中添加一个标志位来区分队列是真正的满还是假溢出。当队列满时,设置这个标志位;当队列空时,清除这个标志位。
  2. 使用计数器:维护一个计数器来记录队列中实际的元素数量。入队时增加计数器,出队时减少计数器。即使头部和尾部索引相遇,计数器也能正确反映队列的状态。
  3. 双重索引:使用两个索引来分别表示队列的头部和尾部,同时使用一个布尔值来区分队列是满还是空。当队列满时,尾部索引会等于头部索引,但布尔值会标记队列为满。
  4. 重新计算索引:在检查队列是否满或空时,不要仅仅依赖于头部和尾部索引的比较,而是重新计算它们在数组中的相对位置。
  5. 使用额外的数组空间:在数组中预留一个额外的空间,这样即使头部和尾部索引相遇,也不会立即认为队列满了。这种方式牺牲了一点空间效率,但可以简化逻辑。
  6. 使用环形缓冲区:在某些编程语言的库中,环形缓冲区(ring buffer)是一种特殊的循环队列,它通过使用上述一些策略来避免假溢出。
  7. 动态扩容:虽然这并不直接解决假溢出问题,但通过动态扩容,可以在队列达到容量上限时创建一个新的更大的数组,并将旧数组中的元素复制到新数组中。
  8. 使用哨兵元素:在数组的末尾添加一个哨兵元素,这样即使头部和尾部索引相遇,哨兵元素也不会被误认为是队列中的有效元素。
  9. 逻辑分离:在逻辑上将满和空的状态与头部和尾部索引的相遇分离,通过额外的逻辑来确定队列的真实状态。
  10. 使用状态位图:使用位图来表示数组中哪些位置是被占用的,这样可以通过位图的状态来判断队列是否满或空。

避免假溢出的关键在于正确地维护队列的状态信息,确保在任何时候都能准确地判断队列是否真正满了或者空了。在设计循环队列时,应该根据具体的应用场景和性能要求来选择最合适的策略。

在Go语言中实现循环队列,我们需要定义一个循环队列的数据结构,并实现其基本操作,包括初始化、入队(Push)、出队(Pop)、获取队列长度等。下面是一个简单的循环队列实现示例:

循环队列的实现

package QUEUE

import (
	"strings"
	"sync"
)

type CycleQueue struct {
	data  []interface{} //存储空间
	front int           //前指针,前指针负责弹出数据移动
	rear  int           //尾指针,后指针负责添加数据移动
	cap   int           //设置切片最大容量
	mu    sync.RWMutex          // 添加互斥锁
}

func NewCycleQueue(cap int) *CycleQueue {
	return &CycleQueue{
		data:  make([]interface{}, cap),
		cap:   cap,
		front: 0,
		rear:  0,
	}
}

// 入队操作
// 判断队列是否队满,队满则不允许添加数据
func (q *CycleQueue) Push(data interface{}) bool {
	q.mu.Lock()
	defer q.mu.Unlock() // 确保在函数退出时释放锁

	//check queue is full
	if q.QueueLength() == q.cap { //队列已满时,不执行入队操作
		Log.Error("push err queue is full")
		return false
	}
	q.data[q.rear] = data         //将元素放入队列尾部
	q.rear = (q.rear + 1) % q.cap //尾部元素指向下一个空间位置,取模运算保证了索引不越界(余数一定小于除数)
	return true
}

// 出队操作
// 需要考虑: 队队为空没有数据返回了
func (q *CycleQueue) Pop() interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.rear == q.front {
		return nil
	}

	data := q.data[q.front]
	q.data[q.front] = nil // 使用nil来避免内存泄露
	q.front = (q.front + 1) % q.cap
	return data
}

// 循环队列长度计算,考虑哨兵空间
func (q *CycleQueue) QueueLength() int {
	if q.rear >= q.front {
		return q.rear - q.front
	}
	return q.cap - q.front + q.rear
}

func (q *CycleQueue) FindDataByRequestId(requestId string) interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()

	// 计算队列中的有效元素数量
	length := q.QueueLength()

	// 从front指针开始遍历队列
	index := q.front
	for i := 0; i < length; i++ {
		// 检查当前元素是否包含requestId
		if data, ok := q.data[index].(string); ok && strings.Contains(data, requestId) {
			// 找到元素后,从队列中移除
			q.RemoveAtIndex(index)
			return data
		}
		// 移动到下一个元素
		index = (index + 1) % q.cap
	}
	// 如果没有找到,返回nil
	return nil
}

// 新增RemoveAtIndex方法,用于在循环队列中移除指定索引的元素
func (q *CycleQueue) RemoveAtIndex(index int) {
	if index < 0 || index >= q.cap {
		return // 索引越界检查
	}

	// 将index位置的元素复制到最后一个元素的位置
	copy(q.data[index:], q.data[index+1:q.rear])

	// 更新队列的front和rear指针
	if index == q.rear { // 如果移除的是最后一个元素
		q.rear = index // rear指针向前移动
	} else if index < q.rear { // 如果移除的是中间的元素
		q.rear-- // rear指针向前移动
	}

	// 如果移除的是front指针前的元素
	if index < q.front {
		q.front = index // front指针向后移动
	}
}

循环队列的应用

package server

import (
    "encoding/json"
    "strconv"
    "time"
)

//队列通道
type Channel struct {
    queue                  *MeetGo.CycleQueue
}

//队列参数项
type QueueItem struct {
    event    string
    msg      map[string]interface{}
    response *chan Response
}

//返回参数
type Response struct {
	err    interface{}
	result map[string]interface{}
}

var globalChannel *Channel


//创建队列通道
func NewChannel() {
    channel.queue = QUEUE.NewCycleQueue(1000)
    go channel.handle()
    return
}

//处理队列数据
func (channel *Channel) handle() {
    for {
        item := channel.queue.Pop()

        if item == nil {
            time.Sleep(40 * time.Millisecond)
            continue
        }
        channel.process(item)
        time.Sleep(1 * time.Millisecond)
    }
}

func (channel *Channel) process(inter interface{}) {
    item := inter.(QueueItem)
}

func (channel *Channel) PushSignal(item QueueItem) {
    Channel.queue.Push(item)
}

func (channel *Channel) onRequest(event string, msg map[string]interface{}) chan Response {
	response := make(chan Response, 1)
	item := QueueItem{
		event,
		msg,
		&response,
	}
	channel.PushSignal(item)
	return response

}

func main(){
    globalChannel = NewChannel()
    globalChannel.onRequset()
    println(response)
}