Golang的database/sql包为SQL类数据库提供了通用的API操作,它简化了对不同数据库driver的依赖,总体来说是使用简单、操作可靠同时维持了代码的扩展性,不过在使用过程中仍然有一些陷阱和误区,本文就记录下使用database/sql过程中的一些注意点,穿插一部分实现。文中部分例子来自database/sql的使用说明,示例Go版本1.6。

database/sql包的结构其实就分为驱动接口(driver)和DB操作(sql)两部分。源码并没有实现任何一种数据库的驱动,提倡以实现database/sql/driver包的形式来注册第三方的DB驱动,对开发者尽可能的屏蔽特定数据库的feature,算是go的一贯风格吧;如果你使用的SQLite、MySQL、PG这样的数据库,直接使用database/sql就可以完成绝大部分DB操作。

关于驱动

如上,数据库的驱动是第三方实现的,在Go里面,通过匿名导入的方式注册一个附加的driver这种方式很常见,下面就是MySQL的注册方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// MySQL驱动"github.com/go-sql-driver/mysql"的init函数
func init() {
sql.Register("mysql", &MySQLDriver{})
}
// database/sql中记录驱动映射关系的map
var (
driversMu sync.RWMutex
drivers = make(map[string]driver.Driver)
)
// 实际使用中引入MySQL驱动
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
db, err := sql.Open("mysql", "xxx@xxx:xxx")
// .....
}

这里主要讲下database/sql/driver这个包(后面用driver包代替)的内容,driver包通常不被直接使用,源码中database/sql是它的一种实现,写自定义驱动的时候要按照driver包的内容实现所有的interface,看几个重要的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Value代表一个database/sql认识的数据库字段类型.
// 它要不为nil,或者作为Go以下原生类型的实例形式:
// int64
// float64
// bool
// []byte
// string [*] everywhere except from Rows.Next.
// time.Time
type Value interface{}
// 任何一个数据库驱动必须实现Driver接口,注册的时候就靠它
type Driver interface {
// Open方法会返回对数据库的连接对象(新连接)
// name的值其实就是常见到的dsn串,格式需要在驱动实现中定义
// Open也可能会返回一个已经缓存的DB连接(closed状态),其实这是不必要的
// 因为sql包会自己去建立连接池去管理所有的DB空闲连接并按需进行重用和释放
Open(name string) (Conn, error)
}
// Conn表示有状态的DB连接,不能在协程间共享!
type Conn interface {
// 返回一个prepared statement,和这个DB连接绑定
Prepare(query string) (Stmt, error)
// 关闭DB连接,停止当前所有的stat执行以及事务,使得连接不再被使用
//
// 因为sql包维护了自己的连接池,调用Close会用来释放不需要的连接
// 没必要在自己的驱动实现中做额外的Conn的缓存
Close() error
// 显式开启一个事务
Begin() (Tx, error)
}
// Stmt 表示一个prepare语句 - 绑定一个DB连接并且不能被多协程共享
type Stmt interface {
// Close 关闭Stmt - 也是为了释放其和后端DB绑定的连接.
Close() error
// 返回语句中占位符的数量 - 就是 '?' 的个数
// 如果 >= 0,sql包会在执行前对占位符参数个数进行检查
NumInput() int
// 执行DML/DDL语句,比如INSERT/UPDATE..(不需要返回查询结果的请求)
Exec(args []Value) (Result, error)
// 执行查询,获得查询数据集
Query(args []Value) (Rows, error)
}

对于Conn类型,标准库明确了这是一个有状态的连接,Prepare()方法会显式绑定一个连接去执行SQL操作,这期间不能干别的事情,这对后面理解连接池至关重要!并发操作的协程安全是在sql包中控制的。

另外为了通用,绝大部分DB类型转换都在database包内进行,第三方驱动往往只要处理极个别的类型,以避免出现精度缺失/截断,所以我们便在driver包中看到了这类接口/函数:

