src/os/unix/ngx_freebsd_rfork_thread.c - nginx-1.7.10

Global variables defined

Functions defined

Source code


  1. /*
  2. * Copyright (C) Igor Sysoev
  3. * Copyright (C) Nginx, Inc.
  4. */


  5. #include <ngx_config.h>
  6. #include <ngx_core.h>

  7. /*
  8. * The threads implementation uses the rfork(RFPROC|RFTHREAD|RFMEM) syscall
  9. * to create threads.  All threads use the stacks of the same size mmap()ed
  10. * below the main stack.  Thus the current thread id is determined via
  11. * the stack pointer value.
  12. *
  13. * The mutex implementation uses the ngx_atomic_cmp_set() operation
  14. * to acquire a mutex and the SysV semaphore to wait on a mutex and to wake up
  15. * the waiting threads.  The light mutex does not use semaphore, so after
  16. * spinning in the lock the thread calls sched_yield().  However the light
  17. * mutexes are intended to be used with the "trylock" operation only.
  18. * The SysV semop() is a cheap syscall, particularly if it is given few
  19. * sembufs and does not use SEM_UNDO.
  20. *
  21. * The condition variable implementation uses the signal #64.
  22. * The signal handler is SIG_IGN so the kill() is a cheap syscall.
  23. * The thread waits for a signal in kevent().  Using EVFILT_SIGNAL
  24. * has been safe since FreeBSD 4.10-STABLE.
  25. *
  26. * This threads implementation currently works on i386 (486+) and amd64
  27. * platforms only.
  28. */


/* filled from the "kern.usrstack" sysctl in ngx_init_threads() */
char                 *ngx_freebsd_kern_usrstack;
/* distance between two adjacent thread stacks: usable size + red zone */
size_t                ngx_thread_stack_size;


static size_t         rz_size;            /* red zone size, one page */
static size_t         usable_stack_size;  /* mmap()ed length of a stack */
static char          *last_stack;         /* base of the lowest stack slot */

static ngx_uint_t     nthreads;
static ngx_uint_t     max_threads;        /* set to n + 1 by ngx_init_threads() */

static ngx_uint_t     nkeys;     /* keys handed out by ngx_thread_key_create() */
static ngx_tid_t     *tids;      /* the threads tids array */
void                **ngx_tls;   /* the threads tls's array */

/* the thread-safe libc errno */

static int   errno0;   /* the main thread's errno */
static int  *errnos;   /* the threads errno's array */

  42. int *
  43. __error()
  44. {
  45.     int  tid;

  46.     tid = ngx_gettid();

  47.     return tid ? &errnos[tid - 1] : &errno0;
  48. }


  49. /*
  50. * __isthreaded enables the spinlocks in some libc functions, i.e. in malloc()
  51. * and some other places.  Nevertheless we protect our malloc()/free() calls
  52. * with our own mutex, which is more efficient than the spinlock.
  53. *
  54. * _spinlock() is a weak referenced stub in src/lib/libc/gen/_spinlock_stub.c
  55. * that does nothing.
  56. */

  57. extern int  __isthreaded;

  58. void
  59. _spinlock(ngx_atomic_t *lock)
  60. {
  61.     ngx_int_t  tries;

  62.     tries = 0;

  63.     for ( ;; ) {

  64.         if (*lock) {
  65.             if (ngx_ncpu > 1 && tries++ < 1000) {
  66.                 continue;
  67.             }

  68.             sched_yield();
  69.             tries = 0;

  70.         } else {
  71.             if (ngx_atomic_cmp_set(lock, 0, 1)) {
  72.                 return;
  73.             }
  74.         }
  75.     }
  76. }


  77. /*
  78. * Before FreeBSD 5.1 _spinunlock() is a simple #define in
  79. * src/lib/libc/include/spinlock.h that zeroes lock.
  80. *
  81. * Since FreeBSD 5.1 _spinunlock() is a weak referenced stub in
  82. * src/lib/libc/gen/_spinlock_stub.c that does nothing.
  83. */

#ifndef _spinunlock

/*
 * Marks the lock as free again by storing zero into it; _spinlock()
 * treats a zero value as "unlocked".  Compiled only when _spinunlock
 * is not already provided as a macro.
 */
void
_spinunlock(ngx_atomic_t *lock)
{
    *lock = 0;
}

