ptr_ring.h

/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming entries from a FIFO.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};
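
/*
 * Usage sketch: users typically embed the ring in their own per-device
 * or per-queue structure.  The example_dev name below is hypothetical,
 * made up for this sketch.
 */
struct example_dev {
	struct ptr_ring rx_ring;	/* filled on one CPU, drained on another */
	/* ... other per-device state ... */
};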

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
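
/*
 * Usage sketch: a producer that has to wait for free space might poll
 * like this, with cpu_relax() providing the compiler barrier the
 * comment above asks for.  The example_* name is hypothetical; this
 * assumes process context and a consumer making progress elsewhere.
 */
static inline void example_wait_for_space(struct ptr_ring *r)
{
	while (ptr_ring_full(r))
		cpu_relax();	/* barrier between successive re-reads */
}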

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being
 * queued points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
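
/*
 * Usage sketch: producing from process context while the ring is also
 * used from BH context; per the note above ptr_ring_produce, pick the
 * locking variant that matches the contexts involved.  example_produce
 * and example_obj are hypothetical names.
 */
static inline int example_produce(struct ptr_ring *r, void *example_obj)
{
	/* Returns -ENOSPC if the ring is full; the caller then keeps ownership. */
	return ptr_ring_produce_bh(r, example_obj);
}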

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
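
/*
 * Usage sketch: a common use of the lockless test is on the producer
 * side, to decide whether the consumer needs a kick (a work item,
 * NAPI, etc.), subject to the constraints documented above
 * __ptr_ring_empty.  The example_* name is hypothetical.
 */
static inline bool example_needs_kick(struct ptr_ring *r)
{
	/* May race with a concurrent consumer; see the comment above. */
	return !__ptr_ring_empty(r);
}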

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full, as the producer
	 * is writing out new entries in the same cache line.  Defer
	 * these updates until a batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for
	 * __ptr_ring_empty to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not
	 * mandatory, but it helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache
		 * line that the producer might currently be reading is
		 * touched last; the producer won't make progress and
		 * touch other cache lines besides the first one until
		 * we write out all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}
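
/*
 * Usage sketch: draining the ring from process context.  The
 * example_* names are hypothetical; pick the _irq/_bh/_any variants
 * instead when the ring is also touched from those contexts.
 */
static inline void example_drain(struct ptr_ring *r,
				 void (*example_handle)(void *))
{
	void *ptr;

	while ((ptr = ptr_ring_consume(r)))
		example_handle(ptr);
}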

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
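
/*
 * Usage sketch: pulling a small batch under one lock acquisition
 * instead of consuming entries one at a time.  EXAMPLE_BATCH and the
 * example_* names are hypothetical.
 */
#define EXAMPLE_BATCH 16

static inline void example_consume_batch(struct ptr_ring *r,
					 void (*example_process)(void *))
{
	void *batch[EXAMPLE_BATCH];
	int n, i;

	n = ptr_ring_consume_batched(r, batch, EXAMPLE_BATCH);
	for (i = 0; i < n; i++)
		example_process(batch[i]);
}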

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
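
/*
 * Usage sketch: inspecting the head entry without consuming it.  The
 * example_* names are hypothetical; note that the callback must
 * tolerate a NULL argument, which it receives when the ring is empty.
 */
struct example_entry {
	int len;
};

static inline int example_peek_len(void *ptr)
{
	struct example_entry *e = ptr;

	return e ? e->len : 0;
}

/* e.g.: int len = PTR_RING_PEEK_CALL(&ring, example_peek_len); */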

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot
	 * of burstiness.  Needs tuning; for now, disable batching for
	 * small rings.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
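
/*
 * Usage sketch: setting up a ring at initialization time.  The size of
 * 256 and the GFP_KERNEL flag are arbitrary example choices; the size
 * does not need to be a power of two.  example_setup is hypothetical.
 */
static inline int example_setup(struct ptr_ring *r)
{
	return ptr_ring_init(r, 256, GFP_KERNEL);
}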

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the
	 * following code can test entries for NULL and, if not NULL,
	 * assume they are valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
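
/*
 * Usage sketch: a stop/restart path might pull out as much as it can,
 * keep it aside while the queue is stopped, then give the entries
 * back.  The example_* names are hypothetical; example_free() is only
 * called for entries that no longer fit.
 */
static inline void example_stop_restart(struct ptr_ring *r,
					void **batch, int max,
					void (*example_free)(void *))
{
	int n = ptr_ring_consume_batched(r, batch, max);

	/* ... queue stays stopped here ... */

	ptr_ring_unconsume(r, batch, n, example_free);
}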

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
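
/*
 * Usage sketch: growing the ring at runtime.  Entries that no longer
 * fit are handed to the destroy callback; the example_* names are
 * hypothetical.
 */
static inline int example_grow(struct ptr_ring *r, int new_size,
			       void (*example_free)(void *))
{
	return ptr_ring_resize(r, new_size, GFP_KERNEL, example_free);
}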

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}
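
/*
 * Usage sketch: teardown, freeing any entries still queued.  The
 * example_* names are hypothetical; pass NULL as the destroy callback
 * if the entries need no cleanup.
 */
static inline void example_teardown(struct ptr_ring *r,
				    void (*example_free)(void *))
{
	ptr_ring_cleanup(r, example_free);
}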

#endif /* _LINUX_PTR_RING_H */