服务器之家:专注于服务器技术及软件下载分享
分类导航

Linux|Centos|Ubuntu|系统进程|Fedora|注册表|Bios|Solaris|Windows7|Windows10|Windows11|windows server|

服务器之家 - 服务器系统 - Linux - Linux高性能网络编程十谈 | 协程

Linux高性能网络编程十谈 | 协程

2023-11-01 17:19未知服务器之家 Linux

在讲协程之前,先解决上一篇文章《Linux高性能网络编程十谈|多进程和多线程》留下的思考题: (1)如果在多线程程序中fork()子进程,会发生什么,我们要考虑那些问题? 首先我们会想到如果一个有多个线程的程序fork出来的子进程是

在讲协程之前,先解决上一篇文章《Linux高性能网络编程十谈|多进程和多线程》留下的思考题:

(1)如果在多线程程序中fork()子进程,会发生什么,我们要考虑那些问题?

  • 首先我们会想到如果一个有多个线程的程序fork出来的子进程是否也是多个线程呢?不是,fork出来的子进程只有一个执行线程,并不会把线程也复制过来;
  • 其次fork出来的子进程都会继承父进程的部分数据,包括锁,句柄等,也就是说在父进程被锁的临界区,在子进程也会被锁,这样可能导致在子进程逻辑中继续加锁,导致出现死锁情况;
  • 最后使用pthread_atfork解决多线程下的fork问题,如下代码注释掉pthread_atfork这一行代码,线程在父进程和子进程执行process函数中重复加锁,导致死锁,如果使用pthread_atfork,则正常:
#include <pthread.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *process(void *arg) {
  printf("pid = %d begin ...\n", static_cast<int>(getpid()));
  pthread_mutex_lock(&mutex);
  struct timespec ts = {2, 0};
  nanosleep(&ts, NULL);
  pthread_mutex_unlock(&mutex);
  printf("pid = %d end ...\n", static_cast<int>(getpid()));
  return NULL;
}

void prepare(void) { pthread_mutex_unlock(&mutex); }

void parent(void) { pthread_mutex_lock(&mutex); }

int main(void) {
  // pthread_atfork(prepare, parent, NULL);
  printf("pid = %d Entering main ...\n", static_cast<int>(getpid()));
  pthread_t tid;
  pthread_create(&tid, NULL, process, NULL);
  struct timespec ts = {1, 0};
  nanosleep(&ts, NULL);
  pid_t pid = fork();
  if (fork() == 0) {
    process(NULL);
  } else {
    waitpid(pid, NULL, 0);
  }
  pthread_join(tid, NULL);
  printf("pid = %d Exiting main ...\n", static_cast<int>(getpid()));
  return 0;
}

int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void))在fork()之前调用,当调用fork时,内部创建子进程前在父进程中会调用prepare,内部创建子进程成功后,父进程会调用parent,子进程会调用child;

(2)在多线程程序中,某个线程挂了,整个进程会挂么?

  • 如果线程是非法访问内存引起的崩溃,那么进程一定会崩溃,因为在进程中,各个线程的地址空间是共享的,某个线程破坏了某个地址段,其他线程也会受到到影响,这个时候操作系统与其保留其他线程,不如直接kill掉整个进程;
  • 如果某个线程内的行为导致默认动作是停止或终止,则不管是否对其他线程是否有影响,整个进程都会停止或终止;
  • 如果线程是因为自身退出(pthread_exit())或者各个线程捕获信号可能不会挂掉整个进程,具体可以下面一个问题;

(3)如果需要将进程信号发送给某个线程,该如何处理?

  • 首先线程可独立地屏蔽某些信号,使用系统函数pthread_sigmask(),所以线程通常可以共享进程的信号,如果不需要则可以通过系统函数屏蔽;
  • 其次可调用pthread_kill(pthread_t thread, int signo),将信号发送给同一进程内指定的线程(包括自己);

第一部分:协程原理

如果您了解golang,协程应该不陌生,随意用golang写一个http server,性能都可能超过nginx,主要原因是内部使用轻量的协程,那下面我们就一起了解协程是什么?

协程就是 用户态线程,协程的调度完全由开发者进行控制,因此实现协程的关键也就是 实现一个用户态线程的调度器,由于协程是在用户态中实现调度,避免了内核态的上下文切换造成的性能损失,从而突破了线程在IO上的性能瓶颈。

我们以ucontext库为例子来说明协程是怎么运行的?(其他的协程实现方式类似)

#if defined(__APPLE__)
#define _XOPEN_SOURCE 600
#endif

#include <stdio.h>
#include <ucontext.h>

static ucontext_t ctx_main, ctx_coro;

