ptr_ring.h
/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming entries from the FIFO.
 *
 *	This implementation tries to minimize cache-contention when there is a
 *	single producer and a single consumer CPU.
 */
#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif
struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer ____cacheline_aligned_in_smp;
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	void **queue;
};
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}
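
/* Example (illustrative sketch, not part of the original header): a producer
 * that already holds producer_lock can busy-wait for a free slot by polling
 * __ptr_ring_full() with cpu_relax(), as the note above requires.  "ring"
 * and "obj" are hypothetical names, not defined in this file.
 *
 *	spin_lock(&ring->producer_lock);
 *	while (__ptr_ring_full(ring))
 *		cpu_relax();
 *	__ptr_ring_produce(ring, obj);
 *	spin_unlock(&ring->producer_lock);
 */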

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;

	return 0;
}
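
/* Example (illustrative, not part of the original header): a free slot is
 * encoded as a NULL entry, so NULL itself must never be produced.  On
 * -ENOSPC the caller keeps ownership of the pointer.  "ring", "obj" and
 * "err" are hypothetical names.
 *
 *	if (WARN_ON_ONCE(!obj))
 *		return -EINVAL;
 *	err = ptr_ring_produce(ring, obj);
 *	if (err)
 *		return err;
 */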

static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
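
/* Example (illustrative, not part of the original header): the plain, _irq,
 * _bh and _any variants only differ in how producer_lock is taken, so the
 * choice depends on the calling context.  In a hypothetical driver that
 * produces both from softirq context and from process context, one pattern
 * would be:
 *
 *	In the softirq path:
 *		err = ptr_ring_produce(ring, obj);
 *	In the process-context path (keeps softirqs from racing on this CPU):
 *		err = ptr_ring_produce_bh(ring, obj);
 */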

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}
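
/* Example (illustrative, not part of the original header): for a ring that
 * is never resized, __ptr_ring_empty() can be tested without taking the
 * lock, e.g. to decide whether to wake the consumer.  "ring" and "wq" are
 * hypothetical names.
 *
 *	if (!__ptr_ring_empty(ring))
 *		wake_up(&wq);
 */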

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	r->queue[r->consumer++] = NULL;
	if (unlikely(r->consumer >= r->size))
		r->consumer = 0;
}
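
/* Example (illustrative, not part of the original header): a consumer can
 * peek, decide whether it is able to process the entry, and only then
 * discard it, all under consumer_lock.  "ring" and can_handle() are
 * hypothetical names.
 *
 *	spin_lock(&ring->consumer_lock);
 *	obj = __ptr_ring_peek(ring);
 *	if (obj && can_handle(obj))
 *		__ptr_ring_discard_one(ring);
 *	else
 *		obj = NULL;
 *	spin_unlock(&ring->consumer_lock);
 */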

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}
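
/* Example (illustrative, not part of the original header): draining the ring
 * from process context when the producer may run in any context, hence the
 * _any variant.  "ring" and process() are hypothetical names.
 *
 *	void *obj;
 *
 *	while ((obj = ptr_ring_consume_any(ring)))
 *		process(obj);
 */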

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
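
/* Example (illustrative, not part of the original header): peeking at the
 * length of the head entry without consuming it.  The supplied function must
 * accept the peeked pointer, including NULL when the ring is empty.
 * peek_len() below is a hypothetical helper, not defined in this file.
 *
 *	static int peek_len(void *ptr)
 *	{
 *		struct sk_buff *skb = ptr;
 *
 *		return skb ? skb->len : 0;
 *	}
 *
 *	len = PTR_RING_PEEK_CALL(ring, peek_len);
 */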

static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
{
	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	r->size = size;
	r->producer = r->consumer = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
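
/* Example (illustrative, not part of the original header): typical lifetime
 * of a ring shared by one producer and one consumer.  "ring", "obj" and
 * free_obj() are hypothetical names.
 *
 *	struct ptr_ring ring;
 *	int err;
 *
 *	err = ptr_ring_init(&ring, 256, GFP_KERNEL);
 *	if (err)
 *		return err;
 *
 *	On the producer side:
 *		err = ptr_ring_produce(&ring, obj);
 *	On the consumer side:
 *		obj = ptr_ring_consume(&ring);
 *
 *	Once both sides are done:
 *		ptr_ring_cleanup(&ring, free_obj);
 */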

static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	int producer = 0;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;
	void *ptr;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&r->producer_lock, flags);

	/* Move entries into the new array; entries that no longer fit are
	 * passed to destroy (if given), otherwise they are dropped.
	 */
	while ((ptr = ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	r->size = size;
	r->producer = producer;
	r->consumer = 0;
	old = r->queue;
	r->queue = queue;

	spin_unlock_irqrestore(&r->producer_lock, flags);

	kfree(old);

	return 0;
}
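
/* Example (illustrative, not part of the original header): growing a ring at
 * runtime, passing free_obj() so that entries which no longer fit are
 * released rather than leaked.  "ring" and free_obj() are hypothetical
 * names.
 *
 *	err = ptr_ring_resize(ring, 512, GFP_KERNEL, free_obj);
 *	if (err)
 *		return err;
 */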

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);

	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */