async-thread.c

/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 * Copyright (C) 2014 Fujitsu. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <linux/workqueue.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time. It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations. This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;

	start = container_of(work, struct worker_start, work);
	__btrfs_start_workers(start->queue);
	kfree(start);
}
/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	struct worker_start *start;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
	return;

out:
	kfree(start);
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline void run_ordered_completions(struct btrfs_workers *workers,
					     struct btrfs_work *work)
{
	if (!workers->ordered)
		return;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		spin_unlock(&workers->order_lock);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
		spin_lock(&workers->order_lock);
	}
	spin_unlock(&workers->order_lock);
}

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head)) {
		cur = prio_head->next;
		goto out;
	}

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head)) {
		cur = head->next;
		goto out;
	}

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}
/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);
			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			try_to_freeze();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}
/*
 * this will wait for all the worker threads to shutdown
 */
void btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	workers->stopping = 1;
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
	workers->stopping = 0;
}

/*
 * starts new worker threads. This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}

	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_create(worker_loop, worker,
				      "btrfs-%s-%d", workers->name,
				      workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		goto fail;
	}

	spin_lock_irq(&workers->lock);
	if (workers->stopping) {
		spin_unlock_irq(&workers->lock);
		ret = -EINVAL;
		goto fail_kthread;
	}
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

	wake_up_process(worker->task);
	return 0;

fail_kthread:
	kthread_stop(worker->task);
fail:
	kfree(worker);
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting++;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers);
}
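
/*
 * Illustrative sketch (not part of the original file): how a caller is
 * expected to use the legacy worker pool API above.  The example_* names
 * are hypothetical; real callers keep the pool in a long-lived structure
 * and pass a one-thread helper pool for atomic worker starts.  Kept under
 * #if 0 so it is never compiled.
 */
#if 0
static void example_work_func(struct btrfs_work *work)
{
	/* do the actual processing here; 'work' is ours to free */
	kfree(work);
}

static int example_use_worker_pool(struct btrfs_workers *helper)
{
	struct btrfs_workers pool;
	struct btrfs_work *work;
	int ret;

	/* up to 4 threads; 'helper' services atomic worker starts */
	btrfs_init_workers(&pool, "example", 4, helper);
	ret = btrfs_start_workers(&pool);
	if (ret)
		return ret;

	work = kzalloc(sizeof(*work), GFP_NOFS);
	if (!work) {
		btrfs_stop_workers(&pool);
		return -ENOMEM;
	}
	work->func = example_work_func;
	btrfs_queue_worker(&pool, work);

	/* ... later, once all queued work is known to have finished ... */
	btrfs_stop_workers(&pool);
	return 0;
}
#endif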
/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now. This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list. This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number. This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job. This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;
	int ret;

	spin_lock_irqsave(&workers->lock, flags);
again:
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			ret = __btrfs_start_workers(workers);
			spin_lock_irqsave(&workers->lock, flags);
			if (ret)
				goto fallback;
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from. It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
void btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}
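
/*
 * Illustrative sketch (hypothetical, not part of the original file): a long
 * running work function that makes bounded progress per invocation and uses
 * btrfs_requeue_work to put itself back on its worker's queue so other work
 * items get a turn.  struct example_long_work and its fields are invented
 * for illustration only and kept under #if 0.
 */
#if 0
struct example_long_work {
	struct btrfs_work work;
	int chunks_left;
};

static void example_long_work_func(struct btrfs_work *work)
{
	struct example_long_work *lw;

	lw = container_of(work, struct example_long_work, work);

	/* process one chunk ... */
	lw->chunks_left--;

	if (lw->chunks_left > 0) {
		/* more to do: go back on the same worker's pending list */
		btrfs_requeue_work(&lw->work);
		return;
	}
	kfree(lw);
}
#endif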
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}

struct __btrfs_workqueue_struct {
	struct workqueue_struct *normal_wq;
	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variants */
	atomic_t pending;
	int max_active;
	int current_max;
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

struct btrfs_workqueue_struct {
	struct __btrfs_workqueue_struct *normal;
	struct __btrfs_workqueue_struct *high;
};

static inline struct __btrfs_workqueue_struct
*__btrfs_alloc_workqueue(char *name, int flags, int max_active, int thresh)
{
	struct __btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

	ret->max_active = max_active;
	atomic_set(&ret->pending, 0);
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For low threshold, disabling threshold is a better choice */
	if (thresh < DFT_THRESHOLD) {
		ret->current_max = max_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		ret->current_max = 1;
		ret->thresh = thresh;
	}

	if (flags & WQ_HIGHPRI)
		ret->normal_wq = alloc_workqueue("%s-%s-high", flags,
						 ret->max_active,
						 "btrfs", name);
	else
		ret->normal_wq = alloc_workqueue("%s-%s", flags,
						 ret->max_active, "btrfs",
						 name);
	if (unlikely(!ret->normal_wq)) {
		kfree(ret);
		return NULL;
	}

	INIT_LIST_HEAD(&ret->ordered_list);
	spin_lock_init(&ret->list_lock);
	spin_lock_init(&ret->thres_lock);
	return ret;
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue_struct *wq);

struct btrfs_workqueue_struct *btrfs_alloc_workqueue(char *name,
						     int flags,
						     int max_active,
						     int thresh)
{
	struct btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

	ret->normal = __btrfs_alloc_workqueue(name, flags & ~WQ_HIGHPRI,
					      max_active, thresh);
	if (unlikely(!ret->normal)) {
		kfree(ret);
		return NULL;
	}

	if (flags & WQ_HIGHPRI) {
		ret->high = __btrfs_alloc_workqueue(name, flags, max_active,
						    thresh);
		if (unlikely(!ret->high)) {
			__btrfs_destroy_workqueue(ret->normal);
			kfree(ret);
			return NULL;
		}
	}
	return ret;
}
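
/*
 * Illustrative sketch (hypothetical, not part of the original file): typical
 * use of the workqueue-based API in this file.  A queue allocated with
 * WQ_HIGHPRI gets both a normal and a high priority sub-queue; a work item
 * is set up with btrfs_init_work, submitted with btrfs_queue_work, and the
 * queue is torn down with btrfs_destroy_workqueue.  The example_* names and
 * the particular flags/limits are invented; kept under #if 0.
 */
#if 0
static void example_wq_func(struct btrfs_work_struct *work)
{
	/* the actual processing; freeing happens in example_wq_free */
}

static void example_wq_ordered(struct btrfs_work_struct *work)
{
	/* runs in queue order after example_wq_func has completed */
}

static void example_wq_free(struct btrfs_work_struct *work)
{
	kfree(work);
}

static int example_use_workqueue(void)
{
	struct btrfs_workqueue_struct *wq;
	struct btrfs_work_struct *work;

	/* max_active of 8; thresh of 0 selects DFT_THRESHOLD */
	wq = btrfs_alloc_workqueue("example", WQ_UNBOUND | WQ_HIGHPRI, 8, 0);
	if (!wq)
		return -ENOMEM;

	work = kzalloc(sizeof(*work), GFP_NOFS);
	if (!work) {
		btrfs_destroy_workqueue(wq);
		return -ENOMEM;
	}
	btrfs_init_work(work, example_wq_func, example_wq_ordered,
			example_wq_free);
	/* route this item to the high priority sub-queue */
	btrfs_set_work_high_priority(work);
	btrfs_queue_work(wq, work);

	/* ... later, once all work is known to have finished ... */
	btrfs_destroy_workqueue(wq);
	return 0;
}
#endif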
/*
 * Hook for threshold which will be called in btrfs_queue_work.
 * This hook WILL be called in IRQ handler context,
 * so workqueue_set_max_active MUST NOT be called in this hook
 */
static inline void thresh_queue_hook(struct __btrfs_workqueue_struct *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for threshold which will be called before executing the work.
 * This hook is called in kthread context,
 * so workqueue_set_max_active is called here.
 */
static inline void thresh_exec_hook(struct __btrfs_workqueue_struct *wq)
{
	int new_max_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active.
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto out;
	new_max_active = wq->current_max;

	/*
	 * pending may change later, but that's OK since we don't need it
	 * to be very accurate to calculate new_max_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_max_active++;
	if (pending < wq->thresh / 2)
		new_max_active--;
	new_max_active = clamp_val(new_max_active, 1, wq->max_active);
	if (new_max_active != wq->current_max) {
		need_change = 1;
		wq->current_max = new_max_active;
	}
out:
	spin_unlock(&wq->thres_lock);
	if (need_change) {
		workqueue_set_max_active(wq->normal_wq, wq->current_max);
	}
}
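
/*
 * Illustrative numbers for thresh_exec_hook() above (added comment, not in
 * the original file): with thresh = 64 and max_active = 8, a backlog of
 * pending = 100 (> thresh) bumps current_max up by one on the next
 * adjustment, while pending = 20 (< thresh / 2) drops it by one; the result
 * is always clamped to the range [1, max_active], i.e. [1, 8] here.
 */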
static void run_ordered_work(struct __btrfs_workqueue_struct *wq)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work_struct *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work_struct,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/*
		 * we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
	}
	spin_unlock_irqrestore(lock, flags);
}

static void normal_work_helper(struct work_struct *arg)
{
	struct btrfs_work_struct *work;
	struct __btrfs_workqueue_struct *wq;
	int need_order = 0;

	work = container_of(arg, struct btrfs_work_struct, normal_work);
	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func() if it has no ordered_free
	 *    Since the struct is freed in work->func().
	 * 2) after setting WORK_DONE_BIT
	 *    The work may be freed in other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;
	wq = work->wq;

	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq);
	}
}

void btrfs_init_work(struct btrfs_work_struct *work,
		     void (*func)(struct btrfs_work_struct *),
		     void (*ordered_func)(struct btrfs_work_struct *),
		     void (*ordered_free)(struct btrfs_work_struct *))
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, normal_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

static inline void __btrfs_queue_work(struct __btrfs_workqueue_struct *wq,
				      struct btrfs_work_struct *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_queue_work(struct btrfs_workqueue_struct *wq,
		      struct btrfs_work_struct *work)
{
	struct __btrfs_workqueue_struct *dest_wq;

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
		dest_wq = wq->high;
	else
		dest_wq = wq->normal;
	__btrfs_queue_work(dest_wq, work);
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue_struct *wq)
{
	destroy_workqueue(wq->normal_wq);
	kfree(wq);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue_struct *wq)
{
	if (!wq)
		return;
	if (wq->high)
		__btrfs_destroy_workqueue(wq->high);
	__btrfs_destroy_workqueue(wq->normal);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue_struct *wq, int max)
{
	wq->normal->max_active = max;
	if (wq->high)
		wq->high->max_active = max;
}

void btrfs_set_work_high_priority(struct btrfs_work_struct *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}