服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Sqlite - 在SQLite中插入10亿条Python VS Rust

在SQLite中插入10亿条Python VS Rust

2021-09-07 23:48虫虫搜奇 Sqlite

写脚本来进行数据处理,比如说给数据库导入导出数据,这种任务一般来说最方便的方法是用python脚本,但是如果数据量比较大时候(比如上亿条)时候Python就会超级慢,看到无法忍受。

在实际生活中,市场有这样的案例:写脚本来进行数据处理,比如说给数据库导入导出数据,这种任务一般来说最方便的方法是用python脚本,但是如果数据量比较大时候(比如上亿条)时候Python就会超级慢,看到无法忍受。在这种案例时候该怎么做呢,有一个外国老哥分享了自己的实践经历,并且对比了Python和Rust语言给SQLite插入十一条数据的情况,最后用Rust实现了在一分钟来完成任务。我们在此分享一下该实践过程,希望能对大家有所启迪,大家也可以尝试自己最拿手方法来实现该例子,并对比一下具体性能。

在SQLite中插入10亿条Python VS Rust

概述

案例中的任务是SQLite数据库插入10亿条的数据。表(user)数据结构和约束如下:

  1. create table IF NOT EXISTS user 
  2. id INTEGER not null primary key, 
  3. area CHAR(6), 
  4. age INTEGER not null, 
  5. active INTEGER not null 
  6. ); 

随机生成数据。其中are列为六位数的区号(任何六位数字)。age将是5、10 或15中的一个数字。Active为0或1。

  • 实验环境硬件配置为:MacBook Pro,2019(2.4 GHz 四核i5,8GB内存,256GB SSD硬盘,Big Sur 11.1)。
  • 任务前提:任务无需保持程序稳健性,如果进程崩溃并且所有数据都丢失了也没关系。可以再次运行脚本。
  • 需要充分利用我的机器资源:100% CPU、8GB 内存和千兆字节的SSD空间。

无需使用真正的随机方法,stdlib伪随机方法即可。

Python

