1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 11:18:50 +08:00

improve clickhouse driver

This commit is contained in:
daguang 2022-02-23 22:51:37 +08:00
parent 455d724c01
commit 157e936f24
7 changed files with 95 additions and 54 deletions

View File

@ -76,6 +76,13 @@ jobs:
--health-timeout 5s
--health-retries 10
clickhouse-server:
image: yandex/clickhouse-server
ports:
- 9000:9000
- 8123:8123
- 9001:9001
# strategy set
strategy:

View File

@ -37,6 +37,7 @@ var (
ErrUnsupportedReplace = errors.New("unsupported method:Replace")
ErrUnsupportedBegin = errors.New("unsupported method:Begin")
ErrUnsupportedTransaction = errors.New("unsupported method:Transaction")
ErrSQLNull = errors.New("SQL cannot be null")
)
func init() {
@ -74,8 +75,8 @@ func (d *Driver) Open(config *gdb.ConfigNode) (*sql.DB, error) {
config.User, config.Host, config.Port, config.Name)
}
source += fmt.Sprintf(
"?charset=%s&debug=%s&compress=%s",
config.Charset, gconv.String(config.Debug), gconv.String(config.Compress),
"?charset=%s&debug=%s",
config.Charset, gconv.String(config.Debug),
)
db, err := sql.Open(driver, source)
if err != nil {
@ -204,24 +205,38 @@ func (d *Driver) ping(conn *sql.DB) error {
return err
}
// DoUpdateSQL in clickhouse ,use update must use alter
// eg.
// ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr
func (d *Driver) DoUpdateSQL(ctx context.Context, link gdb.Link, table string, updates interface{}, condition string, args ...interface{}) (result sql.Result, err error) {
return d.Core.DoExec(ctx, link, fmt.Sprintf("ALTER TABLE %s UPDATE %s%s", table, updates, condition), args...)
}
// DoDeleteSQL in clickhouse , delete must use alter
// eg.
// ALTER TABLE [db.]table DELETE WHERE filter_expr
func (d *Driver) DoDeleteSQL(ctx context.Context, link gdb.Link, table string, condition interface{}, args ...interface{}) (result sql.Result, err error) {
return d.Core.DoExec(ctx, link, fmt.Sprintf("ALTER TABLE %s DELETE %s", table, condition), args...)
// DoFilter handles the sql before posts it to database.
func (d *Driver) DoFilter(ctx context.Context, link gdb.Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error) {
// replace MySQL to Clickhouse SQL grammar
// MySQL eg: UPDATE visits SET xxx
// Clickhouse eg: ALTER TABLE visits UPDATE xxx
// MySQL eg: DELETE FROM VISIT
// Clickhouse eg: ALTER TABLE VISIT DELETE WHERE filter_expr
result, err := gregex.MatchString("(?i)^UPDATE|DELETE", sql)
if err != nil {
return "", nil, err
}
if len(result) != 0 {
sqlSlice := strings.Split(sql, " ")
if len(sqlSlice) < 3 {
return "", nil, ErrSQLNull
}
ck := []string{"ALTER", "TABLE"}
switch strings.ToUpper(result[0]) {
case "UPDATE":
sqlSlice = append(append(append(ck, sqlSlice[1]), result[0]), sqlSlice[3:]...)
return strings.Join(sqlSlice, " "), args, nil
case "DELETE":
sqlSlice = append(append(append(ck, sqlSlice[2]), result[0]), sqlSlice[3:]...)
return strings.Join(sqlSlice, " "), args, nil
}
}
return sql, args, nil
}
// DoCommit commits current sql and arguments to underlying sql driver.
func (d *Driver) DoCommit(ctx context.Context, in gdb.DoCommitInput) (out gdb.DoCommitOutput, err error) {
in.IsIgnoreResult = true
return d.Core.DoCommit(ctx, in)
return d.Core.DoCommit(context.WithValue(ctx, "isIgnoreResult", true), in)
}
func (d *Driver) DoInsert(ctx context.Context, link gdb.Link, table string, list gdb.List, option gdb.DoInsertOption) (result sql.Result, err error) {

View File

@ -26,13 +26,12 @@ import (
// ORDER BY id
func InitClickhouse() gdb.DB {
connect, err := gdb.New(gdb.ConfigNode{
Host: "127.0.0.1",
Port: "9000",
User: "default",
Name: "default",
Type: "clickhouse",
Debug: true,
Compress: true,
Host: "127.0.0.1",
Port: "9000",
User: "default",
Name: "default",
Type: "clickhouse",
Debug: true,
})
gtest.AssertNil(err)
gtest.AssertNE(connect, nil)
@ -94,6 +93,12 @@ func TestDriverClickhouse_DoUpdate(t *testing.T) {
"created": time.Now().Format("2006-01-02 15:04:05"),
}).Update()
gtest.AssertNil(err)
_, err = connect.Model("visits").Data(g.Map{
"created": time.Now().Format("2006-01-02 15:04:05"),
}).Update()
gtest.AssertNE(err, nil)
_, err = connect.Model("visits").Update()
gtest.AssertNE(err, nil)
}
func TestDriverClickhouse_Select(t *testing.T) {
@ -198,3 +203,32 @@ func TestDriverClickhouse_DoExec(t *testing.T) {
_, err := connect.Exec(context.Background(), sqlStr)
gtest.AssertNil(err)
}
func TestDriver_DoFilter(t *testing.T) {
rawSQL := "select * from visits where 1 = 1"
this := Driver{}
replaceSQL, _, err := this.DoFilter(nil, nil, rawSQL, nil)
gtest.AssertNil(err)
gtest.AssertEQ(rawSQL, replaceSQL)
rawSQL = "update visit set url = '1'"
replaceSQL, _, err = this.DoFilter(nil, nil, rawSQL, nil)
gtest.AssertNil(err)
// this SQL can't run ,clickhouse will report an error because there is no WHERE statement
gtest.AssertEQ(replaceSQL, "ALTER TABLE visit update url = '1'")
rawSQL = "delete from visit"
replaceSQL, _, err = this.DoFilter(nil, nil, rawSQL, nil)
gtest.AssertNil(err)
// this SQL can't run ,clickhouse will report an error because there is no WHERE statement
gtest.AssertEQ(replaceSQL, "ALTER TABLE visit delete")
rawSQL = "update visit set url = '1' where url = '0'"
replaceSQL, _, err = this.DoFilter(nil, nil, rawSQL, nil)
gtest.AssertNil(err)
// this SQL can't run ,clickhouse will report an error because there is no WHERE statement
gtest.AssertEQ(replaceSQL, "ALTER TABLE visit update url = '1' where url = '0'")
rawSQL = "delete from visit where url='0'"
replaceSQL, _, err = this.DoFilter(nil, nil, rawSQL, nil)
gtest.AssertNil(err)
// this SQL can't run ,clickhouse will report an error because there is no WHERE statement
gtest.AssertEQ(replaceSQL, "ALTER TABLE visit delete where url='0'")
}

View File

@ -97,14 +97,12 @@ type DB interface {
DoGetAll(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) // See Core.DoGetAll.
DoInsert(ctx context.Context, link Link, table string, data List, option DoInsertOption) (result sql.Result, err error) // See Core.DoInsert.
DoUpdate(ctx context.Context, link Link, table string, data interface{}, condition string, args ...interface{}) (result sql.Result, err error) // See Core.DoUpdate.
DoUpdateSQL(ctx context.Context, link Link, table string, updates interface{}, condition string, args ...interface{}) (result sql.Result, err error)
DoDelete(ctx context.Context, link Link, table string, condition string, args ...interface{}) (result sql.Result, err error) // See Core.DoDelete.
DoDeleteSQL(ctx context.Context, link Link, table string, condition interface{}, args ...interface{}) (result sql.Result, err error)
DoQuery(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) // See Core.DoQuery.
DoExec(ctx context.Context, link Link, sql string, args ...interface{}) (result sql.Result, err error) // See Core.DoExec.
DoFilter(ctx context.Context, link Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error) // See Core.DoFilter.
DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) // See Core.DoCommit.
DoPrepare(ctx context.Context, link Link, sql string) (*Stmt, error) // See Core.DoPrepare.
DoDelete(ctx context.Context, link Link, table string, condition string, args ...interface{}) (result sql.Result, err error) // See Core.DoDelete.
DoQuery(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) // See Core.DoQuery.
DoExec(ctx context.Context, link Link, sql string, args ...interface{}) (result sql.Result, err error) // See Core.DoExec.
DoFilter(ctx context.Context, link Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error) // See Core.DoFilter.
DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) // See Core.DoCommit.
DoPrepare(ctx context.Context, link Link, sql string) (*Stmt, error) // See Core.DoPrepare.
// ===========================================================================
// Query APIs for convenience purpose.
@ -187,15 +185,14 @@ type Core struct {
// DoCommitInput is the input parameters for function DoCommit.
type DoCommitInput struct {
Db *sql.DB
Tx *sql.Tx
Stmt *sql.Stmt
Link Link
Sql string
Args []interface{}
Type string
IsTransaction bool
IsIgnoreResult bool
Db *sql.DB
Tx *sql.Tx
Stmt *sql.Stmt
Link Link
Sql string
Args []interface{}
Type string
IsTransaction bool
}
// DoCommitOutput is the output parameters for function DoCommit.

View File

@ -598,12 +598,6 @@ func (c *Core) DoUpdate(ctx context.Context, link Link, table string, data inter
return nil, err
}
}
return c.db.DoUpdateSQL(ctx, link, table, updates, condition, args...)
}
// DoUpdateSQL Adapt to difference in different drives
// For example, Clickhouse's update
func (c *Core) DoUpdateSQL(ctx context.Context, link Link, table string, updates interface{}, condition string, args ...interface{}) (result sql.Result, err error) {
return c.db.DoExec(ctx, link, fmt.Sprintf("UPDATE %s SET %s%s", table, updates, condition), args...)
}
@ -631,12 +625,6 @@ func (c *Core) DoDelete(ctx context.Context, link Link, table string, condition
}
}
table = c.QuotePrefixTableName(table)
return c.db.DoDeleteSQL(ctx, link, table, condition, args...)
}
// DoDeleteSQL Adapt to difference in different drives
// For example, Clickhouse's delete
func (c *Core) DoDeleteSQL(ctx context.Context, link Link, table string, condition interface{}, args ...interface{}) (result sql.Result, err error) {
return c.db.DoExec(ctx, link, fmt.Sprintf("DELETE FROM %s%s", table, condition), args...)
}

View File

@ -48,7 +48,6 @@ type ConfigNode struct {
UpdatedAt string `json:"updatedAt"` // (Optional) The filed name of table for automatic-filled updated datetime.
DeletedAt string `json:"deletedAt"` // (Optional) The filed name of table for automatic-filled updated datetime.
TimeMaintainDisabled bool `json:"timeMaintainDisabled"` // (Optional) Disable the automatic time maintaining feature.
Compress bool `json:"compress"` // (Optional,only Clickhouse) enable lz4 compression
}
const (

View File

@ -20,6 +20,7 @@ import (
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/internal/intlog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"github.com/gogf/gf/v2/util/guid"
)
@ -211,7 +212,7 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
}
// Result handling.
switch {
case sqlResult != nil && !in.IsIgnoreResult:
case sqlResult != nil && !gconv.Bool(ctx.Value("isIgnoreResult")):
rowsAffected, err = sqlResult.RowsAffected()
out.Result = sqlResult