void coroutine() {
  printf("Inside coroutine\n");
  swapcontext(&ctx_coro, &ctx_main); // 切换回主协程
  printf("Coroutine finished\n");
}

int main() {
  char coro_stack[8192];

  getcontext(&ctx_coro); // 获取协程上下文
  ctx_coro.uc_stack.ss_sp = coro_stack;
  ctx_coro.uc_stack.ss_size = sizeof(coro_stack);
  ctx_coro.uc_link = &ctx_main; // 当协程结束时,切换回主协程
  makecontext(&ctx_coro, coroutine, 0); // 设置协程的入口点

  printf("Before coroutine\n");
  swapcontext(&ctx_main, &ctx_coro); // 切换到协程
  printf("Back in main\n");

  return 0;
}

以上代码的输出(mac上运行):

Before coroutine
Inside coroutine
Back in main

以上代码的流程是:

(1)通过getcontext保留当前栈的运行上下文到ucontext_t中;

(2)通过makecontext修改ucontext_t指向coroutine入口函数;

(3)通过swapcontext切换协程;

看了如上代码,如果之前对协程没有了解的,还是比较懵,为什么getcontext能保留运行的上下文呢?我们先看一下内存中数据块分布:

Linux高性能网络编程十谈 | 协程堆栈图

一个函数执行会经过如下步骤:

(1)把参数加入栈中,如果有其他参数没有入栈,那么使用某些寄存器传递;

(2)把当前指令的下一条指令地址压入栈中;

(3)跳转到函数体执行:

(4)把EBP压入栈中,指向上一个函数堆栈帧中的帧指针的位置;

(5)保存调用前后需要保存不变的寄存器的值;

(6)将局部变量压入栈中;

从上面代码看出,当函数执行完需要恢复到上一次执行入口的寄存器地址,那getcontext只需要把当前恢复入口的地址存起来和加上必要栈信息是否就能实现保留协程栈,getcontext的确是这么做的:

movq        (%rsp), %rcx
movq        %rcx, oRIP(%rdi)
leaq        8(%rsp), %rcx         /* Exclude the return address.  */
movq        %rcx, oRSP(%rdi)

(%rsp)中保存的即是函数返回地址,也就是执行完getcontext这个函数之后需要执行的下一个指令的地址,通过context保存相关寄存器的值主要是rip值,同时把当前栈的rsp值也保存,这样便可以通过这些数据恢复context以再次继续执行。

同样我们调用swapcontext取出context信息,通过恢复下一个需要执行的函数入口实现协程切换:

movq        (%rsp), %rcx
movq        %rcx, oRIP(%rdi)
leaq        8(%rsp), %rcx       /* Exclude the return address.  */
movq        %rcx, oRSP(%rdi)

当前rsp指向的地址中存储的是返回地址,即调用swapcontext后当前协程需要执行的下一个指令地址,同时将swapcontext第二个参数的栈恢复,就进入下一个协程的入口函数。

第二部分:协程类型

目前开源有很多协程,根据运行时协程栈的分配方式分为有栈协程和无栈协程,根据调度过程中调度权的目标分为对称协程和非对称协程,下面我们来简单了解一下:

1、有栈协程和无栈协程

(1)如果每个协程都有自己的调用栈,类似于线程的调用栈就是有栈协程,微信的libco、Golang中的 goroutine、Lua中的协程都是有栈协程。

实现方式上面应该已经了解了,在内存中给每个协程开辟一个栈内存,当协程挂起时会将它的运行时上下文(即栈空间)从系统栈中保存至其所分配的栈内存中,当协程恢复时会将其运行时上下文从栈内存中恢复至系统栈中;

采用有栈协程有优点也有缺点,优点是可以任意嵌套,只要保留了当前栈的信息,可以任意的切换到其他协程中,而缺点则是性能有一定的损失,在保留栈空间信息的拷入拷出都会影响性能,同时栈的扩大和缩小需要实现动态,这里会导致内存浪费;

(2)与有栈协程相反,无栈协程不会为各个协程开辟相应的调用栈。无栈协程通常是基于状态机或闭包来实现,类似ES6、Dart中的await/async、Python的Generator、Kotlin中的协程、C++20中的cooroutine都是无栈协程;

使用无栈协程不需要修改调用栈,也无需额外的内存来保存调用栈,因此它的开销会更小,同时无需要考虑栈需要动态扩大缩小的问题,但是相比于保存运行时上下文这种实现方式,无栈协程最大的问题它无法实现在任意函数调用层级的位置进行挂起,比如最简单的无栈协程设计如下:

#include <stdio.h>

int function(void) {
  static int i, state = 0;
  switch (state) {
  case 0: /* start of function */
    for (i = 0; i < 10; i++) {
      state = 1; /* so we will come back to "case 1" */
      return i;
    case 1:; /* resume control straight after the return */
    }
  }
}

