Skip to content

Commit

Permalink
Improve list query
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Aug 1, 2024
1 parent a4fe336 commit 2e6e096
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 88 deletions.
141 changes: 77 additions & 64 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,56 +62,56 @@ var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"

revSQL = `
SELECT MAX(rkv.id) AS id
FROM kine AS rkv`

SELECT MAX(id) AS id
FROM kine`

// listSQL query looks for the latest version of every row in
// the range and returns all columns from it.
// The search for the "latest id" (table `maxkv` in the query)
// can be carried on quickly with a covering index (kine_name_index).
// The `deleted <= ?` is used to select deleted rows:
// - when the argument is 0 (false), the only rows selected are
// those with deleted = 0 (i.e. alive)
// - when the argument is 1 (true), all rows will be selected,
// including deleted ones.
// Unfortunately, using a normal JOIN operation will confuse
// SQLite planner and insert a SORT temp table at the end of
// the plan, forcing SQLite to load and sort the entire set
// before returning it (and making the cost of a paginated
// query very high) and returning an unsorted set would make
// pagination impossible.
// To workaround this silly misplan, a ORDER by in the first
// table forces ordering of `maxkv` (without paying for it
// as it is the same order as the index) and CROSS JOIN is
// used as it forces SQLite to keep the outer-loop order
// when joining tables. See https://www.sqlite.org/optoverview.html#crossjoin
// for more details.
listSQL = fmt.Sprintf(`
SELECT %s
FROM kine kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WITH maxkv AS (
SELECT MAX(id) AS id
FROM kine
WHERE
mkv.name >= ? AND mkv.name < ?
name >= ? AND name < ?
%%s
GROUP BY mkv.name) maxkv
GROUP BY name
HAVING deleted <= ?
ORDER BY name
)
SELECT %s
FROM maxkv CROSS JOIN kine kv
ON maxkv.id = kv.id
WHERE
(kv.deleted = 0 OR ?)
ORDER BY kv.name ASC, kv.id ASC
`, columns)

revisionAfterSQL = fmt.Sprintf(`
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.name ASC, lkv.theid ASC
`, columns)

revisionIntervalSQL = `
SELECT (
SELECT crkv.prev_revision
FROM kine AS crkv
WHERE crkv.name = 'compact_rev_key'
SELECT prev_revision
FROM kine
WHERE name = 'compact_rev_key'
ORDER BY prev_revision
DESC LIMIT 1
) AS low, (
SELECT id
SELECT MAX(id)
FROM kine
ORDER BY id
DESC LIMIT 1
) AS high`
)

Expand All @@ -138,7 +138,6 @@ type Generic struct {
GetRevisionSQL string
RevisionSQL string
ListRevisionStartSQL string
GetRevisionAfterSQL string
CountCurrentSQL string
CountRevisionSQL string
AfterSQLPrefix string
Expand Down Expand Up @@ -224,40 +223,54 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
DB: prepared.New(db),

GetRevisionSQL: q(fmt.Sprintf(`
SELECT
%s
FROM kine kv
WHERE kv.id = ?`, columns), paramCharacter, numbered),
SELECT %s
FROM kine AS kv
WHERE id = ?`, columns), paramCharacter, numbered),

GetCurrentSQL: q(fmt.Sprintf(listSQL, ""), paramCharacter, numbered),
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered),
GetRevisionAfterSQL: q(revisionAfterSQL, paramCharacter, numbered),
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND id <= ?"), paramCharacter, numbered),

CountCurrentSQL: q(fmt.Sprintf(`
SELECT (%s), COUNT(*)
CountCurrentSQL: q(`
SELECT (
SELECT COALESCE(MAX(id), 0) AS id
FROM kine
), COUNT(*)
FROM (
%s
) c`, revSQL, fmt.Sprintf(listSQL, "")), paramCharacter, numbered),

CountRevisionSQL: q(fmt.Sprintf(`
SELECT (%s), COUNT(c.theid)
SELECT MAX(id) AS id
FROM kine
WHERE
name >= ? AND name < ?
GROUP BY name
HAVING deleted = 0
) c`, paramCharacter, numbered),

CountRevisionSQL: q(`
SELECT (
SELECT COALESCE(MAX(id), 0) AS id
FROM kine
), COUNT(*)
FROM (
%s
) c`, revSQL, fmt.Sprintf(listSQL, "AND mkv.id <= ?")), paramCharacter, numbered),
SELECT MAX(id) AS id
FROM kine
WHERE
name >= ? AND name < ?
AND id <= ?
GROUP BY name
HAVING deleted = 0
) c`, paramCharacter, numbered),

AfterSQLPrefix: q(fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE
kv.name >= ? AND kv.name < ?
AND kv.id > ?
ORDER BY kv.id ASC`, columns), paramCharacter, numbered),
WHERE name >= ? AND name < ?
AND id > ?
ORDER BY id ASC`, columns), paramCharacter, numbered),

