服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C/C++ - C/C++ 原生API实现线程池的方法

C/C++ 原生API实现线程池的方法

2022-02-17 16:44lyshark C/C++

线程池,简单来说就是有一堆已经创建好的线程,接下来通过本文给大家介绍C/C++ 原生API实现线程池的方法,感兴趣的朋友跟随小编一起看看吧

线程池有两个核心的概念,一个是任务队列,一个是工作线程队列。任务队列负责存放主线程需要处理的任务,工作线程队列其实是一个死循环,负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费l者的模型。在一些高并发的网络应用中,线程池也是常用的技术。陈硕大神推荐的C++多线程服务端编程模式为:one loop per thread + thread pool,通常会有单独的线程负责接受来自客户端的请求,对请求稍作解析后将数据处理的任务提交到专门的计算线程池。

ThreadPool 线程池同步事件: 线程池内的线程函数同样支持互斥锁,信号控制,内核事件控制,临界区控制.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include <Windows.h>
#include <iostream>
#include <stdlib.h>
 
unsigned long g_count = 0;
 
// --------------------------------------------------------------
// 线程池同步-互斥量同步
void NTAPI TaskHandlerMutex(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    WaitForSingleObject(*(HANDLE *)Context, INFINITE);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    ReleaseMutexWhenCallbackReturns(Instance, *(HANDLE*)Context);
}
 
void TestMutex()
{
    // 创建互斥量
    HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
 
    PTP_WORK pool = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerMutex, &hMutex, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pool);
    }
 
    WaitForThreadpoolWorkCallbacks(pool, FALSE);
    CloseThreadpoolWork(pool);
    CloseHandle(hMutex);
 
    printf("相加后 ---> %d \n", g_count);
}
 
// --------------------------------------------------------------
// 线程池同步-事件内核对象
void NTAPI TaskHandlerKern(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    WaitForSingleObject(*(HANDLE *)Context, INFINITE);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    SetEventWhenCallbackReturns(Instance, *(HANDLE*)Context);
}
 
void TestKern()
{
    HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    SetEvent(hEvent);
 
    PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerKern, &hEvent, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pwk);
    }
 
    WaitForThreadpoolWorkCallbacks(pwk, FALSE);
    CloseThreadpoolWork(pwk);
 
    printf("相加后 ---> %d \n", g_count);
}
 
// --------------------------------------------------------------
// 线程池同步-信号量同步
void NTAPI TaskHandlerSemaphore(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    WaitForSingleObject(*(HANDLE *)Context, INFINITE);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    ReleaseSemaphoreWhenCallbackReturns(Instance, *(HANDLE*)Context, 1);
}
 
void TestSemaphore()
{
    // 创建信号量为100
    HANDLE hSemaphore = CreateSemaphore(NULL, 0, 100, NULL);
 
    ReleaseSemaphore(hSemaphore, 10, NULL);
 
    PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerSemaphore, &hSemaphore, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pwk);
    }
 
    WaitForThreadpoolWorkCallbacks(pwk, FALSE);
    CloseThreadpoolWork(pwk);
    CloseHandle(hSemaphore);
 
    printf("相加后 ---> %d \n", g_count);
}
 
// --------------------------------------------------------------
// 线程池同步-临界区
void NTAPI TaskHandlerLeave(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    EnterCriticalSection((CRITICAL_SECTION*)Context);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    LeaveCriticalSectionWhenCallbackReturns(Instance, (CRITICAL_SECTION*)Context);
}
 
void TestLeave()
{
    CRITICAL_SECTION cs;
    InitializeCriticalSection(&cs);
 
    PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerLeave, &cs, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pwk);
    }
 
    WaitForThreadpoolWorkCallbacks(pwk, FALSE);
    DeleteCriticalSection(&cs);
    CloseThreadpoolWork(pwk);
 
    printf("相加后 ---> %d \n", g_count);
}
 
int main(int argc,char *argv)
{
    //TestMutex();
    //TestKern();
    //TestSemaphore();
    TestLeave();
 
    system("pause");
    return 0;
}

简单的IO读写:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include <Windows.h>
#include <iostream>
#include <stdlib.h>
 
// 简单的异步文本读写
int ReadWriteIO()
{
    char enContent[] = "hello lyshark";
    char deContent[255] = { 0 };
 
    // 异步写文件
    HANDLE hFileWrite = CreateFile(L"d://test.txt", GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
    if (INVALID_HANDLE_VALUE == hFileWrite)
    {
        return 0;
    }
 
    WriteFile(hFileWrite, enContent, strlen(enContent), NULL, NULL);
    FlushFileBuffers(hFileWrite);
 
    CancelSynchronousIo(hFileWrite);
    CloseHandle(hFileWrite);
 
    // 异步读文件
 
    HANDLE hFileRead = CreateFile(L"d://test.txt", GENERIC_READ, 0, NULL, OPEN_ALWAYS, NULL, NULL);
    if (INVALID_HANDLE_VALUE == hFileRead)
    {
        return 0;
    }
 
    ReadFile(hFileRead, deContent, 255, NULL, NULL);
    CloseHandle(hFileRead);
    std::cout << "读出内容: " << deContent << std::endl;
    return 1;
}
 
 
// 通过IO获取文件大小
int GetFileSize()
{
    HANDLE hFile = CreateFile(L"d://test.txt", 0, 0, NULL, OPEN_EXISTING, NULL, NULL);
    if (INVALID_HANDLE_VALUE == hFile)
    {
        return 0;
    }
 
    ULARGE_INTEGER ulFileSize;
    ulFileSize.LowPart = GetFileSize(hFile, &ulFileSize.HighPart);
 
    LARGE_INTEGER lFileSize;
    BOOL ret = GetFileSizeEx(hFile, &lFileSize);
 
    std::cout << "文件大小A: " << ulFileSize.QuadPart << " bytes" << std::endl;
    std::cout << "文件大小B: " << lFileSize.QuadPart << " bytes" << std::endl;
    CloseHandle(hFile);
 
    return 1;
}
 
// 通过IO设置文件指针和文件尾
int SetFilePointer()
{
    char deContent[255] = { 0 };
    DWORD readCount = 0;
 
    HANDLE hFile = CreateFile(L"d://test.txt", GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, NULL, NULL);
    if (INVALID_HANDLE_VALUE == hFile)
    {
        return 0;
    }
 
    LARGE_INTEGER liMove;
 
    // 设置移动位置
    liMove.QuadPart = 2;
    SetFilePointerEx(hFile, liMove, NULL, FILE_BEGIN);
 
    // 移动到文件末尾
    SetEndOfFile(hFile);
 
    ReadFile(hFile, deContent, 255, &readCount, NULL);
    std::cout << "移动指针后读取: " << deContent << " 读入长度: " << readCount << std::endl;
 
    CloseHandle(hFile);
 
    // 设置编码格式
    _wsetlocale(LC_ALL, L"chs");
    setlocale(LC_ALL, "chs");
    wprintf(L"%s", deContent);
}
 
int main(int argc,char *argv)
{
    // 读写IO
    ReadWriteIO();
 
    // 取文件长度
    GetFileSize();
 
    // 设置文件指针
    SetFilePointer();
 
    return 0;
}

到此这篇关于C/C++ 原生API实现线程池的文章就介绍到这了,更多相关C++实现线程池内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/LyShark/p/15493202.html

延伸 · 阅读

精彩推荐