Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

背景

在 visual studio中配置mysql环境参考文章如下

https://blog.csdn.net/weixin_58210154/article/details/141790084

乐观锁和悲观锁是常见的并发控制机制,解决多进程或者多线程环境中数据数据访问冲突的问题

数据库表创建

创建一个银行账户表 account,包含账户ID、姓名、余额、版本号

1
2
3
4
5
6
7
CREATE TABLE account (
id INT PRIMARY KEY AUTO_INCREMENT, /*账户*/
user_name VARCHAR(50) NOT NULL, /*用户*/
balance DECIMAL(10,2), /*余额*/
createtime DATETIME, /*时间戳*/
version INT NOT NULL /*版本号*/
);

乐观锁

乐观锁持乐观态度,它在更改共享数据时是认为不会发生冲突的,所以在访问共享数据的时候不会对其进行加锁,在提交更新数据的时候再检查这个共享数据是否被其他线程更改,如果被其它线程更改则表示产生冲突,更新失败。

乐观锁不加锁访问共享数据的方式是牺牲绝对的正确性来换取高并发的性能,就像腾讯的共享文档,每个人都都可以写文档,但是在提交的时候才能知道是否产生冲突,如果腾讯在线文档使用悲观锁,那么用户一定会崩溃,不知道要等多久才能轮到自己写文档

乐观锁的实现方式

版本号

在每次读取数据的时候先获取它的版本号,更新数据的时候再获取它的版本号,如果两次版本号一致说明这个数据没有被其它线程更改,本次更改可以正常提交,然后递增版本号;如果两次版本号不一致,说明该数据已经被其他线程更改了,产生冲突,那么本次更新就会失败。

时间戳

时间戳机制和版本号机制的处理思想是一样的,都是在更新数据前后判断时间戳是否一致来判断这个数据是否被其他线程更改。若成功更新数据,就更新时间戳。

应用场景

适用于读多写少的场景,并发冲突概率较低的场景

代码demo

版本号机制和时间戳机制的处理思想是相同,代码demo 模拟的是通过版本号机制比较乐观锁

数据库连接类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#pragma once
#include <stdio.h>
#include <iostream>
#include <WinSock.h>
#include <Windows.h>
#include <mysql.h>
using namespace std;

class MySQLConnection {
private:
MYSQL mysql; //mysql连接
char DataBase_UserName[32]; //数据库用户名username
char DataBase_Password[32]; //数据库密码
char DataBase_Host[64]; //数据库连接地址
char DataBase_Name[64]; //database name
int DataBase_Port; //server port
public:
MySQLConnection() :
DataBase_Port(3306),
DataBase_UserName("root"),
DataBase_Password("123456"),
DataBase_Host("localhost"),
DataBase_Name("mysql_limitdeep")
{}

bool ConnectDatabase()
{
//初始化mysql
mysql_init(&mysql); //连接mysql,数据库
if (!(mysql_real_connect(&mysql, DataBase_Host, DataBase_UserName, DataBase_Password, DataBase_Name, DataBase_Port, NULL, 3306)))
{
cout << "Error connecting to database:" << mysql_error(&mysql) << endl;
return false;
}
else
{
cout << "连接MYSQL数据库成功!" << endl;
cout << "Connected..." << endl;
return true;
}
}
//释放资源
~MySQLConnection()
{
mysql_close(&mysql);
cout << "数据库已释放!" << endl;
}

// 公有接口获取 MYSQL 对象
MYSQL* GetMysql() {
return &mysql;
}

// 禁用拷贝(防止多个对象管理同一连接)
MySQLConnection(const MySQLConnection&) = delete;
MySQLConnection& operator=(const MySQLConnection&) = delete;
};

账户类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include <iostream>
#include <vector>
#include <string>
#include"SQLMannager.h"

using namespace std;


MySQLConnection SQLConnection;
MYSQL* mysql = SQLConnection.GetMysql();

