ring_buffer.c

/*
 *
 * Copyright (c) 2009, Microsoft Corporation.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
 * Place - Suite 330, Boston, MA 02111-1307 USA.
 *
 * Authors:
 *   Haiyang Zhang <haiyangz@microsoft.com>
 *   Hank Janssen  <hjanssen@microsoft.com>
 *   K. Y. Srinivasan <kys@microsoft.com>
 *
 */
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/hyperv.h>
#include <linux/uio.h>

#include "hyperv_vmbus.h"
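
/*
 * hv_begin_read() and hv_end_read() bracket the guest's read loop:
 * hv_begin_read() sets interrupt_mask so the host does not interrupt the
 * guest while it drains the ring, and hv_end_read() clears the mask and
 * returns the number of bytes still available to read, so the caller can
 * detect data that raced in after the mask was cleared and go around the
 * loop again.
 */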
void hv_begin_read(struct hv_ring_buffer_info *rbi)
{
        rbi->ring_buffer->interrupt_mask = 1;
        mb();
}

u32 hv_end_read(struct hv_ring_buffer_info *rbi)
{
        u32 read;
        u32 write;

        rbi->ring_buffer->interrupt_mask = 0;
        mb();

        /*
         * Now check to see if the ring buffer is still empty.
         * If it is not, we raced and we need to process new
         * incoming messages.
         */
        hv_get_ringbuffer_availbytes(rbi, &read, &write);

        return read;
}

/*
 * When we write to the ring buffer, check if the host needs to
 * be signaled. Here are the details of this protocol:
 *
 *      1. The host guarantees that while it is draining the
 *         ring buffer, it will set the interrupt_mask to
 *         indicate it does not need to be interrupted when
 *         new data is placed.
 *
 *      2. The host guarantees that it will completely drain
 *         the ring buffer before exiting the read loop. Further,
 *         once the ring buffer is empty, it will clear the
 *         interrupt_mask and re-check to see if new data has
 *         arrived.
 */
static bool hv_need_to_signal(u32 old_write, struct hv_ring_buffer_info *rbi)
{
        mb();
        if (rbi->ring_buffer->interrupt_mask)
                return false;

        /* check interrupt_mask before read_index */
        rmb();

        /*
         * This is the only case we need to signal: when the
         * ring transitions from being empty to non-empty.
         */
        if (old_write == rbi->ring_buffer->read_index)
                return true;

        return false;
}
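
/*
 * Note on the check above: old_write is the write_index captured before the
 * packet was copied in. If it equals the host's current read_index, the ring
 * was empty when the write began, so the host may have cleared the
 * interrupt_mask and gone idle and therefore needs a signal; in every other
 * case the host is still draining the ring and will pick up the new packet
 * on its own.
 */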

/*
 * To optimize the flow management on the send-side,
 * when the sender is blocked because of lack of
 * sufficient space in the ring buffer, the consumer of
 * the ring buffer can potentially signal the producer.
 * This is controlled by the following parameters:
 *
 * 1. pending_send_sz: This is the size in bytes that the
 *    producer is trying to send.
 * 2. The feature bit feat_pending_send_sz set to indicate if
 *    the consumer of the ring will signal when the ring
 *    state transitions from being full to a state where
 *    there is room for the producer to send the pending packet.
 */
static bool hv_need_to_signal_on_read(u32 prev_write_sz,
                                      struct hv_ring_buffer_info *rbi)
{
        u32 cur_write_sz;
        u32 r_size;
        u32 write_loc = rbi->ring_buffer->write_index;
        u32 read_loc = rbi->ring_buffer->read_index;
        u32 pending_sz = rbi->ring_buffer->pending_send_sz;

        /* If the other end is not blocked on write don't bother. */
        if (pending_sz == 0)
                return false;

        r_size = rbi->ring_datasize;
        cur_write_sz = write_loc >= read_loc ? r_size - (write_loc - read_loc) :
                        read_loc - write_loc;

        if ((prev_write_sz < pending_sz) && (cur_write_sz >= pending_sz))
                return true;

        return false;
}
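
/*
 * Worked example of the free-space computation above (illustrative numbers
 * only): with ring_datasize = 4096, write_index = 3000 and read_index = 1000
 * the writer is ahead of the reader, so the free space is
 * 4096 - (3000 - 1000) = 2096 bytes; with write_index = 1000 and
 * read_index = 3000 it is simply 3000 - 1000 = 2000 bytes. The blocked
 * producer is signaled only on the transition where this free space first
 * reaches pending_send_sz.
 */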

/* Get the next write location for the specified ring buffer. */
static inline u32
hv_get_next_write_location(struct hv_ring_buffer_info *ring_info)
{
        u32 next = ring_info->ring_buffer->write_index;

        return next;
}

/* Set the next write location for the specified ring buffer. */
static inline void
hv_set_next_write_location(struct hv_ring_buffer_info *ring_info,
                           u32 next_write_location)
{
        ring_info->ring_buffer->write_index = next_write_location;
}

/* Get the next read location for the specified ring buffer. */
static inline u32
hv_get_next_read_location(struct hv_ring_buffer_info *ring_info)
{
        u32 next = ring_info->ring_buffer->read_index;

        return next;
}

/*
 * Get the next read location + offset for the specified ring buffer.
 * This allows the caller to skip ahead by 'offset' bytes (e.g. past a
 * packet descriptor that has already been read).
 */
static inline u32
hv_get_next_readlocation_withoffset(struct hv_ring_buffer_info *ring_info,
                                    u32 offset)
{
        u32 next = ring_info->ring_buffer->read_index;

        next += offset;
        next %= ring_info->ring_datasize;

        return next;
}

/* Set the next read location for the specified ring buffer. */
static inline void
hv_set_next_read_location(struct hv_ring_buffer_info *ring_info,
                          u32 next_read_location)
{
        ring_info->ring_buffer->read_index = next_read_location;
}

/* Get the start of the ring buffer. */
static inline void *
hv_get_ring_buffer(struct hv_ring_buffer_info *ring_info)
{
        return (void *)ring_info->ring_buffer->buffer;
}

/* Get the size of the ring buffer. */
static inline u32
hv_get_ring_buffersize(struct hv_ring_buffer_info *ring_info)
{
        return ring_info->ring_datasize;
}

/* Get the read and write indices as u64 of the specified ring buffer. */
static inline u64
hv_get_ring_bufferindices(struct hv_ring_buffer_info *ring_info)
{
        return (u64)ring_info->ring_buffer->write_index << 32;
}

/*
 * Helper routine to copy from the ring buffer (source) into a
 * caller-supplied destination buffer.
 * Assume there is enough room. Handles wrap-around in the src case only!!
 */
static u32 hv_copyfrom_ringbuffer(
        struct hv_ring_buffer_info *ring_info,
        void *dest,
        u32 destlen,
        u32 start_read_offset)
{
        void *ring_buffer = hv_get_ring_buffer(ring_info);
        u32 ring_buffer_size = hv_get_ring_buffersize(ring_info);
        u32 frag_len;

        /* wrap-around detected at the src */
        if (destlen > ring_buffer_size - start_read_offset) {
                frag_len = ring_buffer_size - start_read_offset;
                memcpy(dest, ring_buffer + start_read_offset, frag_len);
                memcpy(dest + frag_len, ring_buffer, destlen - frag_len);
        } else
                memcpy(dest, ring_buffer + start_read_offset, destlen);

        start_read_offset += destlen;
        start_read_offset %= ring_buffer_size;

        return start_read_offset;
}
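
/*
 * Wrap-around example for the copy above (illustrative numbers only): with
 * ring_buffer_size = 4096, start_read_offset = 4000 and destlen = 200, the
 * first memcpy moves the 96 bytes up to the end of the ring and the second
 * moves the remaining 104 bytes from the start of the ring; the returned
 * offset is (4000 + 200) % 4096 = 104. hv_copyto_ringbuffer() below is the
 * mirror image for the write side.
 */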

/*
 * Helper routine to copy from source to ring buffer.
 * Assume there is enough room. Handles wrap-around in the dest case only!!
 */
static u32 hv_copyto_ringbuffer(
        struct hv_ring_buffer_info *ring_info,
        u32 start_write_offset,
        void *src,
        u32 srclen)
{
        void *ring_buffer = hv_get_ring_buffer(ring_info);
        u32 ring_buffer_size = hv_get_ring_buffersize(ring_info);
        u32 frag_len;

        /* wrap-around detected! */
        if (srclen > ring_buffer_size - start_write_offset) {
                frag_len = ring_buffer_size - start_write_offset;
                memcpy(ring_buffer + start_write_offset, src, frag_len);
                memcpy(ring_buffer, src + frag_len, srclen - frag_len);
        } else
                memcpy(ring_buffer + start_write_offset, src, srclen);

        start_write_offset += srclen;
        start_write_offset %= ring_buffer_size;

        return start_write_offset;
}

/* Get various debug metrics for the specified ring buffer. */
void hv_ringbuffer_get_debuginfo(struct hv_ring_buffer_info *ring_info,
                                 struct hv_ring_buffer_debug_info *debug_info)
{
        u32 bytes_avail_towrite;
        u32 bytes_avail_toread;

        if (ring_info->ring_buffer) {
                hv_get_ringbuffer_availbytes(ring_info,
                                             &bytes_avail_toread,
                                             &bytes_avail_towrite);

                debug_info->bytes_avail_toread = bytes_avail_toread;
                debug_info->bytes_avail_towrite = bytes_avail_towrite;
                debug_info->current_read_index =
                        ring_info->ring_buffer->read_index;
                debug_info->current_write_index =
                        ring_info->ring_buffer->write_index;
                debug_info->current_interrupt_mask =
                        ring_info->ring_buffer->interrupt_mask;
        }
}

/* Initialize the ring buffer. */
int hv_ringbuffer_init(struct hv_ring_buffer_info *ring_info,
                       void *buffer, u32 buflen)
{
        if (sizeof(struct hv_ring_buffer) != PAGE_SIZE)
                return -EINVAL;

        memset(ring_info, 0, sizeof(struct hv_ring_buffer_info));

        ring_info->ring_buffer = (struct hv_ring_buffer *)buffer;
        ring_info->ring_buffer->read_index =
                ring_info->ring_buffer->write_index = 0;

        /* Set the feature bit for enabling flow control. */
        ring_info->ring_buffer->feature_bits.value = 1;

        ring_info->ring_size = buflen;
        ring_info->ring_datasize = buflen - sizeof(struct hv_ring_buffer);

        spin_lock_init(&ring_info->ring_lock);

        return 0;
}
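
/*
 * Minimal usage sketch (hypothetical caller, not part of this file): the
 * buffer handed to hv_ringbuffer_init() must be larger than one page; its
 * first PAGE_SIZE bytes become the struct hv_ring_buffer control page and
 * the remainder becomes the data area (ring_datasize above):
 *
 *        struct hv_ring_buffer_info rbi;
 *        void *ring = (void *)__get_free_pages(GFP_KERNEL, order);
 *
 *        if (!ring)
 *                return -ENOMEM;
 *        if (hv_ringbuffer_init(&rbi, ring, (1 << order) * PAGE_SIZE))
 *                return -EINVAL;
 */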

/* Cleanup the ring buffer. */
void hv_ringbuffer_cleanup(struct hv_ring_buffer_info *ring_info)
{
}

/* Write to the ring buffer. */
int hv_ringbuffer_write(struct hv_ring_buffer_info *outring_info,
                        struct kvec *kv_list, u32 kv_count, bool *signal,
                        bool lock)
{
        int i = 0;
        u32 bytes_avail_towrite;
        u32 bytes_avail_toread;
        u32 totalbytes_towrite = 0;
        u32 next_write_location;
        u32 old_write;
        u64 prev_indices = 0;
        unsigned long flags = 0;

        for (i = 0; i < kv_count; i++)
                totalbytes_towrite += kv_list[i].iov_len;

        totalbytes_towrite += sizeof(u64);

        if (lock)
                spin_lock_irqsave(&outring_info->ring_lock, flags);

        hv_get_ringbuffer_availbytes(outring_info,
                                     &bytes_avail_toread,
                                     &bytes_avail_towrite);

        /*
         * If there is only room for the packet, assume it is full.
         * Otherwise, the next time around, we would think the ring buffer
         * is empty since the read index == write index.
         */
        if (bytes_avail_towrite <= totalbytes_towrite) {
                if (lock)
                        spin_unlock_irqrestore(&outring_info->ring_lock, flags);
                return -EAGAIN;
        }

        /* Write to the ring buffer */
        next_write_location = hv_get_next_write_location(outring_info);

        old_write = next_write_location;

        for (i = 0; i < kv_count; i++) {
                next_write_location = hv_copyto_ringbuffer(outring_info,
                                                           next_write_location,
                                                           kv_list[i].iov_base,
                                                           kv_list[i].iov_len);
        }

        /* Set previous packet start */
        prev_indices = hv_get_ring_bufferindices(outring_info);

        next_write_location = hv_copyto_ringbuffer(outring_info,
                                                   next_write_location,
                                                   &prev_indices,
                                                   sizeof(u64));

        /* Issue a full memory barrier before updating the write index */
        mb();

        /* Now, update the write location */
        hv_set_next_write_location(outring_info, next_write_location);

        if (lock)
                spin_unlock_irqrestore(&outring_info->ring_lock, flags);

        *signal = hv_need_to_signal(old_write, outring_info);
        return 0;
}
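
/*
 * Hypothetical caller sketch (illustrative only; the real callers live in
 * the channel code outside this file): the packet descriptor and payload
 * are passed as a kvec array, and the 'signal' output tells the caller
 * whether the host must be interrupted because the ring went from empty to
 * non-empty:
 *
 *        struct kvec kv[2];
 *        bool signal = false;
 *        int ret;
 *
 *        kv[0].iov_base = &desc;
 *        kv[0].iov_len  = sizeof(desc);
 *        kv[1].iov_base = payload;
 *        kv[1].iov_len  = payload_len;
 *
 *        ret = hv_ringbuffer_write(outring, kv, 2, &signal, true);
 *        if (ret == -EAGAIN)
 *                return ret;        (ring is full, retry later)
 *        if (ret == 0 && signal)
 *                interrupt the host
 */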

int hv_ringbuffer_read(struct hv_ring_buffer_info *inring_info,
                       void *buffer, u32 buflen, u32 *buffer_actual_len,
                       u64 *requestid, bool *signal, bool raw)
{
        u32 bytes_avail_towrite;
        u32 bytes_avail_toread;
        u32 next_read_location = 0;
        u64 prev_indices = 0;
        struct vmpacket_descriptor desc;
        u32 offset;
        u32 packetlen;
        int ret = 0;

        if (buflen <= 0)
                return -EINVAL;

        *buffer_actual_len = 0;
        *requestid = 0;

        hv_get_ringbuffer_availbytes(inring_info,
                                     &bytes_avail_toread,
                                     &bytes_avail_towrite);

        /* Make sure there is something to read */
        if (bytes_avail_toread < sizeof(desc)) {
                /*
                 * No error is set when there is not even a header; drivers
                 * are supposed to analyze buffer_actual_len.
                 */
                return ret;
        }

        next_read_location = hv_get_next_read_location(inring_info);
        next_read_location = hv_copyfrom_ringbuffer(inring_info, &desc,
                                                    sizeof(desc),
                                                    next_read_location);

        offset = raw ? 0 : (desc.offset8 << 3);
        packetlen = (desc.len8 << 3) - offset;
        *buffer_actual_len = packetlen;
        *requestid = desc.trans_id;

        if (bytes_avail_toread < packetlen + offset)
                return -EAGAIN;

        if (packetlen > buflen)
                return -ENOBUFS;

        next_read_location =
                hv_get_next_readlocation_withoffset(inring_info, offset);

        next_read_location = hv_copyfrom_ringbuffer(inring_info,
                                                    buffer,
                                                    packetlen,
                                                    next_read_location);

        next_read_location = hv_copyfrom_ringbuffer(inring_info,
                                                    &prev_indices,
                                                    sizeof(u64),
                                                    next_read_location);

        /*
         * Make sure all reads are done before we update the read index since
         * the writer may start writing to the read area once the read index
         * is updated.
         */
        mb();

        /* Update the read index */
        hv_set_next_read_location(inring_info, next_read_location);

        *signal = hv_need_to_signal_on_read(bytes_avail_towrite, inring_info);

        return ret;
}
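
/*
 * Hypothetical caller sketch (illustrative only): drivers poll with
 * hv_ringbuffer_read() and, as the comment in hv_ringbuffer_read() notes,
 * use buffer_actual_len rather than the return value to tell "ring empty"
 * apart from a real packet; -ENOBUFS means the supplied buffer was too
 * small and buffer_actual_len holds the required size:
 *
 *        u32 len = 0;
 *        u64 req_id;
 *        bool signal = false;
 *        int ret;
 *
 *        ret = hv_ringbuffer_read(inring, buf, buflen, &len, &req_id,
 *                                 &signal, false);
 *        if (ret == 0 && len == 0)
 *                return;            (nothing to read)
 *        if (ret == -ENOBUFS)
 *                retry with a buffer of at least 'len' bytes
 */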