Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement zlib compression #1487

Draft
wants to merge 81 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 77 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
e6c682c
packets: implemented compression protocol
Jul 31, 2017
77f6792
packets: implemented compression protocol CR changes
Aug 16, 2017
a0cf94b
packets: implemented compression protocol: remove bytes.Reset for bac…
Aug 16, 2017
4cdff28
Merge branch 'master' of https://github.com/go-sql-driver/mysql
Oct 8, 2017
d0ea1a4
reading working
Aug 18, 2017
477c9f8
writerly changes
Aug 18, 2017
996ed2d
PR 649: adding compression (second code review)
Oct 8, 2017
f74faed
do not query max_allowed_packet by default (#680)
julienschmidt Oct 12, 2017
b3a093e
packets: do not call function on nulled value (#678)
julienschmidt Oct 16, 2017
5eaa5ff
ColumnType interfaces (#667)
julienschmidt Oct 17, 2017
ee46028
Add Aurora errno to rejectReadOnly check (#634)
jeffcharles Oct 17, 2017
93aed73
allow successful TravisCI runs in forks (#639)
jmhodges Oct 17, 2017
4f10ee5
Drop support for Go 1.6 and lower (#696)
julienschmidt Nov 12, 2017
59b0f90
Make gofmt happy (#704)
julienschmidt Nov 14, 2017
3fbf53a
Added support for custom string types in ConvertValue. (#623)
dsmontoya Nov 15, 2017
f9c6a2c
Implement NamedValueChecker for mysqlConn (#690)
pushrax Nov 16, 2017
6046bf0
Fix Valuers by returning driver.ErrSkip if couldn't convert type inte…
randomjunk Nov 16, 2017
385673a
statement: Fix conversion of Valuer (#710)
linxGnu Nov 17, 2017
9031984
Fixed imports for appengine/cloudsql (#700)
rrbrussell Nov 17, 2017
6992fad
Fix tls=true didn't work with host without port (#718)
methane Dec 4, 2017
386f84b
Differentiate between BINARY and CHAR (#724)
kwoodhouse93 Jan 10, 2018
f853432
Test with latest Go patch versions (#693)
AlekSi Jan 10, 2018
d1a8b86
Fix prepared statement (#734)
methane Jan 13, 2018
3167920
driver.ErrBadConn when init packet read fails (#736)
Jan 25, 2018
fb33a2c
packets: implemented compression protocol
Jul 31, 2017
f174605
packets: implemented compression protocol CR changes
Aug 16, 2017
dbd1e2b
third code review changes
Mar 23, 2018
3e12e32
PR 649: minor cleanup
Mar 23, 2018
17a06f1
Merge branch 'master' into master
methane Mar 26, 2018
60bdaec
Sort AUTHORS
methane Mar 26, 2018
422ab6f
Update dsn.go
methane Mar 26, 2018
ee2a1c7
Merge remote-tracking branch 'upstream/master' into compression
Oct 4, 2023
1f38652
Please linter.
Oct 4, 2023
d43864e
Formatting.
Oct 4, 2023
1c2ac70
Unexport constructors.
Oct 5, 2023
944e638
Fix tests.
Oct 5, 2023
15017fc
Update AUTHORS.
Oct 5, 2023
f400590
Formatting.
Oct 5, 2023
084dafb
Update README feature list.
Oct 5, 2023
ee87a7d
Fix TLS.
Oct 6, 2023
b8cfe77
Formatting.
Oct 11, 2023
d2501ec
Tidy up.
Oct 11, 2023
59c3cf1
Fix compression negotiations.
Oct 11, 2023
d1aef08
Format README.
Oct 12, 2023
09a4fb8
Add usage instructions to README.
Oct 12, 2023
e523af2
Add minCompressLength param.
Oct 12, 2023
efbc53b
Fix non-compression of small packets.
Oct 12, 2023
ac5cb7a
Merge remote-tracking branch 'upstream/master' into compression
Oct 12, 2023
7610823
Rename fields for clarity.
Oct 12, 2023
eb449fa
Simplify compressedWriter.Write.
Oct 12, 2023
75c2480
Disable compression by default.
Oct 13, 2023
6a38735
Merge remote-tracking branch 'upstream/master' into compression
Oct 27, 2023
8b8b428
Fix bytes.NewBuffer usage.
Oct 27, 2023
bc3ad68
Revert README formatting.
Nov 2, 2023
b6d9883
Update README with compression usage.
Nov 2, 2023
850c83f
Merge remote-tracking branch 'upstream/master' into compression
methane Mar 11, 2024
5ec621c
simplify
methane Mar 11, 2024
3d0d418
change minCompressLength to 150
methane Mar 11, 2024
31b8b38
fixup
methane Mar 11, 2024
9f797b1
remove unnecessary test
methane Mar 11, 2024
d7ed578
code cleanup and minor improvements
methane Mar 13, 2024
0f9ec9f
remove test depends on compressed output
methane Mar 13, 2024
a64171f
cleanup
methane Mar 14, 2024
d78cdf8
fix test
methane Mar 14, 2024
eb42024
fix sync error
methane Mar 14, 2024
679cc53
fix sync error again
methane Mar 14, 2024
876af07
fix todo
methane Mar 14, 2024
d5ad92e
merge compressedReader and compressedWriter
methane Mar 15, 2024
1c05916
use sync.Pool for zlib
methane Mar 15, 2024
39e52e4
cleanup
methane Mar 15, 2024
0e3ace3
code cleanup
methane Mar 15, 2024
750fe2a
fix typo
methane Mar 15, 2024
0512769
move const flag
methane Mar 15, 2024
60ce788
remove writer from compressor
methane Mar 15, 2024
ee70acf
remove packetWriter and simplify tests
methane Mar 15, 2024
1e78561
run tests with compression
methane Mar 15, 2024
77d86ec
fix tests
methane Mar 15, 2024
e1dc557
wip
methane Mar 16, 2024
406bce2
Merge branch 'master' into compression
methane Mar 24, 2024
e9f5b24
fix some errors
methane Mar 25, 2024
243b3df
Merge branch 'master' into compression
methane May 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:
my-cnf: |
innodb_log_file_size=256MB
innodb_buffer_pool_size=512MB
max_allowed_packet=16MB
max_allowed_packet=48MB
; TestConcurrent fails if max_connections is too large
max_connections=50
local_infile=1
Expand Down
2 changes: 2 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Animesh Ray <mail.rayanimesh at gmail.com>
Arne Hormann <arnehormann at gmail.com>
Ariel Mashraki <ariel at mashraki.co.il>
Asta Xie <xiemengjun at gmail.com>
B Lamarche <blam413 at gmail.com>
Brian Hendriks <brian at dolthub.com>
Bulat Gaifullin <gaifullinbf at gmail.com>
Caine Jette <jette at alum.mit.edu>
Expand Down Expand Up @@ -60,6 +61,7 @@ Jennifer Purevsuren <jennifer at dolthub.com>
Jerome Meyer <jxmeyer at gmail.com>
Jiajia Zhong <zhong2plus at gmail.com>
Jian Zhen <zhenjl at gmail.com>
Joe Mann <contact at joemann.co.uk>
Joshua Prunier <joshua.prunier at gmail.com>
Julien Lefevre <julien.lefevr at gmail.com>
Julien Schmidt <go-sql-driver at julienschmidt.com>
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ A MySQL-Driver for Go's [database/sql](https://golang.org/pkg/database/sql/) pac
* Secure `LOAD DATA LOCAL INFILE` support with file allowlisting and `io.Reader` support
* Optional `time.Time` parsing
* Optional placeholder interpolation
* Supports zlib compression.

## Requirements

Expand Down Expand Up @@ -267,6 +268,16 @@ SELECT u.id FROM users as u

will return `u.id` instead of just `id` if `columnsWithAlias=true`.

##### `compress`

```
Type: bool
Valid Values: true, false
Default: false
```

Toggles zlib compression. false by default.
methane marked this conversation as resolved.
Show resolved Hide resolved

##### `interpolateParams`

```
Expand Down Expand Up @@ -310,6 +321,15 @@ Default: 64*1024*1024

Max packet size allowed in bytes. The default value is 64 MiB and should be adjusted to match the server settings. `maxAllowedPacket=0` can be used to automatically fetch the `max_allowed_packet` variable from server *on every connection*.

##### `minCompressLength`

```
Type: decimal number
Default: 50
```

Min packet size in bytes to compress, when compression is enabled (see the `compress` parameter). Packets smaller than this will be sent uncompressed.

##### `multiStatements`

```
Expand Down
27 changes: 20 additions & 7 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ func (tb *TB) checkStmt(stmt *sql.Stmt, err error) *sql.Stmt {
return stmt
}

func initDB(b *testing.B, queries ...string) *sql.DB {
func initDB(b *testing.B, useCompression bool, queries ...string) *sql.DB {
tb := (*TB)(b)
db := tb.checkDB(sql.Open(driverNameTest, dsn))
comprStr := ""
if useCompression {
comprStr = "&compress=1"
}
db := tb.checkDB(sql.Open(driverNameTest, dsn+comprStr))
for _, query := range queries {
if _, err := db.Exec(query); err != nil {
b.Fatalf("error on %q: %v", query, err)
Expand All @@ -60,10 +64,18 @@ func initDB(b *testing.B, queries ...string) *sql.DB {
const concurrencyLevel = 10

func BenchmarkQuery(b *testing.B) {
benchmarkQueryHelper(b, false)
}

func BenchmarkQueryCompression(b *testing.B) {
benchmarkQueryHelper(b, true)
}

func benchmarkQueryHelper(b *testing.B, compr bool) {
tb := (*TB)(b)
b.StopTimer()
b.ReportAllocs()
db := initDB(b,
db := initDB(b, compr,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
Expand Down Expand Up @@ -224,6 +236,7 @@ func BenchmarkInterpolation(b *testing.B) {
maxWriteSize: maxPacketSize - 1,
buf: newBuffer(nil),
}
mc.packetReader = &mc.buf

args := []driver.Value{
int64(42424242),
Expand Down Expand Up @@ -269,7 +282,7 @@ func benchmarkQueryContext(b *testing.B, db *sql.DB, p int) {
}

func BenchmarkQueryContext(b *testing.B) {
db := initDB(b,
db := initDB(b, false,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
Expand Down Expand Up @@ -305,7 +318,7 @@ func benchmarkExecContext(b *testing.B, db *sql.DB, p int) {
}

func BenchmarkExecContext(b *testing.B) {
db := initDB(b,
db := initDB(b, false,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
Expand All @@ -323,7 +336,7 @@ func BenchmarkExecContext(b *testing.B) {
// "size=" means size of each blobs.
func BenchmarkQueryRawBytes(b *testing.B) {
var sizes []int = []int{100, 1000, 2000, 4000, 8000, 12000, 16000, 32000, 64000, 256000}
db := initDB(b,
db := initDB(b, false,
"DROP TABLE IF EXISTS bench_rawbytes",
"CREATE TABLE bench_rawbytes (id INT PRIMARY KEY, val LONGBLOB)",
)
Expand Down Expand Up @@ -376,7 +389,7 @@ func BenchmarkQueryRawBytes(b *testing.B) {
// BenchmarkReceiveMassiveRows measures performance of receiving large number of rows.
func BenchmarkReceiveMassiveRows(b *testing.B) {
// Setup -- prepare 10000 rows.
db := initDB(b,
db := initDB(b, false,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val TEXT)")
defer db.Close()
Expand Down
246 changes: 246 additions & 0 deletions compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Go MySQL Driver - A MySQL-Driver for Go's database/sql package
//
// Copyright 2024 The Go-MySQL-Driver Authors. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

package mysql

import (
"bytes"
"compress/zlib"
"fmt"
"io"
"sync"
)

var (
zrPool *sync.Pool // Do not use directly. Use zDecompress() instead.
zwPool *sync.Pool // Do not use directly. Use zCompress() instead.
)

func init() {
zrPool = &sync.Pool{
New: func() any { return nil },
}
zwPool = &sync.Pool{
New: func() any {
zw, err := zlib.NewWriterLevel(new(bytes.Buffer), 2)
if err != nil {
panic(err) // compress/zlib return non-nil error only if level is invalid
}
return zw
},
}
}

func zDecompress(src, dst []byte) (int, error) {
br := bytes.NewReader(src)
var zr io.ReadCloser
var err error

if a := zrPool.Get(); a == nil {
if zr, err = zlib.NewReader(br); err != nil {
return 0, err
}
} else {
zr = a.(io.ReadCloser)
if zr.(zlib.Resetter).Reset(br, nil); err != nil {
return 0, err
}
}
defer func() {
zr.Close()
zrPool.Put(zr)
}()

lenRead := 0
size := len(dst)

for lenRead < size {
n, err := zr.Read(dst[lenRead:])
lenRead += n

if err == io.EOF {
if lenRead < size {
return lenRead, io.ErrUnexpectedEOF
}
} else if err != nil {
return lenRead, err
}
}
return lenRead, nil
}

func zCompress(src []byte, dst io.Writer) error {
zw := zwPool.Get().(*zlib.Writer)
zw.Reset(dst)
if _, err := zw.Write(src); err != nil {
return err
}
zw.Close()
zwPool.Put(zw)
return nil
}

type decompressor struct {
mc *mysqlConn
// read buffer (FIFO).
// We can not reuse already-read buffer until dropping Go 1.20 support.
// It is because of database/mysql's weired behavior.
// See https://github.com/go-sql-driver/mysql/issues/1435
bytesBuf []byte
}

func newDecompressor(mc *mysqlConn) *decompressor {
return &decompressor{
mc: mc,
}
}

func (c *decompressor) readNext(need int) ([]byte, error) {
for len(c.bytesBuf) < need {
if err := c.uncompressPacket(); err != nil {
return nil, err
}
}

data := c.bytesBuf[:need:need] // prevent caller writes into r.bytesBuf
c.bytesBuf = c.bytesBuf[need:]
return data, nil
}

func (c *decompressor) uncompressPacket() error {
header, err := c.mc.buf.readNext(7) // size of compressed header
if err != nil {
return err
}

// compressed header structure
comprLength := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
uncompressedLength := int(uint32(header[4]) | uint32(header[5])<<8 | uint32(header[6])<<16)
compressionSequence := uint8(header[3])
if debugTrace {
c.mc.cfg.Logger.Print(
fmt.Sprintf("uncompress cmplen=%v uncomplen=%v pkt_cmp_seq=%v expected_cmp_seq=%v\n",
comprLength, uncompressedLength, compressionSequence, c.mc.sequence))
}
if compressionSequence != c.mc.sequence {
// return ErrPktSync
// server may return error packet (e.g. 1153 Got a packet bigger than 'max_allowed_packet' bytes)
// before receiving all packets from client. In this case, seqnr is younger than expected.
c.mc.cfg.Logger.Print(
fmt.Sprintf("[warn] unexpected cmpress seq nr: expected %v, got %v",
c.mc.sequence, compressionSequence))
}
c.mc.sequence = compressionSequence + 1
c.mc.compressSequence = c.mc.sequence

comprData, err := c.mc.buf.readNext(comprLength)
if err != nil {
return err
}

// if payload is uncompressed, its length will be specified as zero, and its
// true length is contained in comprLength
if uncompressedLength == 0 {
c.bytesBuf = append(c.bytesBuf, comprData...)
return nil
}

// use existing capacity in bytesBuf if possible
offset := len(c.bytesBuf)
if cap(c.bytesBuf)-offset < uncompressedLength {
old := c.bytesBuf
c.bytesBuf = make([]byte, offset, offset+uncompressedLength)
copy(c.bytesBuf, old)
}

lenRead, err := zDecompress(comprData, c.bytesBuf[offset:offset+uncompressedLength])
if err != nil {
return err
}
if lenRead != uncompressedLength {
return fmt.Errorf("invalid compressed packet: uncompressed length in header is %d, actual %d",
uncompressedLength, lenRead)
}
c.bytesBuf = c.bytesBuf[:offset+uncompressedLength]
return nil
}

const maxPayloadLen = maxPacketSize - 4

// writeCompressed sends one or some packets with compression.
// Use this instead of mc.netConn.Write() when mc.compress is true.
func (mc *mysqlConn) writeCompressed(packets []byte) (int, error) {
totalBytes := len(packets)
dataLen := len(packets)
blankHeader := make([]byte, 7)
var buf bytes.Buffer

for dataLen > 0 {
payloadLen := dataLen
if payloadLen > maxPayloadLen {
payloadLen = maxPayloadLen
}
payload := packets[:payloadLen]
uncompressedLen := payloadLen

if _, err := buf.Write(blankHeader); err != nil {
return 0, err
}

// If payload is less than minCompressLength, don't compress.
if uncompressedLen < minCompressLength {
if _, err := buf.Write(payload); err != nil {
return 0, err
}
uncompressedLen = 0
} else {
zCompress(payload, &buf)
}

if err := mc.writeCompressedPacket(buf.Bytes(), uncompressedLen); err != nil {
return 0, err
}
dataLen -= payloadLen
packets = packets[payloadLen:]
buf.Reset()
}

return totalBytes, nil
}

// writeCompressedPacket writes a compressed packet with header.
// data should start with 7 size space for header followed by payload.
func (mc *mysqlConn) writeCompressedPacket(data []byte, uncompressedLen int) error {
comprLength := len(data) - 7
if debugTrace {
mc.cfg.Logger.Print(
fmt.Sprintf(
"writeCompressedPacket: comprLength=%v, uncompressedLen=%v, seq=%v",
comprLength, uncompressedLen, mc.compressSequence))
}

// compression header
data[0] = byte(0xff & comprLength)
data[1] = byte(0xff & (comprLength >> 8))
data[2] = byte(0xff & (comprLength >> 16))

data[3] = mc.compressSequence

// this value is never greater than maxPayloadLength
data[4] = byte(0xff & uncompressedLen)
data[5] = byte(0xff & (uncompressedLen >> 8))
data[6] = byte(0xff & (uncompressedLen >> 16))

if _, err := mc.netConn.Write(data); err != nil {
mc.cfg.Logger.Print(err)
return err
}

mc.compressSequence++
return nil
}
Loading
Loading