1
2
3
4
5
6
7
8
9
10
11
// ValueConverter是用于类型转换的接口
// 其提供ConvertValue方法用于将值转换为Value类型
type ValueConverter interface {
// 把v转换成Value类型
ConvertValue(v interface{}) (Value, error)
}
// Valuer提供Value方法,用于其实例转换自身为一个Value类型值
type Valuer interface {
Value() (Value, error)
}

这个ValueConverter在driver包中预定义了多种实现,用于保持不同驱动之间一致的值类型转换,一般有如下几个使用场景:

  1. 在sql包中将一个Value类型值转换成合适的DB字段类型,确保字段精度不丢失 – 比如确保int64变量可以存放于一个uint16类型的DB字段中
  2. DB类型转为Value类型
  3. 在sql包的scan方法中,将Value值转换为用于用户自定义的变量

连接池

新手常见的误区便是把sql.DB对象当作一个DB连接,实际上确实对应一个连接池,创建的时候也不会立即对DB进行建链;来看下sql.DB的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 连接池对象
type DB struct {
driver driver.Driver // 引用driver的实现
dsn string // database source name
numClosed uint64 // 连接池中closed状态连接的数目
mu sync.Mutex // mutex锁,保护如下对象
freeConn []*driverConn // 保存DB连接的数组
// 保存请求的数组
// 新请求过来会append,并尝试从freeConn中拿到空闲连接来执行
// 连接结束后对应小表的connRequest会被关闭,对应的连接会释放
connRequests []chan connRequest
numOpen int // 活跃/opend状态的连接数
// 这个chan用于告诉sql.DB需要一个新连接了
// sql.DB创建后会有一个后台的协程通过connectionOpener()函数来获取这个chan来分配连接
// 会在db.Close()后关闭,并广播给connectionOpener退出
openerCh chan struct{}
closed bool // 关闭标识
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
maxIdle int // 最大空闲连接数,默认和负数都为0
maxOpen int // 最大连接数,<= 0表示不限制
maxLifetime time.Duration // 复用空闲连接的时间,超时便是过期连接,需要被释放
cleanerCh chan struct{} // 和openerCh原理类似,用于释放对应的连接和依赖
}
// 对driver.Conn实现的二次封装
// 通过mutex来控制使用这个连接的操作,比如Tx、Stmt、Result、Rows
// 所以rows这种操作用完一定要Close,不然会导致连接不被释放,连接数增长...
type driverConn struct {
db *DB // 引用DB,用来在连接失效/释放的时候告知sql.DB更新连接池记录
createdAt time.Time // 创建时间
sync.Mutex // mutex锁,保护下列对象
ci driver.Conn
closed bool // 连接是否被关闭
finalClosed bool // ci是不是失效了,如果ci.Close执行了,它的状态就是false
openStmt map[driver.Stmt]bool
inUse bool // 是否被使用
onPut []func() // 连接被put到连接池的时候执行的操作,一般用来关闭已有的Stmt
dbmuClosed bool // 用于removeClosedStmtLocked,一个连接上所有的Stmt都是closed状态,它就为true
}

sql.go代码中绝大部分的篇幅都在说明如何控制连接池。sql.DB在实现上协程安全,多个Goroutine引用和执行会自行从连接池中复用空闲连接、释放无效/过期连接,同时进行异常重试,很方便的设定,不过使用的时候要注意:

1. 设置连接池的大小

连接池通过三个属性进行设置,分别为maxIdle(最大空闲连接)、maxOpen(最大连接数)、maxLifetime(连接过期时间),要注意他们的默认值:

  • maxIdle:默认值为2,如果通过SetMaxIdleConns设置<=0的值的话,都表示对DB不保持空闲连接,也就是短链接,执行后立即释放;
  • maxOpen:默认为0,即无限,因为DB的连接资源有限,在突发大量DB访问或者分布式应用部署的时候很容易出现DB连接打满的情况,这种会造成访问拒绝,如果我们设置了上限,那么程序会在没有可用连接的时候等待连接池释放空闲连接在执行后续的请求,出于对DB的保护,推荐设置这个参数。
  • maxLifetime:连接池中空闲连接的过期时间,默认为0,表示不会过期;

