服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C/C++ - c++线程池实现方法

c++线程池实现方法

2021-02-28 11:55liujian0616 C/C++

这篇文章主要介绍了c++线程池实现方法,实例分析了C++线程池的原理与相关实现技巧,需要的朋友可以参考下

本文实例讲述了c++线程池实现方法。分享给大家供大家参考。具体分析如下:

下面这个线程池是我在工作中用到过的。其原理是建立一个任务队列,让多个线程互斥地从队列中取出任务并执行;显然,队列需要加锁保护。

环境:ubuntu linux

文件名:locker.h

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#ifndef LOCKER_H_
#define LOCKER_H_
#include "pthread.h"
// Thin wrapper around a POSIX mutex (pthread_mutex_t).
//
// The mutex is initialized in the constructor and destroyed in the
// destructor. Copying a pthread_mutex_t is not meaningful, so the class is
// made non-copyable: the copy operations are declared private and are
// intentionally left undefined.
class locker
{
public:
  // Initializes the wrapped mutex.
  locker();
  // Destroys the wrapped mutex.
  virtual ~locker();
  // Acquires the mutex. Returns true when pthread_mutex_lock() succeeds,
  // false otherwise.
  bool lock();
  // Releases the mutex.
  void unlock();
private:
  // Non-copyable: declared, never defined.
  locker(const locker&);
  locker& operator=(const locker&);

  pthread_mutex_t   m_mutex;
};
#endif /* LOCKER_H_ */

文件名:locker.cpp

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include "locker.h"
// Initializes the wrapped mutex; a null attribute pointer is passed to
// pthread_mutex_init().
locker::locker()
{
  const pthread_mutexattr_t* const default_attr = 0;
  pthread_mutex_init(&m_mutex, default_attr);
}
// Destroys the wrapped mutex. The result of pthread_mutex_destroy() is
// ignored.
locker::~locker()
{
  (void) pthread_mutex_destroy(&m_mutex);
}
// Acquires the mutex.
// Returns true when pthread_mutex_lock() reports success (0), false for any
// other return code.
bool locker::lock()
{
  const int rc = pthread_mutex_lock(&m_mutex);
  return rc == 0;
}
// Releases the mutex. The result of pthread_mutex_unlock() is ignored.
void locker::unlock()
{
  (void) pthread_mutex_unlock(&m_mutex);
}

文件名:task_list.h

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#ifndef TASK_LIST_H_
#define TASK_LIST_H_
#include "list"
#include "locker.h"
#include "netinet/in.h"
#include "semaphore.h"
// Kept so that code including this header can keep using unqualified
// standard-library names.
using namespace std;

// Signature of the function executed for a task.
typedef void* (*THREAD_FUNC)(void*);

// A task run by the thread pool.
// Original author's note (translated): for downlink tasks, "sin" holds the
// destination address; parm0 points to the object that sent the data, parm1
// points to the data and parm2 is the data length.
// NOTE(review): this struct has no "sin" member, so that part of the note
// looks stale -- confirm the intended meaning of the parameters with callers.
struct task_info
{
  THREAD_FUNC func;   // function to execute
  void* parm0;        // first user parameter
  void* parm1;        // second user parameter
  void* parm2;        // third user parameter
};

// Queue of task pointers and its iterator type.
typedef std::list<task_info*> TASK_LIST;
typedef TASK_LIST::iterator PTASK_LIST;
class task_list
{
public:
  task_list();
  virtual ~task_list();
  void append_task(task_info* tsk);
  task_info* fetch_task();
private:
  TASK_LIST m_tasklist;
  locker m_lk;
  sem_t m_sem;
};
#endif /* TASK_LIST_H_ */

文件名:task_list.cpp

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include "task_list.h"
// Creates an empty task queue.
task_list::task_list()
{
  // Start with no pending tasks.
  m_tasklist.clear();
  // Semaphore with pshared = 0 and an initial value of 0.
  sem_init(&m_sem, 0, 0);
}
// Releases every task that is still queued, then destroys the semaphore.
task_list::~task_list()
{
  // delete on a null pointer is a no-op, so null entries are harmless here.
  for (PTASK_LIST it = m_tasklist.begin(); it != m_tasklist.end(); ++it)
    delete *it;
  m_tasklist.clear();

  sem_destroy(&m_sem);
}
// Queues tsk at the back of the list and posts the semaphore.
void task_list::append_task(task_info* tsk)
{
  // The list is only modified while the lock is held.
  m_lk.lock();
  m_tasklist.insert(m_tasklist.end(), tsk);
  m_lk.unlock();

  // Posted after the lock is released: one more task is available.
  sem_post(&m_sem);
}
// Waits until a task is available, then removes and returns the task at the
// front of the queue.
//
// Returns NULL when sem_wait() fails (for example when it is interrupted by
// a signal) or when the queue turns out to be empty; in both cases the list
// is left untouched, so front()/pop_front() are never called on an empty
// list. Callers are expected to treat NULL as "no task this time".
task_info* task_list::fetch_task()
{
  // A failed wait means no semaphore count was consumed, so nothing may be
  // taken from the list.
  if (sem_wait(&m_sem) != 0)
    return NULL;

  task_info* tr = NULL;
  m_lk.lock();
  if (!m_tasklist.empty())
  {
    tr = m_tasklist.front();
    m_tasklist.pop_front();
  }
  m_lk.unlock();
  return tr;
}

