Skip to content

Commit

Permalink
Remove query customization for dialects
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Oct 28, 2024
1 parent f5478f7 commit c6736de
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 129 deletions.
243 changes: 115 additions & 128 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/prepared"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -77,7 +76,6 @@ func init() {
if err != nil {
logrus.WithError(err).Warning("Otel failed to create create counter")
}

}

var (
Expand All @@ -89,17 +87,17 @@ var (

listSQL = fmt.Sprintf(`
SELECT %s
FROM kine kv
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
FROM kine AS mkv
WHERE
mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name) maxkv
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
(kv.deleted = 0 OR ?)
WHERE (kv.deleted = 0 OR ?)
ORDER BY kv.name ASC, kv.id ASC
`, columns)

Expand All @@ -112,6 +110,100 @@ var (
SELECT MAX(id)
FROM kine
) AS high`

listRevisionStartSQL = listSQL

countRevisionSQL = fmt.Sprintf(`
SELECT COUNT(*)
FROM (
%s
)`, listSQL)

afterSQLPrefix = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE
kv.name >= ? AND kv.name < ?
AND kv.id > ?
ORDER BY kv.id ASC`, columns)

afterSQL = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE kv.id > ?
ORDER BY kv.id ASC
`, columns)

deleteRevSQL = `
DELETE FROM kine
WHERE id = ?`

updateCompactSQL = `
UPDATE kine
SET prev_revision = max(prev_revision, ?)
WHERE name = 'compact_rev_key'`

deleteSQL = `
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
name,
0 AS created,
1 AS deleted,
CASE
WHEN kine.created THEN id
ELSE create_revision
END AS create_revision,
id AS prev_revision,
lease,
NULL AS value,
value AS old_value
FROM kine WHERE id = (SELECT MAX(id) FROM kine WHERE name = ?)
AND deleted = 0
AND id = ?`

createSQL = `
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
? AS name,
1 AS created,
0 AS deleted,
0 AS create_revision,
COALESCE(id, 0) AS prev_revision,
? AS lease,
? AS value,
NULL AS old_value
FROM (
SELECT MAX(id) AS id, deleted
FROM kine
WHERE name = ?
) maxkv
WHERE maxkv.deleted = 1 OR id IS NULL`

updateSQL = `
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
? AS name,
0 AS created,
0 AS deleted,
CASE
WHEN kine.created THEN id
ELSE create_revision
END AS create_revision,
id AS prev_revision,
? AS lease,
? AS value,
value AS old_value
FROM kine WHERE id = (SELECT MAX(id) FROM kine WHERE name = ?)
AND deleted = 0
AND id = ?`

fillSQL = `
INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`

getSizeSQL = `
SELECT (page_count - freelist_count) * page_size
FROM pragma_page_count(), pragma_page_size(), pragma_freelist_count()`
)

const maxRetries = 500
Expand All @@ -130,24 +222,11 @@ type ErrCode func(error) string
type Generic struct {
sync.Mutex

LockWrites bool
DB *prepared.DB
RevisionSQL string
ListRevisionStartSQL string
CountRevisionSQL string
AfterSQLPrefix string
AfterSQL string
DeleteRevSQL string
CompactSQL string
UpdateCompactSQL string
DeleteSQL string
FillSQL string
CreateSQL string
UpdateSQL string
GetSizeSQL string
Retry ErrRetry
TranslateErr TranslateErr
ErrCode ErrCode
LockWrites bool
DB *prepared.DB
Retry ErrRetry
TranslateErr TranslateErr
ErrCode ErrCode

// CompactInterval is interval between database compactions performed by kine.
CompactInterval time.Duration
Expand Down Expand Up @@ -224,95 +303,6 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig

return &Generic{
DB: prepared.New(db),

ListRevisionStartSQL: listSQL,

CountRevisionSQL: fmt.Sprintf(`
SELECT COUNT(*)
FROM (
%s
)`, listSQL),

AfterSQLPrefix: fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE
kv.name >= ? AND kv.name < ?
AND kv.id > ?
ORDER BY kv.id ASC`, columns),

