QuestDB 是一个面向列的关系数据库,用于时间序列和事件数据。它使用带有时间序列扩展的 SQL 来辅助实时分析
特性
Designated timestamp
是一个核心特性,它支持面向时间的语言功能和分区Symbol
符号类型可以有效地存储和检索重复的字符串Storage model
存储模型描述了 QuestDB 如何在表中存储记录和分区Indexes
索引可以用于对特定列的更快的读取访问Partitions
分区可以在计算和查询方面获得显著的性能优势SQL extensionsSQL
扩展允许使用简洁的语法进行性能时间序列分析Roor Directory
根目录描述了用于存储和配置的 QuestDB 目录内容
开发
- 可以使用 Postgres 或者 influxDB 客户端 SDK 进行连接
- 写入数据可以使用 Postgres 客户端或者 influx 行协议,也可以通过 REST API 进行批量数据插入
- 查询数据可以使用 Postgres 客户端、REST API 或者 web 控制台
- 使用 influx 行协议时,默认端口在 9099,每条语句块必须以\n分隔
- 时间戳是可选的表列,每条数据都会默认带上服务器当前的时间戳
- 官方推荐使用 influx 行协议进行插入数据,更高效
连接DB
使用postgres :
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/lib/pq"
)
const (
host = "localhost"
port = 8812
user = "admin"
password = "quest"
dbname = "qdb"
)
func main() {
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
db, err := sql.Open("postgres", connStr)
if err != nil {
panic(err)
}
defer db.Close()
log.Println("Connected")
}
此外,QuestDB 实现了 influx DB 行协议,所以可以使用 influx DB 行协议进行连接和操作 (默认端口 9009,注意:在 mac 上使用 brew service start questdb 时,这个端口不会被启用,手动执行 questdb start 才可以)
package main
import (
"fmt"
"io/ioutil"
"log"
"net"
"time"
)
func main() {
host := "127.0.0.1:9009"
tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
checkErr(err)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkErr(err)
log.Println("Connected")
defer conn.Close()
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
写入数据
- QuestDB web UI 端口默认在 9000,本地访问浏览器打开http://localhost:9000/ 即可
- 使用 Postgres 客户端往 QuestDB 写入数据,默认端口 8812
pg示例:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/jackc/pgx/v4"
)
var conn *pgx.Conn
var err error
func main() {
ctx := context.Background()
conn, err = pgx.Connect(ctx, "postgresql://admin:quest@localhost:8812/qdb")
if err != nil {
log.Fatalln(err)
}
defer conn.Close(ctx)
// text-based query
_, err := conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS test_user (ts TIMESTAMP, date DATE, name STRING, age INT) timestamp(ts);")
if err != nil {
log.Fatalln("check table failed", err)
}
// Prepared statement given the name 'ps1'
_, err = conn.Prepare(ctx, "ps1", "INSERT INTO test_user VALUES($1,$2,$3,$4)")
if err != nil {
log.Fatalln("Prepare failed", err)
}
for i := 0; i < 10; i++ {
// Execute 'ps1' statement with a string and the loop iterator value
name := "tname" + fmt.Sprintf("%d", i+1)
_, err = conn.Exec(ctx, "ps1", time.Now(), time.Now().Round(time.Millisecond), name, i+20)
if err != nil {
log.Fatalln("insert failed", err)
}
}
err = conn.Close(ctx)
if err != nil {
log.Fatalln(err)
}
}
使用 influx line protocol 往写入 QuestDB 写入数据,默认端口 9009
package main
import (
"fmt"
"io/ioutil"
"net"
"time"
)
func main() {
host := "127.0.0.1:9009"
tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
checkErr(err)
rows := [2]string{
fmt.Sprintf("trades,name=test_ilp1 value=12.4 %d", time.Now().UnixNano()),
fmt.Sprintf("trades,name=test_ilp2 value=11.4 %d", time.Now().UnixNano()),
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkErr(err)
defer conn.Close()
for _, s := range rows {
_, err = conn.Write([]byte(fmt.Sprintf("%s\n", s)))
checkErr(err)
}
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
使用 REST API 往 QuestDB 写入数据,QuestDB 外抛了两个 API 接口用于数据导入,默认端口 9000
- /imp 导入数据,支持 csv 格式
- /exec 可以批量执行 SQL 语句
/imp golang 示例:
package main
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"mime/multipart"
"net/http"
"net/url"
"os"
)
func main() {
u, err := url.Parse("http://localhost:9000")
checkErr(err)
u.Path += "imp"
url := fmt.Sprintf("%v", u)
fileName := "/path/to/data.csv"
file, err := os.Open(fileName)
checkErr(err)
defer file.Close()
buf := new(bytes.Buffer)
writer := multipart.NewWriter(buf)
uploadFile, _ := writer.CreateFormFile("data", "data.csv")
_, err = io.Copy(uploadFile, file)
checkErr(err)
writer.Close()
req, err := http.NewRequest(http.MethodPut, url, buf)
checkErr(err)
req.Header.Add("Content-Type", writer.FormDataContentType())
client := &http.Client{}
res, err := client.Do(req)
checkErr(err)
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
checkErr(err)
log.Println(string(body))
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
/exec golang 示例:
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
)
func main() {
u, err := url.Parse("http://localhost:9000")
checkErr(err)
u.Path += "exec"
params := url.Values{}
params.Add("query", `
CREATE TABLE IF NOT EXISTS
trades (name STRING, value INT);
INSERT INTO
trades
VALUES(
"abc",
123456
);
`)
u.RawQuery = params.Encode()
url := fmt.Sprintf("%v", u)
res, err := http.Get(url)
checkErr(err)
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
checkErr(err)
log.Println(string(body))
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
查询数据:
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
)
func main() {
u, err := url.Parse("http://localhost:9000")
checkErr(err)
u.Path += "exec"
params := url.Values{}
params.Add("query", "SELECT x FROM long_sequence(5);")
u.RawQuery = params.Encode()
url := fmt.Sprintf("%v", u)
res, err := http.Get(url)
checkErr(err)
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
checkErr(err)
log.Println(string(body))
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
使用 Postgres 客户端查询数据,默认端口 9009
package main
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
)
const (
host = "localhost"
port = 8812
user = "admin"
password = "quest"
dbname = "qdb"
)
func main() {
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
db, err := sql.Open("postgres", connStr)
checkErr(err)
defer db.Close()
// Currently, we do not support queries with bind parameters in Go
rows, err := db.Query("SELECT x FROM long_sequence(5);")
checkErr(err)
defer rows.Close()
for rows.Next() {
var num string
err = rows.Scan(&num)
checkErr(err)
fmt.Println(num)
}
err = rows.Err()
checkErr(err)
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}