2. 连接的分配策略

连接池中的连接可能会因为网络抖动、DB端超时而失效,那下次从连接池中拿到这个driverConn的时候不能用怎么办?sql包怎么处理这种场景呢?

这里就涉及到sql包复用连接的策略:

1
2
3
4
5
6
7
8
9
10
11
12
// 连接复用策略.
type connReuseStrategy uint8
const (
// 强制开启一个新连接.
alwaysNewConn connReuseStrategy = iota
// 从连接池中获得空闲连接使用,分两种情况
// 1,达到MaxOpenConns上限,等待..
// 2,没有空闲连接,向driver中申请一个新的
cachedOrNewConn
)

通过以上的两种策略,sql包始终可以控制对一个失效连接(ErrBadConn)的重试,比如Exec的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
var res Result
var err error
for i := 0; i < maxBadConnRetries; i++ {
res, err = db.exec(query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.exec(query, args, alwaysNewConn)
}
return res, err
}

所以这种DB操作是可以重试的….

3. 及时释放连接

从driverConn的定义中可以看到,Tx、Stmt、Result、Rows这类操作是绑定连接的,如果使用完成后不关闭,会导致连接池中对应连接一直处于活跃状态,这种情况非常糟糕,比如下面情况:

  • 没有设置连接池上限,连接池资源不断分配新连接,DB连接打满…
  • 设置并达到连接上限,新sql.DB操作的调用一直处于等待空闲连接,请求被无限挂起

我会在后面举例说明Tx、Stmt的几个陷阱。

4. 丢失连接属性

永远不要假设你的SQL操作都是在一个DB连接上执行的;除非你使用Tx、Stmt,否则在使用sql.DB的时候很容易遇到一些极其隐蔽的异常,比如使用USE语句切换database,或者set字符集读取一个非UTF8的表等,这种操作只对你从连接池中取到的那个连接有效,并非全局设置,由于在sql.DB上执行了这些操作,连接在回到连接池后,有着和其他连接不相同的属性,很有可能你下次操作拿到的连接就是一个默认的database/字符集。

这也是不能在sql.DB上显示使用BEGIN和COMMIT语句的原因。

使用Prepared Statements

通常一个prepared statment(后面用PS代替)的执行过程为:获得一个DB连接并绑定,发送一个带占位符的statement给数据库进行编译并返回statement ID给前端,最后将参数、statement ID发送给DB执行后拿到结果;使用PS的理由多种多样,安全、性能、代码通用性等等。

大多数场景下都推荐使用PS,但也要注意一下潜在的问题,:

1. 并发Stmt导致连接突增

在使用sql.DB的时候一定要记住这是一个连接池,我们只能通过Tx、DB这种的对象去使用PS,而不能单独指定一个连接去执行。

在sql包中,PS的执行过程如下:

  1. 在连接池中绑定到一个空闲连接,执行prepare操作,返回Stmt对象(Stmt可以记住对应的连接)
  2. 执行时,Stmt尝试使用这个连接,但发现它已经失效或者出于busy状态时,会重复第一步,重新使用一个连接prepare

如果是高并发场景,Stmt会频繁使用新连接去re-perpare,这种即会出现连接资源突增,也会不可避免的造成DB端statement大量占用,某些数据库会触发阈值保护导致请求失败;

其实关键还是在于PS是绑定连接执行的,实际工作环境中,很多DB后端都使用了中间层,那么这种PS请求就是一条绑定了client->proxy->DB的一条链(甚至更长);有时候我们不在乎prepare这点性能影响,一些SQL的防控也可以在代码中控制或者由proxy拦截,可是又希望尽可能的复用各个层面的连接池(client、proxy…),那么这时候就可以少用Stmt。

其实这样的场景还挺多的…

2. 占位符区别

SQL语句的占位符是在驱动包中定义的!不同数据库的不一样,比如:

1
2
3
4
MySQL PostgreSQL Oracle
===== ========== ======
WHERE col = ? WHERE col = $1 WHERE col = :col
VALUES(?, ?, ?) VALUES($1, $2, $3) VALUES(:val1, :val2, :val3)