class Account {
private:
int id; /*账户*/
string user_name; /*用户*/
double balance; /*余额*/
string createtime; /*时间戳*/
int version; /*版本号*/

public:
Account(int _id, string _user_name, double _balance, string _createtime, int _version) :
id(_id), user_name(_user_name), balance(_balance), createtime(createtime), version(_version) {
string sql = "INSERT INTO account(id, user_name, balance, createtime, version) VALUES("
+to_string(_id) + ", '"
+_user_name + "', "
+to_string(_balance) + ", NOW(), "
+to_string(_version) + ")";

if (mysql_query(mysql, sql.c_str()) != 0) {
cout << "插入失败: " << mysql_error(mysql) << endl;
}

}

void getUserInfo() {

// 安全构建SQL语句
string sql = "SELECT id, user_name, balance, createtime, version FROM account WHERE id =" + to_string(id);

// 执行查询(补全括号)
if (mysql_query(mysql, sql.c_str()) != 0) {
cout << "查询失败: " << mysql_error(mysql) << endl;
}

MYSQL_RES* result = mysql_store_result(mysql);
if (!result) {
cout << "获取结果集失败: " << mysql_error(mysql) << endl;
}

unsigned int num_fields = mysql_num_fields(result);
MYSQL_ROW row = mysql_fetch_row(result);

if (row) {
// 同时更新类成员变量
if (num_fields >= 5) {
id = stoi(row[0]);
user_name = row[1];
balance = stod(row[2]);
createtime = row[3];
version = stoi(row[4]);
}

}
cout << id << " " << user_name << " " << balance << " " << createtime << " " << version << endl;
mysql_free_result(result);
}

int getVersion() {/*计算余额,增加版本号*/
return version;
}


bool update() {/*尝试更新账户*/
string sql = "SELECT id, user_name, balance, createtime, version FROM account WHERE id = " + to_string(id);
int beforeVersion = getVersion();
int afterVersion = getVersion();
if (beforeVersion != afterVersion) return false;
else {
sql = "UPDATE account SET version = version + 1 WHERE id = " + to_string(id);
// 执行查询(补全括号)
if (mysql_query(mysql, sql.c_str()) != 0) {
cout << "更新失败: " << mysql_error(mysql) << endl;
}
}
cout << "更新后的记录是" << endl;
getUserInfo();
}
};


int main()
{
SQLConnection.ConnectDatabase();

Account zz(1,"zz",100.00,"",1);
Account ss(2,"ss",100.00,"",1);
Account xx(3,"xx",100.00,"",1);
zz.getUserInfo();
ss.getUserInfo();
xx.getUserInfo();
zz.update();
system("pause");
return 0;
}

当构造一个账户时,就往数据库中插入这条记录,每条记录插入的时候,初始版本号都是1,在main函数中构造了3个用户,分别是zz、ss、xx,终端输出结果如下:

目前的代码是单线程,我们尝试更新zz的账户,因为没有竞争关系,zz账户在更改前后判断的版本号是相同的,所以成功更新。

visual studio终端输出结果如下:初始化三个账户时,其版本都是1, 更新zz账户后,版本号+1

image-20250513171444606

数据库表查询结果如下

image-20250513171845936

现在创建多个进程去更改账户,刻意制造写冲突,看看会发生什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#include <iostream>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <cstdlib>
#include <ctime>
#include "SQLMannager.h"

using namespace std;

mutex mtx; // 用于同步输出
mutex dbMutex; // 用于同步数据库访问
MySQLConnection SQLConnection;
MYSQL* mysql = SQLConnection.GetMysql();

