async-thread.c

/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 * Copyright (C) 2014 Fujitsu.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <linux/workqueue.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3
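
/*
 * A short note on how these flag bits are used below (summarizing the code
 * in this file, not adding new behaviour):
 *
 *   WORK_QUEUED_BIT	  set (via test_and_set_bit) when a work item is put
 *			  on a pending list, cleared by worker_loop() right
 *			  before work->func() runs; it prevents double-queueing.
 *   WORK_DONE_BIT	  set once work->func() has finished, by
 *			  run_ordered_completions() for the thread-pool code
 *			  and by normal_work_helper() for the workqueue code.
 *   WORK_ORDER_DONE_BIT  claimed by whichever thread gets to call
 *			  ordered_func(), so the ordered callback runs once.
 *   WORK_HIGH_PRIO_BIT	  set by btrfs_set_work_high_prio(); routes the item
 *			  to the prio_pending / prio_order_list queues.
 */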

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	__btrfs_start_workers(start->queue);
	kfree(start);
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}
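
/*
 * Note that the two helpers above form a hysteresis band: a worker is only
 * marked busy once its backlog reaches idle_thresh (32 by default, see
 * btrfs_init_workers), but it is not marked idle again until the backlog
 * drops below idle_thresh / 2.  This keeps a worker from bouncing between
 * the idle and busy lists on every queued item.
 */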

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	struct worker_start *start;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
	return;

out:
	kfree(start);
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline void run_ordered_completions(struct btrfs_workers *workers,
					     struct btrfs_work *work)
{
	if (!workers->ordered)
		return;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		spin_unlock(&workers->order_lock);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);

		spin_lock(&workers->order_lock);
	}

	spin_unlock(&workers->order_lock);
}
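
/*
 * The guarantee this gives an ordered pool (workers->ordered set): func()
 * may run on any worker in any order, but ordered_func() and ordered_free()
 * are called strictly in queueing order, because a finished item stays on
 * order_list until every earlier item has had its ordered_func() called.
 * A caller would typically do the parallelizable part in func() and the
 * order-sensitive part (for example, publishing results) in ordered_func();
 * the split itself is up to the caller, not something enforced here.
 */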

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head)) {
		cur = prio_head->next;
		goto out;
	}

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head)) {
		cur = head->next;
		goto out;
	}

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);
			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			try_to_freeze();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}
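
/*
 * worker->working acts as a wakeup-suppression flag between this loop and
 * the queueing paths: btrfs_queue_worker()/btrfs_requeue_work() only call
 * wake_up_process() when they flip working from 0 to 1, and the loop above
 * only clears working under worker->lock after setting TASK_INTERRUPTIBLE
 * and re-checking the pending lists.  A concurrent submitter therefore
 * either sees its item already on the list before the sleep, or sees
 * working == 0 and issues the wakeup, so no work item is left stranded.
 */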

/*
 * this will wait for all the worker threads to shutdown
 */
void btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	workers->stopping = 1;
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
	workers->stopping = 0;
}
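
/*
 * A minimal usage sketch of this thread-pool API, built only from the
 * functions in this file.  The 'helper', 'pool' and 'some_work' names are
 * illustrative, not part of btrfs proper:
 *
 *	static struct btrfs_workers helper;
 *	static struct btrfs_workers pool;
 *
 *	btrfs_init_workers(&helper, "genwork", 1, NULL);
 *	btrfs_init_workers(&pool, "example", 8, &helper);
 *	if (btrfs_start_workers(&helper) || btrfs_start_workers(&pool))
 *		goto out_stop;
 *	...
 *	btrfs_queue_worker(&pool, &some_work);
 *	...
 * out_stop:
 *	btrfs_stop_workers(&pool);
 *	btrfs_stop_workers(&helper);
 */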

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}

	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_create(worker_loop, worker,
				      "btrfs-%s-%d", workers->name,
				      workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		goto fail;
	}

	spin_lock_irq(&workers->lock);
	if (workers->stopping) {
		spin_unlock_irq(&workers->lock);
		ret = -EINVAL;
		goto fail_kthread;
	}
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

	wake_up_process(worker->task);
	return 0;

fail_kthread:
	kthread_stop(worker->task);
fail:
	kfree(worker);
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting++;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;
	int ret;

	spin_lock_irqsave(&workers->lock, flags);
again:
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			ret = __btrfs_start_workers(workers);
			spin_lock_irqsave(&workers->lock, flags);
			if (ret)
				goto fallback;
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}
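
/*
 * Selection order in find_worker(), as implemented above: reuse an idle
 * worker if one exists; otherwise, while under max_workers, spawn a new one
 * (directly, or, when an atomic_worker_start helper pool is configured, by
 * setting atomic_start_pending so check_pending_worker_creates() spawns it
 * later from a safe context); failing that, fall back to whichever existing
 * worker can be found, busy or not.
 */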

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
void btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}
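
/*
 * An illustrative submission path for the interface above.  'my_func' and
 * 'struct my_ctx' are hypothetical caller-side names, not part of this file;
 * the caller embeds a struct btrfs_work, points work->func at a handler,
 * optionally marks it high priority, and hands it to btrfs_queue_worker():
 *
 *	static void my_func(struct btrfs_work *work)
 *	{
 *		struct my_ctx *ctx = container_of(work, struct my_ctx, work);
 *		do_the_deferred_processing(ctx);
 *	}
 *
 *	ctx->work.func = my_func;
 *	ctx->work.flags = 0;
 *	btrfs_set_work_high_prio(&ctx->work);
 *	btrfs_queue_worker(workers, &ctx->work);
 *
 * Everything from here down is the newer implementation that layers the
 * same ordered/unordered semantics on top of the kernel workqueue API
 * instead of the hand-rolled thread pool above.
 */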

struct btrfs_workqueue_struct {
	struct workqueue_struct *normal_wq;
	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;
};

struct btrfs_workqueue_struct *btrfs_alloc_workqueue(char *name,
						     int flags,
						     int max_active)
{
	struct btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

	ret->normal_wq = alloc_workqueue("%s-%s", flags, max_active,
					 "btrfs", name);
	if (unlikely(!ret->normal_wq)) {
		kfree(ret);
		return NULL;
	}

	INIT_LIST_HEAD(&ret->ordered_list);
	spin_lock_init(&ret->list_lock);
	return ret;
}

static void run_ordered_work(struct btrfs_workqueue_struct *wq)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work_struct *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work_struct,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/*
		 * we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
	}
	spin_unlock_irqrestore(lock, flags);
}

static void normal_work_helper(struct work_struct *arg)
{
	struct btrfs_work_struct *work;
	struct btrfs_workqueue_struct *wq;
	int need_order = 0;

	work = container_of(arg, struct btrfs_work_struct, normal_work);
	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func() if it has no ordered_free
	 *    Since the struct is freed in work->func().
	 * 2) after setting WORK_DONE_BIT
	 *    The work may be freed in other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;
	wq = work->wq;

	work->func(work);
	if (need_order) {
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq);
	}
}

void btrfs_init_work(struct btrfs_work_struct *work,
		     void (*func)(struct btrfs_work_struct *),
		     void (*ordered_func)(struct btrfs_work_struct *),
		     void (*ordered_free)(struct btrfs_work_struct *))
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, normal_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

void btrfs_queue_work(struct btrfs_workqueue_struct *wq,
		      struct btrfs_work_struct *work)
{
	unsigned long flags;

	work->wq = wq;
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue_struct *wq)
{
	destroy_workqueue(wq->normal_wq);
	kfree(wq);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue_struct *wq, int max)
{
	workqueue_set_max_active(wq->normal_wq, max);
}
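
/*
 * A minimal end-to-end sketch of the workqueue-backed API above.  The names
 * 'wq', 'my_work', 'my_func', 'my_done' and 'my_free' are illustrative only;
 * pass NULL for the ordered callbacks if ordering is not needed:
 *
 *	struct btrfs_workqueue_struct *wq;
 *
 *	wq = btrfs_alloc_workqueue("example", WQ_UNBOUND, 8);
 *	if (!wq)
 *		return -ENOMEM;
 *	btrfs_init_work(&my_work, my_func, my_done, my_free);
 *	btrfs_queue_work(wq, &my_work);
 *	...
 *	btrfs_workqueue_set_max(wq, 4);
 *	btrfs_destroy_workqueue(wq);
 */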