可以看到在MySQL,PG,Oracle中都不一样,所以想一份代码搞定通用的DB操作这种想法是行不通的,一些DB比如Sphinx都没有PS语句(不支持“binary” protocol),你的应用如果要访问不同类型的DB,得好好想想…

另外像db.Query、Exec这样的操作,如果是写了占位符的传参,比如:db.Query(“insert into t values (?,?)”, param1, param2),也会默认使用PS,防不胜防…

3. 事务中使用PS

PS和Tx操作都是绑定连接执行的,如果要在事务中执行PS,需要确保其在Tx的内部进行Prepare,而且这个Stmt不能在事务外部使用。

那事务外prepare的Stmt能不能在Tx内使用呢?翻了下标准库后发现….居然可以!

通过Tx.Stmt方法就可以做到,来看示例:

1
2
3
4
5
updateMoney, err := db.Prepare("UPDATE balance SET money=money+? WHERE id=?")
...
tx, err := db.Begin()
...
res, err := tx.Stmt(updateMoney).Exec(123.45, 98293203)

不过这个地方强烈不建议使用它,因为在Tx中使用外部的Stmt在实际执行时,Go会创建一个事务级别的Stmt,它会把每次执行的语句在这个事务的连接上re-prepare,所以效果和在内部使用Stmt没有区别,反而增加了re-prepare的消耗;Go官方自己也意识到了这个实现不妙,所以写了TODO打算修复来着:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
// TODO(bradfitz): optimize this. Currently this re-prepares
// each time. This is fine for now to illustrate the API but
// we should really cache already-prepared statements
// per-Conn. See also the big comment in Tx.Prepare.
if tx.db != stmt.db {
return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
}
dc, err := tx.grabConn()
if err != nil {
return &Stmt{stickyErr: err}
}
dc.Lock()
si, err := dc.ci.Prepare(stmt.query)
dc.Unlock()
txs := &Stmt{
db: tx.db,
tx: tx,
txsi: &driverStmt{
Locker: dc,
si: si,
},
query: stmt.query,
stickyErr: err,
}
tx.stmts.Lock()
tx.stmts.v = append(tx.stmts.v, txs)
tx.stmts.Unlock()
return txs
}

所以慎用…..

另外一个关于在事务中使用的PS的陷阱就是要注意defer的作用域,老生常谈的坑,看个错误示范

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
}
defer tx.Rollback()
stmt, err := tx.Prepare("INSERT INTO foo VALUES (?)")
if err != nil {
log.Fatal(err)
}
defer stmt.Close() // 错误示范,这里defer的作用域为func!
for i := 0; i < 10; i++ {
_, err = stmt.Exec(i)
if err != nil {
log.Fatal(err)
}
}
err = tx.Commit()
if err != nil {
log.Fatal(err)
}
// func结束才会执行stmt.Close()!

上面的操作实际是Tx提交后,连接就释放回连接池了,但是defer由于延迟执行,在func作用域之后才会在释放这个连接上的Stmt依赖,这种会造成连接池中的空闲连接的状态不一致,如果是使用的Go 1.4之前的版本,要注意把stmt.Close()放到Tx内,比如去掉defer或者加个匿名的func。

当然官方在之后的版本修复了这个问题,见CR 131650043

使用事务Tx

如果你用了database/sql包,并且希望某些操作只在一个DB连接上执行,用Tx是你唯一的选择。

关于Tx的建议无非是遵从它的标准方法,基本在PS环节都有提及,比如用Begin()、Commit()、RollBack()来替代显式的事务执行语句,事务内使用PS。

1. Multiple Statement支持

database/sql包没有明确表示支持Multiple Statement,所以一个Multiple Statement操作会由driver的实现来自行决定执行方式,结果也就难以预料,比如下面这个语句很可能由driver返回一个error,或者只执行了部分语句:

1
_, err := db.Exec("DELETE FROM tbl1; DELETE FROM tbl2")