文件名:thread_pool.h

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_
#include "task_list.h"
#include "pthread.h"
#define DEFAULT_THREAD_COUNT  4
#define MAXIMUM_THREAD_COUNT  1000
class thread_pool
{
public:
  thread_pool();
  virtual ~thread_pool();
  int create_threads(int n = DEFAULT_THREAD_COUNT);
  void delete_threads();
  void set_tasklist(task_list* plist);
  void del_tasklist();
protected:
  static void* thread_func(void* parm);
  task_info* get_task();
private:
  int       m_thread_cnt;
  pthread_t    m_pids[MAXIMUM_THREAD_COUNT];
  task_list*   m_tasklist;
};
#endif /* THREAD_POOL_H_ */

文件名:thread_pool.cpp

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include "thread_pool.h"
// Creates a pool that has no threads and no task list attached.
thread_pool::thread_pool()
  : m_thread_cnt(0),
    m_tasklist(NULL)
{
}
// Stops all worker threads before the pool goes away.
thread_pool::~thread_pool()
{
  this->delete_threads();
}
// Returns the next task from the attached task list.
// Returns NULL when no task list is attached; otherwise returns whatever
// task_list::fetch_task() yields.
task_info* thread_pool::get_task()
{
  if (!m_tasklist)
    return NULL;
  return m_tasklist->fetch_task();
}
void* thread_pool::thread_func(void* parm)
{
  thread_pool *ptp = static_cast<thread_pool*> (parm);
  task_info *task;
  while (true)
  {
    task = ptp->get_task();
    if (task)
    {
      (*task->func)(task);
      //delete task; //func负责释放task_info
    }
  }
  return NULL;
}
int thread_pool::create_threads(int n)
{
  if (n > MAXIMUM_THREAD_COUNT)
    n = MAXIMUM_THREAD_COUNT;
  delete_threads();
  for (int i = 0; i < n; i++)
  {
    int ret = pthread_create(&m_pids[i], NULL, thread_func, (void*) this);
    if (ret != 0)
      break;
    m_thread_cnt++;
  }
  return m_thread_cnt;
}
void thread_pool::delete_threads()
{
  for (int i = 0; i < m_thread_cnt; i++)
  {
    void* retval;
    pthread_cancel(m_pids[i]);
    pthread_join(m_pids[i], &retval);
  }
  m_thread_cnt = 0;
}
// Attaches plist as the task list the workers fetch from. Only the pointer
// is stored.
void thread_pool::set_tasklist(task_list* plist)
{
  this->m_tasklist = plist;
}
void thread_pool::del_tasklist()
{
  m_tasklist = NULL;
}

文件名:test.cpp

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "unistd.h"
#include "stdio.h"
#include "stdlib.h"
#include "task_list.h"
#include "thread_pool.h"
void* fun(void *parm)
{
  task_info* ptk = (task_info*)parm;
  pid_t tid = pthread_self();
  int count = (int)ptk->parm0;
  printf("count=%d, tid=%d\n", count, tid);
  return NULL;
}
int main()
{
  int count = 0;
  thread_pool tp;
  task_list tl;
  tp.create_threads(4 - 1);
  tp.set_tasklist(&tl);
  while (1)
  {
    task_info* pti = NULL;
    pti = (task_info *) malloc(sizeof(task_info));
    pti->func = fun;
    pti->parm0 = (void *)count;
    tl.append_task(pti);
    count++;
    sleep(2);
  }
// printf("hello,world\n");
  return 0;
}

编译运行:我是用 Eclipse 建立的 automake 工程,所以只要修改一下 Makefile.am 就可以编译成功了。
文件名:Makefile.am

?
1
2
3
4
5
# Single program built by this Makefile.am.
bin_PROGRAMS = test

# All sources and headers of the program.
test_SOURCES = \
    test.cpp \
    locker.h locker.cpp \
    task_list.h task_list.cpp \
    thread_pool.h thread_pool.cpp

# Link against the pthread library.
test_LDADD = -lpthread

执行结果:

?
1
2
3
4
5
6
7
8
9
10
count=0, tid=-1219888272
count=1, tid=-1219888272
count=2, tid=-1228280976
count=3, tid=-1236673680
count=4, tid=-1219888272
count=5, tid=-1228280976
count=6, tid=-1236673680
count=7, tid=-1219888272
count=8, tid=-1228280976
count=9, tid=-1236673680

希望本文所述对大家的C++程序设计有所帮助。

延伸 · 阅读

精彩推荐