首先是原始版本的Python方法。Python标准库提供了一个SQLite模块,首先使用它编写了第一个版本。代码如下:

  1. import sqlite3 
  2. from commons import get_random_age, get_random_active, get_random_bool, get_random_area_code, create_table 
  3. DB_NAME = "naive.db" 
  4. def faker(con: sqlite3.Connection, count=100_000): 
  5. for _ in range(count): 
  6. age = get_random_age() 
  7. active = get_random_active() 
  8. # switch for area code 
  9. if get_random_bool(): 
  10. # random 6 digit number 
  11. area = get_random_area_code() 
  12. con.execute('INSERT INTO user VALUES (NULL,?,?,?)', (area, age, active)) 
  13. else: 
  14. con.execute('INSERT INTO user VALUES (NULL,NULL,?,?)', (age, active)) 
  15. con.commit() 
  16. def main(): 
  17. con = sqlite3.connect(DB_NAME, isolation_level=None
  18. con.execute('PRAGMA journal_mode = WAL;') 
  19. create_table(con) 
  20. faker(con, count=10_000_000
  21. if __name__ == '__main__': 
  22. main() 

在SQLite中插入10亿条Python VS Rust

在该脚本中,通for循环中一一插入1000万条数据。执行花了将近15分钟。基于此进行优化迭代,提高性能。

SQLite中,每次插入都是原子性的并且为一个事务。每个事务都需要保证写入磁盘(涉及IO操作),因此可能会很慢。为了优化,可以尝试通过不同大小的批量插入,对比发现,100000是最佳选择。通过这个简单的更改,运行时间减少到了10分钟,优化了3分之一,但是仍然非常耗时。优化后,批量插入版本源码:

在SQLite中插入10亿条Python VS Rust

SQLite库优化

除了在代码层优化外,如果对于单纯的数据写入,对数据库本身搞的优化也是非常重要的。对于SQLite优化,可以做如下配置:

  1. PRAGMA journal_mode = OFF
  2. PRAGMA synchronous = 0
  3. PRAGMA cache_size = 1000000
  4. PRAGMA locking_mode = EXCLUSIVE
  5. PRAGMA temp_store = MEMORY

具体解释:

首先,journal_mode设置为OFF,将会关闭回滚日志,禁用 SQLite 的原子提交和回滚功能,这样在事务失败情况下,无法恢复,基于例子实例稳健性要求可以设置,但是严禁在生产环境中使用。

其次,关闭synchronous,SQLite可以不再校验磁盘写入的数据可靠性。写入SQLite可能并不意味着它已刷新到磁盘。同样,严禁在生产环境中启用。

cache_size用户指定SQLite允许在内存中保留多少内存页。不要在生产中分配太高的的数值。

使用在EXCLUSIVE锁定模式,SQLite连接持有的锁永远不会被释放。

设置temp_store到MEMOR将使其表现得像一个内存数据库。

优化性能

对上面的两个脚本,添加 SQLite优化参数,然后重新运行:

  1. def main():     
  2. con = sqlite3.connect(DB_NAME, isolation_level=None)     
  3. con.execute('PRAGMA journal_mode = OFF;')     
  4. con.execute('PRAGMA synchronous = 0;')     
  5. con.execute('PRAGMA cache_size = 1000000;') # give it a GB     
  6. con.execute('PRAGMA locking_mode = EXCLUSIVE;')     
  7. con.execute('PRAGMA temp_store = MEMORY;')     
  8. create_table(con)     

faker(con, count=100_000_000)

优化后版本,原始版本,插入1亿行数据,大概花了10分钟;对比批量插入版本大概花了8.5分钟。

pypy版本

对比CPython PyPy在数据处理中可以提高性能,据说可以提高4倍以上的性能。本实验中也尝试编译PyPy解释器,运行脚本(代码无需修改)。

使用pypy解释器,批处理版本,插入1亿行数据只需2.5分钟。性能大概是Cpython的3.5倍,可见传说的4倍性能提高确实是真的,诚不我欺也!。同时,为了测试在纯循环插入中消耗的时间,在脚本中删除SQL指令并运行:

在SQLite中插入10亿条Python VS Rust

以上脚本在CPython中耗时5.5分钟 。PyPy执行耗时1.5分钟(同样提高了3.5倍)。

Rust

在完成Python各种优化折腾。又尝试了Rust版本的插入,对比也有个原始版本和批量插入版本。原始版本,也是每行插入:

  1. use rusqlite::{params, Connection}; 
  2. mod common; 
  3. fn faker(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. for _ in 0..count { 
  6. let with_area = common::get_random_bool(); 
  7. let age = common::get_random_age(); 
  8. let is_active = common::get_random_active(); 
  9. if with_area { 
  10. let area_code = common::get_random_area_code(); 
  11. tx.execute( 
  12. "INSERT INTO user VALUES (NULL, ?, ?, ?)", 
  13. params![area_code, age, is_active], 
  14. .unwrap(); 
  15. } else { 
  16. tx.execute( 
  17. "INSERT INTO user VALUES (NULL, NULL, ?, ?)", 
  18. params![age, is_active], 
  19. .unwrap(); 
  20. tx.commit().unwrap(); 
  21. fn main() { 
  22. let conn = Connection::open("basic.db").unwrap(); 
  23. conn.execute_batch( 
  24. "PRAGMA journal_mode = OFF
  25. PRAGMA synchronous = 0
  26. PRAGMA cache_size = 1000000
  27. PRAGMA locking_mode = EXCLUSIVE
  28. PRAGMA temp_store = MEMORY;", 
  29. .expect("PRAGMA"); 
  30. conn.execute( 
  31. "CREATE TABLE IF NOT EXISTS user ( 
  32. id INTEGER not null primary key, 
  33. area CHAR(6), 
  34. age INTEGER not null, 
  35. active INTEGER not null)", 
  36. [], 
  37. .unwrap(); 
  38. faker(conn, 100_000_000) 

该版执行,大概用时3分钟。然后我做了进一步的实验:

将rusqlite,换成sqlx异步运行。

  1. use std::str::FromStr;     
  2.  
  3. use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};     
  4. use sqlx::{ConnectOptions, Connection, Executor, SqliteConnection, Statement};     
  5.  
  6. mod common;     
  7.  
  8. async fn faker(mut conn: SqliteConnection, count: i64) -> Result<(), sqlx::Error> {     
  9. let mut tx = conn.begin().await?;     
  10. let stmt_with_area = tx     
  11. .prepare("INSERT INTO user VALUES (NULL, ?, ?, ?)")     
  12. .await?;     
  13. let stmt = tx     
  14. .prepare("INSERT INTO user VALUES (NULL, NULL, ?, ?)")     
  15. .await?;     
  16. for _ in 0..count {     
  17. let with_area = common::get_random_bool();     
  18. let age = common::get_random_age();     
  19. let is_active = common::get_random_active();     
  20. if with_area {     
  21. let area_code = common::get_random_area_code();     
  22. stmt_with_area     
  23. .query()     
  24. .bind(area_code)     
  25. .bind(age)     
  26. .bind(is_active)     
  27. .execute(&mut tx)     
  28. .await?;     
  29. } else {     
  30. stmt.query()     
  31. .bind(age)     
  32. .bind(is_active)     
  33. .execute(&mut tx)     
  34. .await?;     
  35. }     
  36. }     
  37. tx.commit().await?;     
  38. Ok(())     
  39. }     
  40.  
  41. #[tokio::main]     
  42. async fn main() -> Result<(), sqlx::Error> {     
  43. let mut conn = SqliteConnectOptions::from_str("basic_async.db")     
  44. .unwrap()     
  45. .create_if_missing(true)     
  46. .journal_mode(SqliteJournalMode::Off)     
  47. .synchronous(SqliteSynchronous::Off)     
  48. .connect()     
  49. .await?;     
  50. conn.execute("PRAGMA cache_size = 1000000;").await?;     
  51. conn.execute("PRAGMA locking_mode = EXCLUSIVE;").await?;     
  52. conn.execute("PRAGMA temp_store = MEMORY;").await?;     
  53. conn.execute(     
  54. "CREATE TABLE IF NOT EXISTS user (     
  55. id INTEGER not null primary key,     
  56. area CHAR(6),     
  57. age INTEGER not null,     
  58. active INTEGER not null);",     
  59. )     
  60. .await?;     
  61. faker(conn, 100_000_000).await?;     
  62. Ok(())     

这个版本花了大约14分钟。性能反而下降下降了。比Python版本还要差(原因值得深析)。

对执行的原始SQL语句,切换到准备好的语句并在循环中插入行,但重用了准备好的语句。该版本只用了大约一分钟。

使用准备好的语句并将它们插入到50行的批次中,插入10亿条,耗时34.3 秒。

  1. use rusqlite::{Connection, ToSql, Transaction}; 
  2. mod common; 
  3. fn faker_wrapper(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. faker(&tx, count); 
  6. tx.commit().unwrap(); 
  7. fn faker(tx: &Transaction, count: i64) { 
  8. // that is, we will batch 50 inserts of rows at once 
  9. let min_batch_size: i64 = 50
  10. if count < min_batch_size { 
  11. panic!("count cant be less than min batch size"); 
  12. // jeez, refactor this! 
  13. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(min_batch_size as usize); 
  14. with_area_params.pop(); 
  15. let with_area_paramswith_area_params = with_area_params.as_str(); 
  16. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(min_batch_size as usize); 
  17. without_area_params.pop(); 
  18. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  19. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  20. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  21. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  22. let mut stmt = tx.prepare_cached(st2.as_str()).unwrap(); 
  23. for _ in 0..(count / min_batch_size) { 
  24. let with_area = common::get_random_bool(); 
  25. let age = common::get_random_age(); 
  26. let is_active = common::get_random_active(); 
  27. let mut param_values: Vec<_> = Vec::new(); 
  28. if with_area { 
  29. // lets prepare the batch 
  30. let mut vector = Vec::<(String, i8, i8)>::new(); 
  31. for _ in 0..min_batch_size { 
  32. let area_code = common::get_random_area_code(); 
  33. vector.push((area_code, age, is_active)); 
  34. for batch in vector.iter() { 
  35. param_values.push(&batch.0 as &dyn ToSql); 
  36. param_values.push(&batch.1 as &dyn ToSql); 
  37. param_values.push(&batch.2 as &dyn ToSql); 
  38. stmt_with_area.execute(&*param_values).unwrap(); 
  39. } else { 
  40. // lets prepare the batch 
  41. let mut vector = Vec::<(i8, i8)>::new(); 
  42. for _ in 0..min_batch_size { 
  43. vector.push((age, is_active)); 
  44. for batch in vector.iter() { 
  45. param_values.push(&batch.0 as &dyn ToSql); 
  46. param_values.push(&batch.1 as &dyn ToSql); 
  47. stmt.execute(&*param_values).unwrap(); 
  48. fn main() { 
  49. let conn = Connection::open("basic_batched.db").unwrap(); 
  50. conn.execute_batch( 
  51. "PRAGMA journal_mode = OFF
  52. PRAGMA synchronous = 0
  53. PRAGMA cache_size = 1000000
  54. PRAGMA locking_mode = EXCLUSIVE
  55. PRAGMA temp_store = MEMORY;", 
  56. .expect("PRAGMA"); 
  57. conn.execute( 
  58. "CREATE TABLE IF NOT EXISTS user ( 
  59. id INTEGER not null primary key, 
  60. area CHAR(6), 
  61. age INTEGER not null, 
  62. active INTEGER not null)", 
  63. [], 
  64. .unwrap(); 
  65. faker_wrapper(conn, 100_000_000) 
  66. 创建了一个线程版本,其中有一个从通道接收数据的写入线程和四个将数据推送到管道其他线程。 
  67. use rusqlite::{Connection, ToSql}; 
  68. use std::sync::mpsc; 
  69. use std::sync::mpsc::{Receiver, Sender}; 
  70. use std::thread; 
  71. mod common; 
  72. static MIN_BATCH_SIZE: i64 = 50
  73. enum ParamValues { 
  74. WithArea(Vec<(String, i8, i8)>), 
  75. WithoutArea(Vec<(i8, i8)>), 
  76. fn consumer(rx: Receiver<ParamValues>) { 
  77. let mut conn = Connection::open("threaded_batched.db").unwrap(); 
  78. conn.execute_batch( 
  79. "PRAGMA journal_mode = OFF
  80. PRAGMA synchronous = 0
  81. PRAGMA cache_size = 1000000
  82. PRAGMA locking_mode = EXCLUSIVE
  83. PRAGMA temp_store = MEMORY;", 
  84. .expect("PRAGMA"); 
  85. conn.execute( 
  86. "CREATE TABLE IF NOT EXISTS user ( 
  87. id INTEGER not null primary key, 
  88. area CHAR(6), 
  89. age INTEGER not null, 
  90. active INTEGER not null)", 
  91. [], 
  92. .unwrap(); 
  93. let tx = conn.transaction().unwrap(); 
  94. // jeez, refactor this! 
  95. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  96. with_area_params.pop(); 
  97. let with_area_paramswith_area_params = with_area_params.as_str(); 
  98. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  99. without_area_params.pop(); 
  100. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  101. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  102. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  103. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  104. let mut stmt_without_area = tx.prepare_cached(st2.as_str()).unwrap(); 
  105. for param_values in rx { 
  106. let mut row_values: Vec<&dyn ToSql> = Vec::new(); 
  107. match param_values { 
  108. ParamValues::WithArea(values) => { 
  109. for batch in values.iter() { 
  110. row_values.push(&batch.0 as &dyn ToSql); 
  111. row_values.push(&batch.1 as &dyn ToSql); 
  112. row_values.push(&batch.2 as &dyn ToSql); 
  113. stmt_with_area.execute(&*row_values).unwrap(); 
  114. ParamValues::WithoutArea(values) => { 
  115. for batch in values.iter() { 
  116. row_values.push(&batch.0 as &dyn ToSql); 
  117. row_values.push(&batch.1 as &dyn ToSql); 
  118. stmt_without_area.execute(&*row_values).unwrap(); 
  119. tx.commit().unwrap(); 
  120. fn producer(tx: Sender<ParamValues>, count: i64) { 
  121. if count < MIN_BATCH_SIZE { 
  122. panic!("count cant be less than min batch size"); 
  123. for _ in 0..(count / MIN_BATCH_SIZE) { 
  124. let with_area = common::get_random_bool(); 
  125. let age = common::get_random_age(); 
  126. let is_active = common::get_random_active(); 
  127. let mut param_values: Vec<_> = Vec::new(); 
  128. if with_area { 
  129. // lets prepare the batch 
  130. let mut vector = Vec::<(String, i8, i8)>::new(); 
  131. for _ in 0..MIN_BATCH_SIZE { 
  132. let area_code = common::get_random_area_code(); 
  133. vector.push((area_code, age, is_active)); 
  134. for batch in vector.iter() { 
  135. param_values.push(&batch.0 as &dyn ToSql); 
  136. param_values.push(&batch.1 as &dyn ToSql); 
  137. param_values.push(&batch.2 as &dyn ToSql); 
  138. // send the values 
  139. tx.send(ParamValues::WithArea(vector)).unwrap(); 
  140. } else { 
  141. // lets prepare the batch 
  142. let mut vector = Vec::<(i8, i8)>::new(); 
  143. for _ in 0..MIN_BATCH_SIZE { 
  144. vector.push((age, is_active)); 
  145. for batch in vector.iter() { 
  146. param_values.push(&batch.0 as &dyn ToSql); 
  147. param_values.push(&batch.1 as &dyn ToSql); 
  148. // send the values 
  149. tx.send(ParamValues::WithoutArea(vector)).unwrap(); 
  150. fn main() { 
  151. // setup the DB and tables 
  152. let (tx, rx): (Sender<ParamValues>, Receiver<ParamValues>) = mpsc::channel(); 
  153. // lets launch the consumer 
  154. let consumer_handle = thread::spawn(|| consumer(rx)); 
  155. let cpu_count = num_cpus::get(); 
  156. let total_rows = 100_000_000
  157. let each_producer_count = (total_rows / cpu_count) as i64; 
  158. let mut handles = Vec::with_capacity(cpu_count); 
  159. for _ in 0..cpu_count { 
  160. let thread_tx = tx.clone(); 
  161. handles.push(thread::spawn(move || { 
  162. producer(thread_tx, each_producer_count.clone()) 
  163. })) 
  164. for t in handles { 
  165. t.join().unwrap(); 
  166. drop(tx); 
  167. // wait till consumer is exited 
  168. consumer_handle.join().unwrap(); 

这是性能最好的版本,耗时约32.37秒。

基准测试对比:

在SQLite中插入10亿条Python VS Rust

总结

通过案例不同任务实验,总体上可以得到:

  • 通过SQLite PRAGMA语句优化设置可以提高插入性能。
  • 使用准备好的语句可以提高性能
  • 进行批量插入可以提高性能。
  • PyPy 实际上比CPython快4倍
  • 线程/异步不一定能提高性能。

原文地址:https://mp.weixin.qq.com/s?__biz=MzU0MTY5MzEwMA==&mid=2247488055&idx=1&sn=2848892932ca18c1bf6835870a795c15&chksm=fb2757f4cc50dee233e8b51e6d4fddabec2e78ae7fc63da1b70181329e3f730ebaa67cf01e96&mpshare=1&s

延伸 · 阅读

精彩推荐
  • SqliteSQLite学习手册(SQLite在线备份)

    SQLite学习手册(SQLite在线备份)

    在SQLite中提供了一组用于在线数据库备份的APIs函数(C接口),可以很好的解决上述方法存在的不足。通过该组函数,可以将源数据库中的内容拷贝到另一个数...

    SQLite教程网5412020-06-06
  • SqliteSQLite教程(五):索引和数据分析/清理

    SQLite教程(五):索引和数据分析/清理

    这篇文章主要介绍了SQLite教程(五):索引和数据分析/清理,本文讲解了创建索引、删除索引、重建索引、数据分析、数据清理等内容,需要的朋友可以参考...

    数据库之家5622020-06-09
  • Sqlite初识SQLITE3数据库

    初识SQLITE3数据库

    本文主要讲诉Sqlite数据库的一些基本概念以及SQLite的优势,需要的朋友可以参考下 ...

    SQLITE教程网3192020-06-07
  • Sqlitesqlite迁移到mysql脚本的方法

    sqlite迁移到mysql脚本的方法

    这篇文章主要介绍了sqlite迁移到mysql脚本的方法,需要的朋友可以参考下 ...

    sqlite教程网4832020-08-05
  • SqliteSQLite优化方法

    SQLite优化方法

    SQLite的数据库本质上来讲就是一个磁盘上的文件,所以一切的数据库操作其实都会转化为对文件的操作,而频繁的文件操作将会是一个很好时的过程,会极...

    SQLite教程网4022020-06-01
  • SqliteSQLite 实现if not exist 类似功能的操作

    SQLite 实现if not exist 类似功能的操作

    这篇文章主要介绍了SQLite 实现if not exist 类似功能的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    吉普赛的歌11642021-02-06
  • SqliteSQLite教程(四):内置函数

    SQLite教程(四):内置函数

    这篇文章主要介绍了SQLite教程(四):内置函数,本文讲解了聚合函数、核心函数、日期和时间函数、,需要的朋友可以参考下 ...

    SQLite教程网4282020-06-09
  • SqliteSQLite教程(一):SQLite数据库介绍

    SQLite教程(一):SQLite数据库介绍

    这篇文章主要介绍了SQLite教程(一):SQLite数据库介绍,本文讲解了什么是SQLite、SQLite的主要优点、和RDBMS相比SQLite的一些劣势、个性化特征等内容,需要的朋...

    ZZVIPS6512020-06-08