也不要在Tx中这么搞,牢记事务中每个SQL操作必须串行执行,这个得由开发者自己控制;同时由于绑定单个连接,对于查询的返回结果Rows这样的对象,必须被scanned/closed后Tx才能被下一个查询使用。

举例一个常见的使用场景:“我要从一个表中获得所有的标识列(id),然后去另一张表里面引用这些id去做查询/更新”,下面这个例子是可行的:

1
2
3
4
5
6
rows, err := db.Query("select * from tbl1") // connection 1
for rows.Next() {
err = rows.Scan(&myvariable)
// 下面的语句并不会使用use connection 1, 因为已被Tx占用
db.Query("select * from tbl2 where id = ?", myvariable)
}

但是如果我们把查询2替换成Tx查询就不行了,错误示例

1
2
3
4
5
6
7
tx, err := db.Begin()
rows, err := tx.Query("select * from tbl1") // tx's connection
for rows.Next() {
err = rows.Scan(&myvariable)
// 错误! 连接被rows使用中
tx.Query("select * from tbl2 where id = ?", myvariable)
}

因为rows绑定了这个连接上的操作,再次执行查询2是不能继续复用这个连接的,甚至不能执行,必须等rows关闭;第二个错误的例子会收到由driver返回的错误,以MySQL为例:

1
2
[MySQL] 2016/06/20 00:01:51 packets.go:384: Busy buffer
query id 2 failed: driver: bad connection

这种场景要严格串行化对Tx的使用,或者使用连表查询的方式代替。

2. 并发场景下的Tx

紧接着上面的串行操作,Tx在使用上并非和sql.DB一样是协程安全的,这个和sql包的实现有关,Tx的SQL操作是在绑定的driverConn上执行的,虽然有原语控制driverConn的调用,但是rows这样的操作不结束,还是会出现上面一样的bad connection错误;没有rows这种依赖的就不会,比如Tx.Exec()操作就是协程安全的。

所以想做到在tx上并发一堆查询(Query)就要自己用mutex控制对Tx的串行化调用,并保证rows关闭后开始下一个操作;不过这种方式不推荐,存在实现难度…

试想这样的场景:为了提高程序的执行效率,我将计算和存储工作都放在了并发的goroutine中,每个并发的Goroutine都要查询和更新DB中的数据,于此同时为了保证整个操作的版本一致性,所有goroutine中我使用了一个Tx来保证,代码如下(忽略error处理):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
tx, _ := db.Begin() // ignore errs
// ...
defer tx.Rollback()
for i := 0; i < 10; i++ {
go func(id int) {
SomeWork()
rows, _ := tx.Query("select xxx")
for rows.Next() {
// ....
}
rows.Close()
SomeWork2()
_, _ := tx.Exec("insert into xxx")
}(i)
}
tx.Commit()

需求合情合理,但是这么做就会遭遇提到的busy buffer问题,怎么办?

我的建议是把DB看成一个单纯的存储,尽量在程序的计算完成后,对DB做一次持久化,而不是把一堆逻辑放在一起,这种还容易产生死锁。

可以按照下面的方法改写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// query data for workFuncs()
var ids []int
rows, _ := tx.Query("select id from xxx")
for rows.Next() {
var id int
rows.Scan(&id)
ids := append(ids, id)
}
rows.Close()
// do concurrent works
for i := 0; i < 10; i++ {
go func(id []int) {
info := SomeWork(id)
results <- info
}(ids)
}
// update data to db
for info := range results {
tx.Exec("insert into xxx values (?)", info.X)
}
tx.Commit()

避免并发的Tx调用。

总结

以上就是能想到的关于database/sql包的注意点,官方的说明文档中还有一些坑这里都没有提及,因为很少遇到,比如存储过程和uint64数值,如果有这种需求,不妨多阅读一下doc。

尽管如此标准库还是很好用的,这部分我建议直接阅读源码,实现并不复杂,较多的篇幅都是在介绍连接池的控制,多翻翻基本就可以了解,不过重要的还是实际场景多实践,配合用例理解才能更深刻。