int main() {
  for (int i = 0; i < 10; i++) {
    fprintf(stdout, "%d\n", function());
  }
  return 0;
}

以上代码通过label和goto实现了yield语义,从而实现调用function()获得打印0~9,如果大家想详细了解这里面的实现可以搜索Protothreads库;

(3)有栈协程和无栈协程总结如下:

内存资源使用:无栈协程借助函数的栈帧来存储一些寄存器状态,可以调用递归函数,而有栈协程会要申请一个内存栈用来存储寄存器信息,调用递归函数可能会爆栈;

速度:无栈协程的上下文比较少,所以能够进行更快的用户态上下文切换;

功能性:有栈协程能够在嵌套的协程中进行挂起/恢复,而无栈协程只能对顶层的协程进行挂起,被调用方是不能挂起的;

2、对称协程和非对称协程

(1)对称协程:任何一个协程都是相互独立且平等的,调度权可以在任意协程之间转移,例如go语言的协程就是对称线程,其实现如下图所示:

Linux高性能网络编程十谈 | 协程调度图

CoroutineA,CoroutineB,CoroutineC之间是可以通过协程调度器可以切换到任意协程。

(2)非对称协程:协程出让调度权的目标只能是它的调用者,即协程之间存在调用和被调用关系,例如libco提供的协议就是非对称协程,其实现如下图所示:

Linux高性能网络编程十谈 | 协程调度图

CoroutineA,CoroutineB,CoroutineC之间比如与调用者成对出现,比如resume的调用者返回的位置,必须是被调用者yield。

第三部分:如何使用协程实现高性能

以下是网络IO与协程调度流程:

Linux高性能网络编程十谈 | 协程调度图

(1)epoll,kqueue等IO事件触发;

(2)调度协程循环等待,如果遇到IO事件,就创建协程开始处理;

(3)创建IO协程或者定时器协程;

(4)如果是定时器协程,就加入到定时协程队列;

(5)如果是IO协程,就加入到IO协程队列(每一个网络连接绑定一个套接字句柄,该套接字绑定一个协程);

(6)触发的IO唤醒调度器,调度器准备协程切换;

(7)从IO协程队列中取出对应的协程进行处理;

(8)如果当前协程遇到IO阻塞,比如处理完recv数据,需要send数据或者往下游send数据,都是IO阻塞场景;

(9)当前协程阻塞后将自己挂起;

(10)切换到调度协程或者其他协程继续调度(如果是对称协程直接切到调度协程,如果是非对程协程调用yield);

(11)遇到IO关闭将当前协程切换到退出状态(可以设置退出状态);

(12)IO协程直接退出;

(13)9~12步骤中的IO触发或者IO关闭以后,切换到下一个协程;

(14)如果调度协程执行完,然后查询定时协程队列,如果有超时的处理TODO;(15)执行完上述流程,继续切换回调度协程,等待IO事件的触发;

以上流程的伪代码如下(详细的代码后续会在https://github.com/linkxzhou/sthread这里开源,目前在完善中):

void process(void *args) {
  ...
  /* co_read封装监听io事件协程切换`yield` */
  ... = co_read(...)
  ...
  /* co_send封装监听io事件协程切换`yield` */
  ... = co_send(...)
  ...
}

void co_eventloop() {
  ...
  for (;;) {
    /* 调度协程通过 epoll_wait捞出就绪事件 */
    int ret = co_epoll_wait(...);
    while (...) {
      /* 如果不存在对应句柄的协程则创建协程,具体process函数处理 */
      ...* co = get_co_by_fd(...);
      if (co == NULL) {
        co = co_create(...)
      }
      ...
      /* 主协程挂起当前协程,切换到对应子协程,处理到期事件和就绪事件结果 */
      co_resume(co)
    }
    ...
    /* 调度协程处理过期事件,主协程切换到定时处理协程 */
    process_alltimeout_list(...);
    ...
  }
}

如何实现高性能呢?

(1)首先通过IO复用结合协程,每个连接绑定一个协程,由于协程比较轻量,假设对于有栈协程占用空间8K左右,100w个连接也就是8G左右,所以对于内存开销不大;

(2)其次协程调度是微秒或者纳秒级,如果对于IO密集型应用,基本上上就是一个协程处理完以后,微秒或者纳秒级内就能切换到下一个处理连接;

(3)最后对比多线程,协程减少了临界区的处理,不需要互斥锁,信号量等开销较大的同步原语,所以可以更能轻松写出高性能的server;

思考

继续提几个思考题(下一章会解答当前问题):

(1)多线程情况下如何处理协程?

(2)golang的协程调度方式是怎样的?

延伸 · 阅读

精彩推荐