class Account {
private:
int id; /*账户*/
string user_name; /*用户*/
double balance; /*余额*/
string createtime; /*时间戳*/
int version; /*版本号*/

public:
Account(int _id, string _user_name, double _balance, string _createtime, int _version) :
id(_id), user_name(_user_name), balance(_balance), createtime(createtime), version(_version) {

string sql = "INSERT INTO account(id, user_name, balance, createtime, version) VALUES("
+ to_string(_id) + ", '"
+ _user_name + "', "
+ to_string(_balance) + ", NOW(), "
+ to_string(_version) + ")";

// 加锁保护数据库操作
{
lock_guard<mutex> lock(dbMutex);
if (mysql_query(mysql, sql.c_str()) != 0) {
cout << "插入失败: " << mysql_error(mysql) << endl;
}
}
}

void getUserInfo() {
string sql = "SELECT id, user_name, balance, createtime, version FROM account WHERE id =" + to_string(id);

// 加锁保护数据库操作
{
lock_guard<mutex> lock(dbMutex);

if (mysql_query(mysql, sql.c_str()) != 0) {
cout << "查询失败: " << mysql_error(mysql) << endl;
return;
}

MYSQL_RES* result = mysql_store_result(mysql);
if (!result) {
cout << "获取结果集失败: " << mysql_error(mysql) << endl;
return;
}

unsigned int num_fields = mysql_num_fields(result);
MYSQL_ROW row = mysql_fetch_row(result);

if (row) {
if (num_fields >= 5) {
id = stoi(row[0]);
user_name = row[1];
balance = stod(row[2]);
createtime = row[3];
version = stoi(row[4]);
}
}

cout << id << " " << user_name << " " << balance << " " << createtime << " " << version << endl;
mysql_free_result(result);
}
}

int getVersion() const {
return version;
}

bool update() {
// 先刷新本地状态
{
lock_guard<mutex> lock(dbMutex);
if (!refreshFromDB()) {
return false;
}
}

int originalVersion = version;
double newBalance = balance + 10.0; // 模拟余额增加

// 正确的乐观锁更新:检查版本并更新
string sql = "UPDATE account "
"SET balance = " + to_string(newBalance) + ", "
" version = version + 1 "
"WHERE id = " + to_string(id) + " "
" AND version = " + to_string(originalVersion);

// 加锁保护数据库操作
{
lock_guard<mutex> lock(dbMutex);

if (mysql_query(mysql, sql.c_str()) != 0) {
cout << "更新失败: " << mysql_error(mysql) << endl;
return false;
}

// 检查是否有记录被更新
int affectedRows = mysql_affected_rows(mysql);
if (affectedRows == 0) {
cout << "版本冲突!账户ID: " << id
<< ", 当前版本: " << version
<< ", 期望版本: " << originalVersion << endl;
return false;
}

// 更新成功后刷新状态
if (!refreshFromDB()) {
return false;
}
}

return true;
}

private:
// 辅助方法:从数据库刷新账户信息
bool refreshFromDB() {
string sql = "SELECT id, user_name, balance, createtime, version FROM account WHERE id = " + to_string(id);

if (mysql_query(mysql, sql.c_str()) != 0) {
cout << "查询失败: " << mysql_error(mysql) << endl;
return false;
}

MYSQL_RES* result = mysql_store_result(mysql);
if (!result) {
cout << "获取结果集失败: " << mysql_error(mysql) << endl;
return false;
}

MYSQL_ROW row = mysql_fetch_row(result);
if (!row) {
cout << "未找到账户: " << id << endl;
mysql_free_result(result);
return false;
}

id = stoi(row[0]);
user_name = row[1];
balance = stod(row[2]);
createtime = row[3];
version = stoi(row[4]);

mysql_free_result(result);
return true;
}
};

void threadFunc(int threadId, Account& account) {
// 初始化随机数种子(每个线程不同)
srand(static_cast<unsigned int>(time(nullptr)) + threadId);

for (int i = 0; i < 5; i++) {
{
lock_guard<mutex> lock(mtx);
cout << "线程 " << threadId << " 尝试更新,第 " << i + 1 << " 次尝试" << endl;
}

this_thread::sleep_for(chrono::milliseconds(rand() % 100));

if (account.update()) {
lock_guard<mutex> lock(mtx);
cout << "线程 " << threadId << " 更新成功!" << endl;
}
else {
lock_guard<mutex> lock(mtx);
cout << "线程 " << threadId << " 更新失败,版本冲突" << endl;
}

this_thread::sleep_for(chrono::milliseconds(50));
}
}