AfterSQL: fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE kv.id > ?
ORDER BY kv.id ASC
`, columns),

DeleteRevSQL: `
DELETE FROM kine
WHERE id = ?`,

UpdateCompactSQL: `
UPDATE kine
SET prev_revision = max(prev_revision, ?)
WHERE name = 'compact_rev_key'`,

DeleteSQL: `
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
name,
0 AS created,
1 AS deleted,
CASE
WHEN kine.created THEN id
ELSE create_revision
END AS create_revision,
id AS prev_revision,
lease,
NULL AS value,
value AS old_value
FROM kine WHERE id = (SELECT MAX(id) FROM kine WHERE name = ?)
AND deleted = 0
AND id = ?`,

CreateSQL: `
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
? AS name,
1 AS created,
0 AS deleted,
0 AS create_revision,
COALESCE(id, 0) AS prev_revision,
? AS lease,
? AS value,
NULL AS old_value
FROM (
SELECT MAX(id) AS id, deleted
FROM kine
WHERE name = ?
) maxkv
WHERE maxkv.deleted = 1 OR id IS NULL`,

UpdateSQL: `
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
? AS name,
0 AS created,
0 AS deleted,
CASE
WHEN kine.created THEN id
ELSE create_revision
END AS create_revision,
id AS prev_revision,
? AS lease,
? AS value,
value AS old_value
FROM kine WHERE id = (SELECT MAX(id) FROM kine WHERE name = ?)
AND deleted = 0
AND id = ?`,

FillSQL: `INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`,
}, err
}

Expand Down Expand Up @@ -418,7 +408,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
if startKey != "" {
start = startKey + "\x01"
}
rows, err := d.query(ctx, "count_revision", d.CountRevisionSQL, start, end, revision, false)
rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision, false)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -457,7 +447,7 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6
)
createCnt.Add(ctx, 1)

result, err := d.execute(ctx, "create_sql", d.CreateSQL, key, ttl, value, key)
result, err := d.execute(ctx, "create_sql", createSQL, key, ttl, value, key)
if err != nil {
logrus.WithError(err).Error("failed to create key")
return 0, false, err
Expand All @@ -484,7 +474,7 @@ func (d *Generic) Update(ctx context.Context, key string, value []byte, preRev,
}()

updateCnt.Add(ctx, 1)
result, err := d.execute(ctx, "update_sql", d.UpdateSQL, key, ttl, value, key, preRev)
result, err := d.execute(ctx, "update_sql", updateSQL, key, ttl, value, key, preRev)
if err != nil {
logrus.WithError(err).Error("failed to update key")
return 0, false, err
Expand All @@ -509,7 +499,7 @@ func (d *Generic) Delete(ctx context.Context, key string, revision int64) (rev i
}()
span.SetAttributes(attribute.String("key", key))

result, err := d.execute(ctx, "delete_sql", d.DeleteSQL, key, revision)
result, err := d.execute(ctx, "delete_sql", deleteSQL, key, revision)
if err != nil {
logrus.WithError(err).Error("failed to delete key")
return 0, false, err
Expand Down Expand Up @@ -608,7 +598,7 @@ func (d *Generic) tryCompact(ctx context.Context, start, end int64) (err error)
return err
}

if _, err = tx.ExecContext(ctx, d.UpdateCompactSQL, end); err != nil {
if _, err = tx.ExecContext(ctx, updateCompactSQL, end); err != nil {
return err
}
return tx.Commit()
Expand Down Expand Up @@ -657,7 +647,7 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
}()
span.SetAttributes(attribute.Int64("revision", revision))

_, err = d.execute(ctx, "delete_rev_sql", d.DeleteRevSQL, revision)
_, err = d.execute(ctx, "delete_rev_sql", deleteRevSQL, revision)
return err
}

Expand All @@ -666,7 +656,7 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi
if startKey != "" {
start = startKey + "\x01"
}
sql := d.ListRevisionStartSQL
sql := listRevisionStartSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT ?", sql)
return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, includeDeleted, limit)
Expand Down Expand Up @@ -707,7 +697,7 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {

func (d *Generic) AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) {
start, end := getPrefixRange(prefix)
sql := d.AfterSQLPrefix
sql := afterSQLPrefix
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT ?", sql)
return d.query(ctx, "after_sql_prefix_limit", sql, start, end, rev, limit)
Expand All @@ -716,7 +706,7 @@ func (d *Generic) AfterPrefix(ctx context.Context, prefix string, rev, limit int
}

func (d *Generic) After(ctx context.Context, rev, limit int64) (*sql.Rows, error) {
sql := d.AfterSQL
sql := afterSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT ?", sql)
return d.query(ctx, "after_sql_limit", sql, rev, limit)
Expand All @@ -726,7 +716,7 @@ func (d *Generic) After(ctx context.Context, rev, limit int64) (*sql.Rows, error

func (d *Generic) Fill(ctx context.Context, revision int64) error {
fillCnt.Add(ctx, 1)
_, err := d.execute(ctx, "fill_sql", d.FillSQL, revision, fmt.Sprintf("gap-%d", revision), 0, 1, 0, 0, 0, nil, nil)
_, err := d.execute(ctx, "fill_sql", fillSQL, revision, fmt.Sprintf("gap-%d", revision), 0, 1, 0, 0, 0, nil, nil)
return err
}

Expand All @@ -735,10 +725,7 @@ func (d *Generic) IsFill(key string) bool {
}

func (d *Generic) GetSize(ctx context.Context) (int64, error) {
if d.GetSizeSQL == "" {
return 0, errors.New("driver does not support size reporting")
}
rows, err := d.query(ctx, "get_size_sql", d.GetSizeSQL)
rows, err := d.query(ctx, "get_size_sql", getSizeSQL)
if err != nil {
return 0, err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/kine/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connecti
}
return err
}
dialect.GetSizeSQL = `SELECT (page_count - freelist_count) * page_size FROM pragma_page_count(), pragma_page_size(), pragma_freelist_count()`

dialect.CompactInterval = opts.compactInterval
dialect.PollInterval = opts.pollInterval
Expand Down

0 comments on commit c6736de

Please sign in to comment.