ring_buffer.c

/*
 *
 * Copyright (c) 2009, Microsoft Corporation.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
 * Place - Suite 330, Boston, MA 02111-1307 USA.
 *
 * Authors:
 *   Haiyang Zhang <haiyangz@microsoft.com>
 *   Hank Janssen  <hjanssen@microsoft.com>
 *   K. Y. Srinivasan <kys@microsoft.com>
 *
 */
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/hyperv.h>
#include <linux/uio.h>

#include "hyperv_vmbus.h"

void hv_begin_read(struct hv_ring_buffer_info *rbi)
{
        rbi->ring_buffer->interrupt_mask = 1;
        mb();
}

u32 hv_end_read(struct hv_ring_buffer_info *rbi)
{
        u32 read;
        u32 write;

        rbi->ring_buffer->interrupt_mask = 0;
        mb();

        /*
         * Now check to see if the ring buffer is still empty.
         * If it is not, we raced and we need to process new
         * incoming messages.
         */
        hv_get_ringbuffer_availbytes(rbi, &read, &write);

        return read;
}

/*
 * When we write to the ring buffer, check if the host needs to
 * be signaled. Here are the details of this protocol:
 *
 *      1. The host guarantees that while it is draining the
 *         ring buffer, it will set the interrupt_mask to
 *         indicate it does not need to be interrupted when
 *         new data is placed.
 *
 *      2. The host guarantees that it will completely drain
 *         the ring buffer before exiting the read loop. Further,
 *         once the ring buffer is empty, it will clear the
 *         interrupt_mask and re-check to see if new data has
 *         arrived.
 */
static bool hv_need_to_signal(u32 old_write, struct hv_ring_buffer_info *rbi)
{
        mb();
        if (rbi->ring_buffer->interrupt_mask)
                return false;

        /* check interrupt_mask before read_index */
        rmb();

        /*
         * This is the only case we need to signal: when the
         * ring transitions from being empty to non-empty.
         */
        if (old_write == rbi->ring_buffer->read_index)
                return true;

        return false;
}

/*
 * To optimize flow management on the send-side, when the sender is
 * blocked because of insufficient space in the ring buffer, potentially
 * the consumer of the ring buffer can signal the producer.
 * This is controlled by the following parameters:
 *
 * 1. pending_send_sz: This is the size in bytes that the
 *    producer is trying to send.
 * 2. The feature bit feat_pending_send_sz is set to indicate whether
 *    the consumer of the ring will signal when the ring
 *    state transitions from being full to a state where
 *    there is room for the producer to send the pending packet.
 */
static bool hv_need_to_signal_on_read(struct hv_ring_buffer_info *rbi)
{
        u32 cur_write_sz;
        u32 r_size;
        u32 write_loc;
        u32 read_loc = rbi->ring_buffer->read_index;
        u32 pending_sz;

        /*
         * Issue a full memory barrier before making the signaling decision.
         * Here is the reason for this barrier:
         * If the read of pending_send_sz (in this function) were reordered
         * ahead of the commit of the new read index (in the calling
         * function), we could have a problem. If the host were to set
         * pending_send_sz after we have sampled it, and then go to sleep
         * before we commit the read index, we could miss sending the
         * interrupt. The full memory barrier addresses this.
         */
        mb();

        pending_sz = rbi->ring_buffer->pending_send_sz;
        write_loc = rbi->ring_buffer->write_index;
        /* If the other end is not blocked on write don't bother. */
        if (pending_sz == 0)
                return false;

        r_size = rbi->ring_datasize;
        cur_write_sz = write_loc >= read_loc ? r_size - (write_loc - read_loc) :
                        read_loc - write_loc;

        if (cur_write_sz >= pending_sz)
                return true;

        return false;
}
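
/*
 * Worked example of the free-space computation above (illustrative numbers
 * only): with ring_datasize = 4096, read_index = 1000 and write_index = 3000,
 * the writer has not wrapped, so the free space is 4096 - (3000 - 1000) =
 * 2096 bytes. If the writer has wrapped (write_index = 500, read_index =
 * 3000), the free space is simply 3000 - 500 = 2500 bytes. The host is
 * signaled only once this free space reaches pending_send_sz.
 */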

/* Get the next write location for the specified ring buffer. */
static inline u32
hv_get_next_write_location(struct hv_ring_buffer_info *ring_info)
{
        u32 next = ring_info->ring_buffer->write_index;

        return next;
}

/* Set the next write location for the specified ring buffer. */
static inline void
hv_set_next_write_location(struct hv_ring_buffer_info *ring_info,
                           u32 next_write_location)
{
        ring_info->ring_buffer->write_index = next_write_location;
}

/* Get the next read location for the specified ring buffer. */
static inline u32
hv_get_next_read_location(struct hv_ring_buffer_info *ring_info)
{
        u32 next = ring_info->ring_buffer->read_index;

        return next;
}

/*
 * Get the next read location + offset for the specified ring buffer.
 * This allows the caller to skip.
 */
static inline u32
hv_get_next_readlocation_withoffset(struct hv_ring_buffer_info *ring_info,
                                    u32 offset)
{
        u32 next = ring_info->ring_buffer->read_index;

        next += offset;
        next %= ring_info->ring_datasize;

        return next;
}

/* Set the next read location for the specified ring buffer. */
static inline void
hv_set_next_read_location(struct hv_ring_buffer_info *ring_info,
                          u32 next_read_location)
{
        ring_info->ring_buffer->read_index = next_read_location;
}

/* Get the start of the ring buffer. */
static inline void *
hv_get_ring_buffer(struct hv_ring_buffer_info *ring_info)
{
        return (void *)ring_info->ring_buffer->buffer;
}

/* Get the size of the ring buffer. */
static inline u32
hv_get_ring_buffersize(struct hv_ring_buffer_info *ring_info)
{
        return ring_info->ring_datasize;
}

/* Get the read and write indices as u64 of the specified ring buffer. */
static inline u64
hv_get_ring_bufferindices(struct hv_ring_buffer_info *ring_info)
{
        return (u64)ring_info->ring_buffer->write_index << 32;
}

/*
 * Helper routine to copy data from the ring buffer into the destination
 * buffer.
 * Assume there is enough room. Handles wrap-around in the src case only!!
 */
static u32 hv_copyfrom_ringbuffer(
        struct hv_ring_buffer_info *ring_info,
        void *dest,
        u32 destlen,
        u32 start_read_offset)
{
        void *ring_buffer = hv_get_ring_buffer(ring_info);
        u32 ring_buffer_size = hv_get_ring_buffersize(ring_info);
        u32 frag_len;

        /* wrap-around detected at the src */
        if (destlen > ring_buffer_size - start_read_offset) {
                frag_len = ring_buffer_size - start_read_offset;

                memcpy(dest, ring_buffer + start_read_offset, frag_len);
                memcpy(dest + frag_len, ring_buffer, destlen - frag_len);
        } else
                memcpy(dest, ring_buffer + start_read_offset, destlen);

        start_read_offset += destlen;
        start_read_offset %= ring_buffer_size;

        return start_read_offset;
}

/*
 * Helper routine to copy from source to ring buffer.
 * Assume there is enough room. Handles wrap-around in the dest case only!!
 */
static u32 hv_copyto_ringbuffer(
        struct hv_ring_buffer_info *ring_info,
        u32 start_write_offset,
        void *src,
        u32 srclen)
{
        void *ring_buffer = hv_get_ring_buffer(ring_info);
        u32 ring_buffer_size = hv_get_ring_buffersize(ring_info);
        u32 frag_len;

        /* wrap-around detected! */
        if (srclen > ring_buffer_size - start_write_offset) {
                frag_len = ring_buffer_size - start_write_offset;

                memcpy(ring_buffer + start_write_offset, src, frag_len);
                memcpy(ring_buffer, src + frag_len, srclen - frag_len);
        } else
                memcpy(ring_buffer + start_write_offset, src, srclen);

        start_write_offset += srclen;
        start_write_offset %= ring_buffer_size;

        return start_write_offset;
}
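
/*
 * Worked example of the wrap-around case above (illustrative numbers only):
 * with ring_buffer_size = 4096, start_write_offset = 4000 and srclen = 200,
 * frag_len is 96, so the first memcpy() places 96 bytes at offset 4000 (the
 * tail of the ring) and the second places the remaining 104 bytes at offset
 * 0. The returned offset is (4000 + 200) % 4096 = 104.
 */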

/* Get various debug metrics for the specified ring buffer. */
void hv_ringbuffer_get_debuginfo(struct hv_ring_buffer_info *ring_info,
                                 struct hv_ring_buffer_debug_info *debug_info)
{
        u32 bytes_avail_towrite;
        u32 bytes_avail_toread;

        if (ring_info->ring_buffer) {
                hv_get_ringbuffer_availbytes(ring_info,
                                             &bytes_avail_toread,
                                             &bytes_avail_towrite);

                debug_info->bytes_avail_toread = bytes_avail_toread;
                debug_info->bytes_avail_towrite = bytes_avail_towrite;
                debug_info->current_read_index =
                        ring_info->ring_buffer->read_index;
                debug_info->current_write_index =
                        ring_info->ring_buffer->write_index;
                debug_info->current_interrupt_mask =
                        ring_info->ring_buffer->interrupt_mask;
        }
}

/* Initialize the ring buffer. */
int hv_ringbuffer_init(struct hv_ring_buffer_info *ring_info,
                       void *buffer, u32 buflen)
{
        if (sizeof(struct hv_ring_buffer) != PAGE_SIZE)
                return -EINVAL;

        memset(ring_info, 0, sizeof(struct hv_ring_buffer_info));

        ring_info->ring_buffer = (struct hv_ring_buffer *)buffer;
        ring_info->ring_buffer->read_index =
                ring_info->ring_buffer->write_index = 0;

        /* Set the feature bit for enabling flow control. */
        ring_info->ring_buffer->feature_bits.value = 1;

        ring_info->ring_size = buflen;
        ring_info->ring_datasize = buflen - sizeof(struct hv_ring_buffer);

        spin_lock_init(&ring_info->ring_lock);

        return 0;
}
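
/*
 * Illustrative sketch (not part of this file): a caller is expected to hand
 * hv_ringbuffer_init() a page-aligned region whose first PAGE_SIZE bytes hold
 * the struct hv_ring_buffer header and whose remainder becomes the data area.
 * The allocation scheme below is an assumption made only for this example:
 *
 *      struct hv_ring_buffer_info rbi;
 *      u32 ring_size = 4 * PAGE_SIZE;
 *      void *ring = (void *)__get_free_pages(GFP_KERNEL | __GFP_ZERO,
 *                                            get_order(ring_size));
 *
 *      if (!ring)
 *              return -ENOMEM;
 *      if (hv_ringbuffer_init(&rbi, ring, ring_size))
 *              goto err_free;
 */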

/* Cleanup the ring buffer. */
void hv_ringbuffer_cleanup(struct hv_ring_buffer_info *ring_info)
{
}

/* Write to the ring buffer. */
int hv_ringbuffer_write(struct hv_ring_buffer_info *outring_info,
                        struct kvec *kv_list, u32 kv_count, bool *signal, bool lock)
{
        int i = 0;
        u32 bytes_avail_towrite;
        u32 bytes_avail_toread;
        u32 totalbytes_towrite = 0;
        u32 next_write_location;
        u32 old_write;
        u64 prev_indices = 0;
        unsigned long flags = 0;

        for (i = 0; i < kv_count; i++)
                totalbytes_towrite += kv_list[i].iov_len;

        totalbytes_towrite += sizeof(u64);

        if (lock)
                spin_lock_irqsave(&outring_info->ring_lock, flags);

        hv_get_ringbuffer_availbytes(outring_info,
                                     &bytes_avail_toread,
                                     &bytes_avail_towrite);

        /*
         * If there is only room for the packet, assume it is full.
         * Otherwise, the next time around, we think the ring buffer
         * is empty since the read index == write index.
         */
        if (bytes_avail_towrite <= totalbytes_towrite) {
                if (lock)
                        spin_unlock_irqrestore(&outring_info->ring_lock, flags);
                return -EAGAIN;
        }

        /* Write to the ring buffer */
        next_write_location = hv_get_next_write_location(outring_info);

        old_write = next_write_location;

        for (i = 0; i < kv_count; i++) {
                next_write_location = hv_copyto_ringbuffer(outring_info,
                                                           next_write_location,
                                                           kv_list[i].iov_base,
                                                           kv_list[i].iov_len);
        }

        /* Set previous packet start */
        prev_indices = hv_get_ring_bufferindices(outring_info);

        next_write_location = hv_copyto_ringbuffer(outring_info,
                                                   next_write_location,
                                                   &prev_indices,
                                                   sizeof(u64));

        /* Issue a full memory barrier before updating the write index */
        mb();

        /* Now, update the write location */
        hv_set_next_write_location(outring_info, next_write_location);

        if (lock)
                spin_unlock_irqrestore(&outring_info->ring_lock, flags);

        *signal = hv_need_to_signal(old_write, outring_info);
        return 0;
}
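
/*
 * Illustrative sketch (not part of this file): a hypothetical sender builds a
 * kvec array describing the packet header and payload, then notifies the host
 * only when hv_need_to_signal() asked for it via *signal. The descriptor
 * setup and the vmbus_setevent() call below stand in for whatever the real
 * channel-level caller does:
 *
 *      struct kvec kv[2];
 *      bool signal = false;
 *
 *      kv[0].iov_base = &desc;                 // packet header
 *      kv[0].iov_len  = sizeof(desc);
 *      kv[1].iov_base = payload;
 *      kv[1].iov_len  = payload_len;
 *
 *      if (hv_ringbuffer_write(&channel->outbound, kv, 2, &signal, true))
 *              return -EAGAIN;
 *      if (signal)
 *              vmbus_setevent(channel);        // interrupt the host
 */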

int hv_ringbuffer_read(struct hv_ring_buffer_info *inring_info,
                       void *buffer, u32 buflen, u32 *buffer_actual_len,
                       u64 *requestid, bool *signal, bool raw)
{
        u32 bytes_avail_towrite;
        u32 bytes_avail_toread;
        u32 next_read_location = 0;
        u64 prev_indices = 0;
        struct vmpacket_descriptor desc;
        u32 offset;
        u32 packetlen;
        int ret = 0;

        if (buflen <= 0)
                return -EINVAL;

        *buffer_actual_len = 0;
        *requestid = 0;

        hv_get_ringbuffer_availbytes(inring_info,
                                     &bytes_avail_toread,
                                     &bytes_avail_towrite);

        /* Make sure there is something to read */
        if (bytes_avail_toread < sizeof(desc)) {
                /*
                 * No error is returned when there is not even a complete
                 * header; drivers are expected to check buffer_actual_len.
                 */
                return ret;
        }
        next_read_location = hv_get_next_read_location(inring_info);
        next_read_location = hv_copyfrom_ringbuffer(inring_info, &desc,
                                                    sizeof(desc),
                                                    next_read_location);

        offset = raw ? 0 : (desc.offset8 << 3);
        packetlen = (desc.len8 << 3) - offset;
        *buffer_actual_len = packetlen;
        *requestid = desc.trans_id;

        if (bytes_avail_toread < packetlen + offset)
                return -EAGAIN;

        if (packetlen > buflen)
                return -ENOBUFS;

        next_read_location =
                hv_get_next_readlocation_withoffset(inring_info, offset);

        next_read_location = hv_copyfrom_ringbuffer(inring_info,
                                                    buffer,
                                                    packetlen,
                                                    next_read_location);

        next_read_location = hv_copyfrom_ringbuffer(inring_info,
                                                    &prev_indices,
                                                    sizeof(u64),
                                                    next_read_location);

        /*
         * Make sure all reads are done before we update the read index since
         * the writer may start writing to the read area once the read index
         * is updated.
         */
        mb();

        /* Update the read index */
        hv_set_next_read_location(inring_info, next_read_location);

        *signal = hv_need_to_signal_on_read(inring_info);

        return ret;
}
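
/*
 * Illustrative sketch (not part of this file): a hypothetical receive path
 * brackets its drain loop with hv_begin_read()/hv_end_read() so the host only
 * interrupts again once the guest has stopped reading. Names such as
 * my_handle_packet() are placeholders:
 *
 *      u32 actual_len, bytes_left;
 *      u64 req_id;
 *      bool signal = false;
 *
 *      hv_begin_read(&channel->inbound);
 *      do {
 *              if (hv_ringbuffer_read(&channel->inbound, buf, buflen,
 *                                     &actual_len, &req_id, &signal, false))
 *                      break;
 *              if (actual_len == 0)
 *                      break;                  // ring drained
 *              my_handle_packet(buf, actual_len, req_id);
 *      } while (true);
 *      bytes_left = hv_end_read(&channel->inbound);
 *      if (bytes_left)
 *              ;                               // data raced in; process again
 */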