1
0

unique_queue.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package sync
  2. import (
  3. "fmt"
  4. )
  5. // UniqueQueue is a queue which guarantees only one instance of same
  6. // identity is in the line. Instances with same identity will be
  7. // discarded if there is already one in the line.
  8. //
  9. // This queue is particularly useful for preventing duplicated task
  10. // of same purpose.
  11. type UniqueQueue struct {
  12. table *StatusTable
  13. queue chan string
  14. }
  15. // NewUniqueQueue initializes and returns a new UniqueQueue object.
  16. func NewUniqueQueue(queueLength int) *UniqueQueue {
  17. if queueLength <= 0 {
  18. queueLength = 100
  19. }
  20. return &UniqueQueue{
  21. table: NewStatusTable(),
  22. queue: make(chan string, queueLength),
  23. }
  24. }
  25. // Queue returns channel of queue for retrieving instances.
  26. func (q *UniqueQueue) Queue() <-chan string {
  27. return q.queue
  28. }
  29. // Exist returns true if there is an instance with given identity
  30. // exists in the queue.
  31. func (q *UniqueQueue) Exist(id any) bool {
  32. return q.table.IsRunning(fmt.Sprintf("%v", id))
  33. }
  34. // AddFunc adds new instance to the queue with a custom runnable function,
  35. // the queue is blocked until the function exits.
  36. func (q *UniqueQueue) AddFunc(id any, fn func()) {
  37. if q.Exist(id) {
  38. return
  39. }
  40. idStr := fmt.Sprintf("%v", id)
  41. q.table.Lock()
  42. q.table.pool[idStr] = true
  43. if fn != nil {
  44. fn()
  45. }
  46. q.table.Unlock()
  47. q.queue <- idStr
  48. }
  49. // Add adds new instance to the queue.
  50. func (q *UniqueQueue) Add(id any) {
  51. q.AddFunc(id, nil)
  52. }
  53. // Remove removes instance from the queue.
  54. func (q *UniqueQueue) Remove(id any) {
  55. q.table.Stop(fmt.Sprintf("%v", id))
  56. }