#endif


  91. ngx_err_t
  92. ngx_create_thread(ngx_tid_t *tid, ngx_thread_value_t (*func)(void *arg),
  93.     void *arg, ngx_log_t *log)
  94. {
  95.     ngx_pid_t   id;
  96.     ngx_err_t   err;
  97.     char       *stack, *stack_top;

  98.     if (nthreads >= max_threads) {
  99.         ngx_log_error(NGX_LOG_CRIT, log, 0,
  100.                       "no more than %ui threads can be created", max_threads);
  101.         return NGX_ERROR;
  102.     }

  103.     last_stack -= ngx_thread_stack_size;

  104.     stack = mmap(last_stack, usable_stack_size, PROT_READ|PROT_WRITE,
  105.                  MAP_STACK, -1, 0);

  106.     if (stack == MAP_FAILED) {
  107.         ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
  108.                       "mmap(%p:%uz, MAP_STACK) thread stack failed",
  109.                       last_stack, usable_stack_size);
  110.         return NGX_ERROR;
  111.     }

  112.     if (stack != last_stack) {
  113.         ngx_log_error(NGX_LOG_ALERT, log, 0,
  114.                       "stack %p address was changed to %p", last_stack, stack);
  115.         return NGX_ERROR;
  116.     }

  117.     stack_top = stack + usable_stack_size;

  118.     ngx_log_debug2(NGX_LOG_DEBUG_CORE, log, 0,
  119.                    "thread stack: %p-%p", stack, stack_top);

  120.     ngx_set_errno(0);

  121.     id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top,
  122.                       (ngx_rfork_thread_func_pt) func, arg);

  123.     err = ngx_errno;

  124.     if (id == -1) {
  125.         ngx_log_error(NGX_LOG_ALERT, log, err, "rfork() failed");

  126.     } else {
  127.         *tid = id;
  128.         nthreads = (ngx_freebsd_kern_usrstack - stack_top)
  129.                                                        / ngx_thread_stack_size;
  130.         tids[nthreads] = id;

  131.         ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, "rfork()ed thread: %P", id);
  132.     }

  133.     return err;
  134. }


  135. ngx_int_t
  136. ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle)
  137. {
  138.     char              *red_zone, *zone;
  139.     size_t             len;
  140.     ngx_int_t          i;
  141.     struct sigaction   sa;

  142.     max_threads = n + 1;

  143.     for (i = 0; i < n; i++) {
  144.         ngx_memzero(&sa, sizeof(struct sigaction));
  145.         sa.sa_handler = SIG_IGN;
  146.         sigemptyset(&sa.sa_mask);
  147.         if (sigaction(NGX_CV_SIGNAL, &sa, NULL) == -1) {
  148.             ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
  149.                           "sigaction(%d, SIG_IGN) failed", NGX_CV_SIGNAL);
  150.             return NGX_ERROR;
  151.         }
  152.     }

  153.     len = sizeof(ngx_freebsd_kern_usrstack);
  154.     if (sysctlbyname("kern.usrstack", &ngx_freebsd_kern_usrstack, &len,
  155.                                                                 NULL, 0) == -1)
  156.     {
  157.         ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
  158.                       "sysctlbyname(kern.usrstack) failed");
  159.         return NGX_ERROR;
  160.     }

  161.     /* the main thread stack red zone */
  162.     rz_size = ngx_pagesize;
  163.     red_zone = ngx_freebsd_kern_usrstack - (size + rz_size);

  164.     ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
  165.                    "usrstack: %p red zone: %p",
  166.                    ngx_freebsd_kern_usrstack, red_zone);

  167.     zone = mmap(red_zone, rz_size, PROT_NONE, MAP_ANON, -1, 0);
  168.     if (zone == MAP_FAILED) {
  169.         ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
  170.                       "mmap(%p:%uz, PROT_NONE, MAP_ANON) red zone failed",
  171.                       red_zone, rz_size);
  172.         return NGX_ERROR;
  173.     }

  174.     if (zone != red_zone) {
  175.         ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
  176.                       "red zone %p address was changed to %p", red_zone, zone);
  177.         return NGX_ERROR;
  178.     }

  179.     /* create the thread errno' array */

  180.     errnos = ngx_calloc(n * sizeof(int), cycle->log);
  181.     if (errnos == NULL) {
  182.         return NGX_ERROR;
  183.     }

  184.     /* create the thread tids array */

  185.     tids = ngx_calloc((n + 1) * sizeof(ngx_tid_t), cycle->log);
  186.     if (tids == NULL) {
  187.         return NGX_ERROR;
  188.     }

  189.     tids[0] = ngx_pid;

  190.     /* create the thread tls' array */

  191.     ngx_tls = ngx_calloc(NGX_THREAD_KEYS_MAX * (n + 1) * sizeof(void *),
  192.                          cycle->log);
  193.     if (ngx_tls == NULL) {
  194.         return NGX_ERROR;
  195.     }

  196.     nthreads = 1;

  197.     last_stack = zone + rz_size;
  198.     usable_stack_size = size;
  199.     ngx_thread_stack_size = size + rz_size;

  200.     /* allow the spinlock in libc malloc() */
  201.     __isthreaded = 1;

  202.     ngx_threaded = 1;

  203.     return NGX_OK;
  204. }


  205. ngx_tid_t
  206. ngx_thread_self(void)
  207. {
  208.     ngx_int_t  tid;

  209.     tid = ngx_gettid();

  210.     if (tids == NULL) {
  211.         return ngx_pid;
  212.     }

  213.     return tids[tid];
  214. }


  215. ngx_err_t
  216. ngx_thread_key_create(ngx_tls_key_t *key)
  217. {
  218.     if (nkeys >= NGX_THREAD_KEYS_MAX) {
  219.         return NGX_ENOMEM;
  220.     }

  221.     *key = nkeys++;

  222.     return 0;
  223. }


  224. ngx_err_t
  225. ngx_thread_set_tls(ngx_tls_key_t key, void *value)
  226. {
  227.     if (key >= NGX_THREAD_KEYS_MAX) {
  228.         return NGX_EINVAL;
  229.     }

  230.     ngx_tls[key * NGX_THREAD_KEYS_MAX + ngx_gettid()] = value;
  231.     return 0;
  232. }


  233. ngx_mutex_t *
  234. ngx_mutex_init(ngx_log_t *log, ngx_uint_t flags)
  235. {
  236.     ngx_mutex_t  *m;
  237.     union semun   op;

  238.     m = ngx_alloc(sizeof(ngx_mutex_t), log);
  239.     if (m == NULL) {
  240.         return NULL;
  241.     }

  242.     m->lock = 0;
  243.     m->log = log;

  244.     if (flags & NGX_MUTEX_LIGHT) {
  245.         m->semid = -1;
  246.         return m;
  247.     }

  248.     m->semid = semget(IPC_PRIVATE, 1, SEM_R|SEM_A);
  249.     if (m->semid == -1) {
  250.         ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semget() failed");
  251.         return NULL;
  252.     }

  253.     op.val = 0;

  254.     if (semctl(m->semid, 0, SETVAL, op) == -1) {
  255.         ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semctl(SETVAL) failed");

  256.         if (semctl(m->semid, 0, IPC_RMID) == -1) {
  257.             ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
  258.                           "semctl(IPC_RMID) failed");
  259.         }

  260.         return NULL;
  261.     }

  262.     return m;
  263. }


  264. void
  265. ngx_mutex_destroy(ngx_mutex_t *m)
  266. {
  267.     if (semctl(m->semid, 0, IPC_RMID) == -1) {
  268.         ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
  269.                       "semctl(IPC_RMID) failed");
  270.     }

  271.     ngx_free((void *) m);
  272. }


  273. ngx_int_t
  274. ngx_mutex_dolock(ngx_mutex_t *m, ngx_int_t try)
  275. {
  276.     uint32_t       lock, old;
  277.     ngx_uint_t     tries;
  278.     struct sembuf  op;

  279.     if (!ngx_threaded) {
  280.         return NGX_OK;
  281.     }

  282. #if (NGX_DEBUG)
  283.     if (try) {
  284.         ngx_log_debug2(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  285.                        "try lock mutex %p lock:%XD", m, m->lock);
  286.     } else {
  287.         ngx_log_debug2(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  288.                        "lock mutex %p lock:%XD", m, m->lock);
  289.     }
  290. #endif

  291.     old = m->lock;
  292.     tries = 0;

  293.     for ( ;; ) {
  294.         if (old & NGX_MUTEX_LOCK_BUSY) {

  295.             if (try) {
  296.                 return NGX_AGAIN;
  297.             }

  298.             if (ngx_ncpu > 1 && tries++ < 1000) {

  299.                 /* the spinlock is used only on the SMP system */

  300.                 old = m->lock;
  301.                 continue;
  302.             }

  303.             if (m->semid == -1) {
  304.                 sched_yield();

  305.                 tries = 0;
  306.                 old = m->lock;
  307.                 continue;
  308.             }

  309.             ngx_log_debug2(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  310.                            "mutex %p lock:%XD", m, m->lock);

  311.             /*
  312.              * The mutex is locked so we increase a number
  313.              * of the threads that are waiting on the mutex
  314.              */

  315.             lock = old + 1;

  316.             if ((lock & ~NGX_MUTEX_LOCK_BUSY) > nthreads) {
  317.                 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
  318.                               "%D threads wait for mutex %p, "
  319.                               "while only %ui threads are available",
  320.                               lock & ~NGX_MUTEX_LOCK_BUSY, m, nthreads);
  321.                 ngx_abort();
  322.             }

  323.             if (ngx_atomic_cmp_set(&m->lock, old, lock)) {

  324.                 ngx_log_debug2(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  325.                                "wait mutex %p lock:%XD", m, m->lock);

  326.                 /*
  327.                  * The number of the waiting threads has been increased
  328.                  * and we would wait on the SysV semaphore.
  329.                  * A semaphore should wake up us more efficiently than
  330.                  * a simple sched_yield() or usleep().
  331.                  */

  332.                 op.sem_num = 0;
  333.                 op.sem_op = -1;
  334.                 op.sem_flg = 0;

  335.                 if (semop(m->semid, &op, 1) == -1) {
  336.                     ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
  337.                                  "semop() failed while waiting on mutex %p", m);
  338.                     ngx_abort();
  339.                 }

  340.                 ngx_log_debug2(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  341.                                "mutex waked up %p lock:%XD", m, m->lock);

  342.                 tries = 0;
  343.                 old = m->lock;
  344.                 continue;
  345.             }

  346.             old = m->lock;

  347.         } else {
  348.             lock = old | NGX_MUTEX_LOCK_BUSY;

  349.             if (ngx_atomic_cmp_set(&m->lock, old, lock)) {

  350.                 /* we locked the mutex */

  351.                 break;
  352.             }

  353.             old = m->lock;
  354.         }

  355.         if (tries++ > 1000) {

  356.             ngx_log_debug1(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  357.                            "mutex %p is contested", m);

  358.             /* the mutex is probably contested so we are giving up now */

  359.             sched_yield();

  360.             tries = 0;
  361.             old = m->lock;
  362.         }
  363.     }

  364.     ngx_log_debug2(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  365.                    "mutex %p is locked, lock:%XD", m, m->lock);

  366.     return NGX_OK;
  367. }


  368. void
  369. ngx_mutex_unlock(ngx_mutex_t *m)
  370. {
  371.     uint32_t       lock, old;
  372.     struct sembuf  op;

  373.     if (!ngx_threaded) {
  374.         return;
  375.     }

  376.     old = m->lock;

  377.     if (!(old & NGX_MUTEX_LOCK_BUSY)) {
  378.         ngx_log_error(NGX_LOG_ALERT, m->log, 0,
  379.                       "trying to unlock the free mutex %p", m);
  380.         ngx_abort();
  381.     }

  382.     /* free the mutex */

  383. #if 0
  384.     ngx_log_debug2(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  385.                    "unlock mutex %p lock:%XD", m, old);
  386. #endif

  387.     for ( ;; ) {
  388.         lock = old & ~NGX_MUTEX_LOCK_BUSY;

  389.         if (ngx_atomic_cmp_set(&m->lock, old, lock)) {
  390.             break;
  391.         }

  392.         old = m->lock;
  393.     }

  394.     if (m->semid == -1) {
  395.         ngx_log_debug1(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  396.                        "mutex %p is unlocked", m);

  397.         return;
  398.     }

  399.     /* check whether we need to wake up a waiting thread */

  400.     old = m->lock;

  401.     for ( ;; ) {
  402.         if (old & NGX_MUTEX_LOCK_BUSY) {

  403.             /* the mutex is just locked by another thread */

  404.             break;
  405.         }

  406.         if (old == 0) {
  407.             break;
  408.         }

  409.         /* there are the waiting threads */

  410.         lock = old - 1;

  411.         if (ngx_atomic_cmp_set(&m->lock, old, lock)) {

  412.             /* wake up the thread that waits on semaphore */

  413.             ngx_log_debug1(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  414.                            "wake up mutex %p", m);

  415.             op.sem_num = 0;
  416.             op.sem_op = 1;
  417.             op.sem_flg = 0;

  418.             if (semop(m->semid, &op, 1) == -1) {
  419.                 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
  420.                               "semop() failed while waking up on mutex %p", m);
  421.                 ngx_abort();
  422.             }

  423.             break;
  424.         }

  425.         old = m->lock;
  426.     }

  427.     ngx_log_debug1(NGX_LOG_DEBUG_MUTEX, m->log, 0,
  428.                    "mutex %p is unlocked", m);

  429.     return;
  430. }


  431. ngx_cond_t *
  432. ngx_cond_init(ngx_log_t *log)
  433. {
  434.     ngx_cond_t  *cv;

  435.     cv = ngx_alloc(sizeof(ngx_cond_t), log);
  436.     if (cv == NULL) {
  437.         return NULL;
  438.     }

  439.     cv->signo = NGX_CV_SIGNAL;
  440.     cv->tid = -1;
  441.     cv->log = log;
  442.     cv->kq = -1;

  443.     return cv;
  444. }


  445. void
  446. ngx_cond_destroy(ngx_cond_t *cv)
  447. {
  448.     if (close(cv->kq) == -1) {
  449.         ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno,
  450.                       "kqueue close() failed");
  451.     }

  452.     ngx_free(cv);
  453. }


  454. ngx_int_t
  455. ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m)
  456. {
  457.     int              n;
  458.     ngx_err_t        err;
  459.     struct kevent    kev;
  460.     struct timespec  ts;

  461.     if (cv->kq == -1) {

  462.         /*
  463.          * We have to add the EVFILT_SIGNAL filter in the rfork()ed thread.
  464.          * Otherwise the thread would not get a signal event.
  465.          *
  466.          * However, we have not to open the kqueue in the thread,
  467.          * it is simply handy do it together.
  468.          */

  469.         cv->kq = kqueue();
  470.         if (cv->kq == -1) {
  471.             ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kqueue() failed");
  472.             return NGX_ERROR;
  473.         }

  474.         ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0,
  475.                        "cv kq:%d signo:%d", cv->kq, cv->signo);

  476.         kev.ident = cv->signo;
  477.         kev.filter = EVFILT_SIGNAL;
  478.         kev.flags = EV_ADD;
  479.         kev.fflags = 0;
  480.         kev.data = 0;
  481.         kev.udata = NULL;

  482.         ts.tv_sec = 0;
  483.         ts.tv_nsec = 0;

  484.         if (kevent(cv->kq, &kev, 1, NULL, 0, &ts) == -1) {
  485.             ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kevent() failed");
  486.             return NGX_ERROR;
  487.         }

  488.         cv->tid = ngx_thread_self();
  489.     }

  490.     ngx_mutex_unlock(m);

  491.     ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0,
  492.                    "cv %p wait, kq:%d, signo:%d", cv, cv->kq, cv->signo);

  493.     for ( ;; ) {
  494.         n = kevent(cv->kq, NULL, 0, &kev, 1, NULL);

  495.         ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0,
  496.                        "cv %p kevent: %d", cv, n);

  497.         if (n == -1) {
  498.             err = ngx_errno;
  499.             ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
  500.                           cv->log, ngx_errno,
  501.                           "kevent() failed while waiting condition variable %p",
  502.                           cv);

  503.             if (err == NGX_EINTR) {
  504.                 break;
  505.             }

  506.             return NGX_ERROR;
  507.         }

  508.         if (n == 0) {
  509.             ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
  510.                           "kevent() returned no events "
  511.                           "while waiting condition variable %p",
  512.                           cv);
  513.             continue;
  514.         }

  515.         if (kev.filter != EVFILT_SIGNAL) {
  516.             ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
  517.                           "kevent() returned unexpected events: %d "
  518.                           "while waiting condition variable %p",
  519.                           kev.filter, cv);
  520.             continue;
  521.         }

  522.         if (kev.ident != (uintptr_t) cv->signo) {
  523.             ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
  524.                           "kevent() returned unexpected signal: %d ",
  525.                           "while waiting condition variable %p",
  526.                           kev.ident, cv);
  527.             continue;
  528.         }

  529.         break;
  530.     }

  531.     ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, "cv %p is waked up", cv);

  532.     ngx_mutex_lock(m);

  533.     return NGX_OK;
  534. }


  535. ngx_int_t
  536. ngx_cond_signal(ngx_cond_t *cv)
  537. {
  538.     ngx_err_t  err;

  539.     ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0,
  540.                    "cv %p to signal %P %d",
  541.                    cv, cv->tid, cv->signo);

  542.     if (cv->tid == -1) {
  543.         return NGX_OK;
  544.     }

  545.     if (kill(cv->tid, cv->signo) == -1) {

  546.         err = ngx_errno;

  547.         ngx_log_error(NGX_LOG_ALERT, cv->log, err,
  548.                      "kill() failed while signaling condition variable %p", cv);

  549.         if (err == NGX_ESRCH) {
  550.             cv->tid = -1;
  551.         }

  552.         return NGX_ERROR;
  553.     }

  554.     ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, "cv %p is signaled", cv);

  555.     return NGX_OK;
  556. }