blob: 1dc3bf7e7ef2a9e59235c4e8af770f0d7e065340 [file] [log] [blame]
#include "kvm/threadpool.h"
#include "kvm/mutex.h"
#include "kvm/kvm.h"
#include <linux/kernel.h>
#include <linux/list.h>
#include <pthread.h>
#include <stdbool.h>
static DEFINE_MUTEX(job_mutex);
static DEFINE_MUTEX(thread_mutex);
static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;
static LIST_HEAD(head);
static pthread_t *threads;
static long threadcount;
static bool running;
static struct thread_pool__job *thread_pool__job_pop_locked(void)
{
struct thread_pool__job *job;
if (list_empty(&head))
return NULL;
job = list_first_entry(&head, struct thread_pool__job, queue);
list_del_init(&job->queue);
return job;
}
static void thread_pool__job_push_locked(struct thread_pool__job *job)
{
list_add_tail(&job->queue, &head);
}
static struct thread_pool__job *thread_pool__job_pop(void)
{
struct thread_pool__job *job;
mutex_lock(&job_mutex);
job = thread_pool__job_pop_locked();
mutex_unlock(&job_mutex);
return job;
}
static void thread_pool__job_push(struct thread_pool__job *job)
{
mutex_lock(&job_mutex);
thread_pool__job_push_locked(job);
mutex_unlock(&job_mutex);
}
static void thread_pool__handle_job(struct thread_pool__job *job)
{
while (job) {
job->callback(job->kvm, job->data);
mutex_lock(&job->mutex);
if (--job->signalcount > 0)
/* If the job was signaled again while we were working */
thread_pool__job_push(job);
mutex_unlock(&job->mutex);
job = thread_pool__job_pop();
}
}
static void thread_pool__threadfunc_cleanup(void *param)
{
mutex_unlock(&job_mutex);
}
static void *thread_pool__threadfunc(void *param)
{
pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
kvm__set_thread_name("threadpool-worker");
while (running) {
struct thread_pool__job *curjob = NULL;
mutex_lock(&job_mutex);
while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
pthread_cond_wait(&job_cond, &job_mutex.mutex);
mutex_unlock(&job_mutex);
if (running)
thread_pool__handle_job(curjob);
}
pthread_cleanup_pop(0);
return NULL;
}
static int thread_pool__addthread(void)
{
int res;
void *newthreads;
mutex_lock(&thread_mutex);
newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
if (newthreads == NULL) {
mutex_unlock(&thread_mutex);
return -1;
}
threads = newthreads;
res = pthread_create(threads + threadcount, NULL,
thread_pool__threadfunc, NULL);
if (res == 0)
threadcount++;
mutex_unlock(&thread_mutex);
return res;
}
int thread_pool__init(struct kvm *kvm)
{
unsigned long i;
unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);
running = true;
for (i = 0; i < thread_count; i++)
if (thread_pool__addthread() < 0)
return i;
return i;
}
late_init(thread_pool__init);
int thread_pool__exit(struct kvm *kvm)
{
int i;
void *NUL = NULL;
running = false;
for (i = 0; i < threadcount; i++) {
mutex_lock(&job_mutex);
pthread_cond_signal(&job_cond);
mutex_unlock(&job_mutex);
}
for (i = 0; i < threadcount; i++) {
pthread_join(threads[i], NUL);
}
return 0;
}
late_exit(thread_pool__exit);
void thread_pool__do_job(struct thread_pool__job *job)
{
struct thread_pool__job *jobinfo = job;
if (jobinfo == NULL || jobinfo->callback == NULL)
return;
mutex_lock(&jobinfo->mutex);
if (jobinfo->signalcount++ == 0)
thread_pool__job_push(job);
mutex_unlock(&jobinfo->mutex);
mutex_lock(&job_mutex);
pthread_cond_signal(&job_cond);
mutex_unlock(&job_mutex);
}
void thread_pool__cancel_job(struct thread_pool__job *job)
{
bool running;
/*
* If the job is queued but not running, remove it. Otherwise, wait for
* the signalcount to drop to 0, indicating that it has finished
* running. We assume that nobody is queueing this job -
* thread_pool__do_job() isn't called - while this function is running.
*/
do {
mutex_lock(&job_mutex);
if (list_empty(&job->queue)) {
running = job->signalcount > 0;
} else {
list_del_init(&job->queue);
job->signalcount = 0;
running = false;
}
mutex_unlock(&job_mutex);
} while (running);
}