AfterSQL: q(fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE kv.id > ?
ORDER BY kv.id ASC
FROM kine AS kv
WHERE id > ?
ORDER BY id ASC
`, columns), paramCharacter, numbered),

DeleteSQL: q(`
Expand Down Expand Up @@ -409,7 +422,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
if startKey != "" {
start = startKey + "\x01"
}
rows, err := d.query(ctx, "count_revision", d.CountRevisionSQL, start, end, revision, false)
rows, err := d.query(ctx, "count_revision", d.CountRevisionSQL, start, end, revision)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -538,12 +551,12 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted)
}

sql := d.GetRevisionAfterSQL
sql := d.ListRevisionStartSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT ?", sql)
return d.query(ctx, "get_revision_after_sql_limit", sql, startKey+"\x01", end, revision, includeDeleted, limit)
}
return d.query(ctx, "get_revision_after_sql", sql, startKey+"\x01", end, revision, includeDeleted)
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down
15 changes: 14 additions & 1 deletion pkg/kine/drivers/sqlite/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type SchemaVersion int32

var (
databaseSchemaVersion = NewSchemaVersion(0, 1)
databaseSchemaVersion = NewSchemaVersion(0, 2)
)

func NewSchemaVersion(major int16, minor int16) SchemaVersion {
Expand Down Expand Up @@ -89,6 +89,19 @@ CREATE TABLE kine
return nil
}

// applySchemaV0_2 moves the schema from version 1 to version 2
func applySchemaV0_2(ctx context.Context, txn *sql.Tx) error {
if _, err := txn.ExecContext(ctx, `DROP INDEX kine_name_index`); err != nil {
return err
}

if _, err := txn.ExecContext(ctx, `CREATE UNIQUE INDEX kine_name_index ON kine(name, id, deleted)`); err != nil {
return err
}

return nil
}

// hasTable checks if a table exists.
func hasTable(ctx context.Context, txn *sql.Tx, tableName string) (bool, error) {
// FIXME: why we can't use `pragma_table_list()`? Is dqlite/sqlite using
Expand Down
5 changes: 5 additions & 0 deletions pkg/kine/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ func migrate(ctx context.Context, txn *sql.Tx) error {
if err := applySchemaV0_1(ctx, txn); err != nil {
return err
}
fallthrough
case NewSchemaVersion(0, 1):
if err := applySchemaV0_2(ctx, txn); err != nil {
return err
}
default:
return nil
}
Expand Down
3 changes: 0 additions & 3 deletions test/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,14 @@ func BenchmarkList(b *testing.B) {
if err := insertMany(ctx, tx, "key", payloadSize, n); err != nil {
return err
}
b.Log("insert", n)

if err := updateMany(ctx, tx, "key", payloadSize, n/2); err != nil {
return err
}
b.Log("update", n)

if err := deleteMany(ctx, tx, "key", n/2); err != nil {
return err
}
b.Log("delete", n)
return nil
}
backends := []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend}
Expand Down
44 changes: 24 additions & 20 deletions test/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (ks *kineServer) ResetMetrics() {
}

func insertMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error {
insertManyQuery := `
const insertManyQuery = `
WITH RECURSIVE gen_id AS(
SELECT 1 AS id
Expand All @@ -211,40 +211,44 @@ FROM gen_id, revision`
}

func updateMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error {
updateManyQuery := `
const updateManyQuery = `
WITH maxkv AS (
SELECT MAX(id) AS id
FROM kine
WHERE
?||'/' <= name AND name < ?||'0'
GROUP BY name
HAVING deleted = 0
ORDER BY name
)
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT kv.name, 0, 0, kv.create_revision, kv.id, 0, randomblob(?), kv.value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WHERE ?||'/' <= mkv.name AND mkv.name < ?||'0'
GROUP BY mkv.name
) maxkv ON maxkv.id = kv.id
WHERE kv.deleted = 0
ORDER BY kv.name
FROM maxkv CROSS JOIN kine kv
ON maxkv.id = kv.id
LIMIT ?`
_, err := tx.ExecContext(ctx, updateManyQuery, valueSize, prefix, prefix, n)
return err
}

func deleteMany(ctx context.Context, tx *sql.Tx, prefix string, n int) error {
const deleteManyQuery = `
WITH maxkv AS (
SELECT MAX(id) AS id
FROM kine
WHERE
?||'/' <= name AND name < ?||'0'
GROUP BY name
HAVING deleted = 0
ORDER BY name
)
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT kv.name, 0, 1, kv.create_revision, kv.id, 0, kv.value, kv.value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WHERE ?||'/' <= mkv.name AND mkv.name < ?||'0'
GROUP BY mkv.name
) maxkv ON maxkv.id = kv.id
WHERE kv.deleted = 0
ORDER BY kv.name
FROM maxkv CROSS JOIN kine kv
ON maxkv.id = kv.id
LIMIT ?`
_, err := tx.ExecContext(ctx, deleteManyQuery, prefix, prefix, n)
return err
Expand Down

0 comments on commit 2e6e096

Please sign in to comment.