int main()
{
// 初始化随机数种子
srand(static_cast<unsigned int>(time(nullptr)));

// 先连接数据库
SQLConnection.ConnectDatabase();
if (!mysql) {
cout << "数据库连接失败!" << endl;
return 1;
}

// 重置测试数据
{
lock_guard<mutex> lock(dbMutex);
string resetSql = "DELETE FROM account WHERE id = 1;";
mysql_query(mysql, resetSql.c_str());
}

// 创建账户对象
Account zz(1, "zz", 100.00, "", 1);

// 创建线程
vector<thread> threads;
for (int i = 0; i < 3; i++) {
threads.emplace_back(threadFunc, i, ref(zz));
}

// 等待所有线程完成
for (auto& t : threads) {
t.join();
}

// 输出最终状态
{
lock_guard<mutex> lock(mtx);
cout << "\n===== 测试完成 =====" << endl;
cout << "最终账户状态:" << endl;
zz.getUserInfo();
}

system("pause");
return 0;
}

image-20250513175454861

悲观锁

悲观锁持悲观态度,它在更改共享数据时是认为一定会发生冲突的,所以在访问共享数据的时候会对其进行加锁,操作完成后才会释放锁,保证线程在持有锁期间,别的线程无法访问共享数据,从而避免了冲突的发生。

悲观锁以加锁访问共享数据的方式是牺牲并发性来保证确定性,加锁的粒度也需要把控好,加锁粒度太大可能退化到串行,加锁粒度太小可能会出现数据冲突问题,加锁不当还会出现死锁。

悲观锁的实现方式

数据库:通过临键锁、间隙锁、记录锁实现

例如 select colum1, colum2….from table for update

编程语言:锁(mutex)、条件变量实现

应用场景

适用于写多读少的场景,并发冲突概率较高的场景

代码demo

多个线程轮询打印 1-m(这是典型的多线程同步和互斥问题)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <condition_variable>
using namespace std;
mutex gMtx; // 互斥锁
condition_variable cv; // 条件变量
int num = 1; // 共享变量
const int maxNum = 100; // 最大数字
int currentThread = 1; // 当前应该执行的线程编号
void func(int id, int n) {
unique_lock<mutex> lc(gMtx);
while (num <= maxNum) {
// 等待当前线程的轮次
cv.wait(lc, [id]() { return currentThread == id; });
if (num > maxNum) {
break; // 如果 num 超过最大值,退出循环
}
// 打印当前数字
cout << "Thread " << id << " - num: " << num <<
endl;
num++;
// 更新下一个应该执行的线程编号
currentThread = (currentThread % n) + 1;
// 通知其他线程
cv.notify_all();
}
}
int main() {
int n;
cin >> n;
vector<thread> threads;
for (int i = 0; i < n; i++) {
threads.emplace_back(func, i + 1, n);
}
for (auto& t : threads) {
t.join();
}

cout << "Finished!" << endl;
return 0;
}

mutex gMtx

unique _lock lc(gMtx) //互斥锁

unique_lock 是一个模板类,使用了 RALL 资源技术,确保在作用 域结束时自动释放锁,从而避免资源泄漏和死锁问题。

当条件变量调用 wait 时, lc 会被释放

condition_variable cv; //条件变量

cv.wait ()

作用: cv.wait() 是条件变量的成员函数,作用是让当前线程进入等 待状态,直到其它线程调用了 cv.notify_all() 或者 cv.notify_one() 来唤 醒等待的线程。cv 通常和一个条件判断相结合,确保线程只有在条件 为 true 时才会继续执行

cv.wait()的两种等待形式

无条件等待——cv.wait(lc)

作用: 它会让当前线程直接进入等待状态,直到其它线程调用 notify 唤醒它,但这种方式容易受到虚假唤醒的影响

有条件等待——cv.wait(lc,lambda 表达式)

作用: 如果条件为true ,线程继续执行;如果条件为false ,线程会 再次进入等待状态

cv.wait 的工作流程(有条件)

step1: 线程调用 cv.wait 时,会释放锁 lc ,并检查 wait 的条件,如果 为 true ,则继续运行,否则进入等待状态。

step2:当其他线程调用 cv.notify_all() 或 cv.notify_one() 时,当前线程 会被唤醒。

step3:线程被唤醒后,会重新获取锁 lc ,并检查 lambda 表达式中 的条件:

如果条件为 true (即 currentThread == id) ,线程继续执行。

如果条件为false ,线程会再次释放锁并进入等待状态。

评论