Skip to content

Quest DB 简单使用

Posted on:October 31, 2023 at 11:22 PM

QuestDB 是一个面向列的关系数据库,用于时间序列和事件数据。它使用带有时间序列扩展的 SQL 来辅助实时分析

特性

开发

连接DB

使用postgres :

package main

import (
    "database/sql"
    "fmt"
    "log"
    _ "github.com/lib/pq"
)

const (
    host     = "localhost"
    port     = 8812
    user     = "admin"
    password = "quest"
    dbname   = "qdb"
)

func main() {
    connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        panic(err)
    }
    defer db.Close()
    log.Println("Connected")
}

此外,QuestDB 实现了 influx DB 行协议,所以可以使用 influx DB 行协议进行连接和操作 (默认端口 9009,注意:在 mac 上使用 brew service start questdb 时,这个端口不会被启用,手动执行 questdb start 才可以)

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net"
    "time"
)

func main() {
    host := "127.0.0.1:9009"
    tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
    checkErr(err)

    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    checkErr(err)
    log.Println("Connected")
    defer conn.Close()
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}

写入数据

pg示例:

package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/jackc/pgx/v4"
)

var conn *pgx.Conn
var err error

func main() {
    ctx := context.Background()
    conn, err = pgx.Connect(ctx, "postgresql://admin:quest@localhost:8812/qdb")
    if err != nil {
        log.Fatalln(err)
    }
    defer conn.Close(ctx)

    // text-based query
    _, err := conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS test_user (ts TIMESTAMP, date DATE, name STRING, age INT) timestamp(ts);")
    if err != nil {
        log.Fatalln("check table failed", err)
    }

    // Prepared statement given the name 'ps1'
    _, err = conn.Prepare(ctx, "ps1", "INSERT INTO test_user VALUES($1,$2,$3,$4)")
    if err != nil {
        log.Fatalln("Prepare failed", err)
    }
    for i := 0; i < 10; i++ {
        // Execute 'ps1' statement with a string and the loop iterator value
        name := "tname" + fmt.Sprintf("%d", i+1)
        _, err = conn.Exec(ctx, "ps1", time.Now(), time.Now().Round(time.Millisecond), name, i+20)
        if err != nil {
            log.Fatalln("insert failed", err)
        }
    }

    err = conn.Close(ctx)
    if err != nil {
        log.Fatalln(err)
    }
}

使用 influx line protocol 往写入 QuestDB 写入数据,默认端口 9009

package main

import (
  "fmt"
  "io/ioutil"
  "net"
  "time"
)

func main() {
  host := "127.0.0.1:9009"
  tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
  checkErr(err)
  rows := [2]string{
    fmt.Sprintf("trades,name=test_ilp1 value=12.4 %d", time.Now().UnixNano()),
    fmt.Sprintf("trades,name=test_ilp2 value=11.4 %d", time.Now().UnixNano()),
  }

  conn, err := net.DialTCP("tcp", nil, tcpAddr)
  checkErr(err)
  defer conn.Close()

  for _, s := range rows {
    _, err = conn.Write([]byte(fmt.Sprintf("%s\n", s)))
    checkErr(err)
  }
}

func checkErr(err error) {
  if err != nil {
    panic(err)
  }
}

使用 REST API 往 QuestDB 写入数据,QuestDB 外抛了两个 API 接口用于数据导入,默认端口 9000

/imp golang 示例:

package main

import (
  "bytes"
  "fmt"
  "io"
  "io/ioutil"
  "log"
  "mime/multipart"
  "net/http"
  "net/url"
  "os"
)

func main() {
  u, err := url.Parse("http://localhost:9000")
  checkErr(err)
  u.Path += "imp"
  url := fmt.Sprintf("%v", u)
  fileName := "/path/to/data.csv"
  file, err := os.Open(fileName)
  checkErr(err)

  defer file.Close()

  buf := new(bytes.Buffer)
  writer := multipart.NewWriter(buf)
  uploadFile, _ := writer.CreateFormFile("data", "data.csv")
  _, err = io.Copy(uploadFile, file)
  checkErr(err)
  writer.Close()

  req, err := http.NewRequest(http.MethodPut, url, buf)
  checkErr(err)
  req.Header.Add("Content-Type", writer.FormDataContentType())

  client := &http.Client{}
  res, err := client.Do(req)
  checkErr(err)

  defer res.Body.Close()

  body, err := ioutil.ReadAll(res.Body)
  checkErr(err)

  log.Println(string(body))
}

func checkErr(err error) {
  if err != nil {
    panic(err)
  }
}

/exec golang 示例:

package main

import (
  "fmt"
  "io/ioutil"
  "log"
  "net/http"
  "net/url"
)

func main() {
  u, err := url.Parse("http://localhost:9000")
  checkErr(err)

  u.Path += "exec"
  params := url.Values{}
  params.Add("query", `
    CREATE TABLE IF NOT EXISTS
      trades (name STRING, value INT);
    INSERT INTO
      trades
    VALUES(
      "abc",
      123456
    );
  `)
  u.RawQuery = params.Encode()
  url := fmt.Sprintf("%v", u)

  res, err := http.Get(url)
  checkErr(err)

  defer res.Body.Close()

  body, err := ioutil.ReadAll(res.Body)
  checkErr(err)

  log.Println(string(body))
}

func checkErr(err error) {
  if err != nil {
    panic(err)
  }
}

查询数据:

package main

import (
  "fmt"
  "io/ioutil"
  "log"
  "net/http"
  "net/url"
)

func main() {
  u, err := url.Parse("http://localhost:9000")
  checkErr(err)

  u.Path += "exec"
  params := url.Values{}
  params.Add("query", "SELECT x FROM long_sequence(5);")
  u.RawQuery = params.Encode()
  url := fmt.Sprintf("%v", u)

  res, err := http.Get(url)
  checkErr(err)

  defer res.Body.Close()

  body, err := ioutil.ReadAll(res.Body)
  checkErr(err)

  log.Println(string(body))
}

func checkErr(err error) {
  if err != nil {
    panic(err)
  }
}

使用 Postgres 客户端查询数据,默认端口 9009

package main

import (
  "database/sql"
  "fmt"

  _ "github.com/lib/pq"
)

const (
  host     = "localhost"
  port     = 8812
  user     = "admin"
  password = "quest"
  dbname   = "qdb"
)

func main() {
  connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
  db, err := sql.Open("postgres", connStr)
  checkErr(err)
  defer db.Close()

  // Currently, we do not support queries with bind parameters in Go
  rows, err := db.Query("SELECT x FROM long_sequence(5);")
  checkErr(err)
  defer rows.Close()

  for rows.Next() {
    var num string
    err = rows.Scan(&num)
    checkErr(err)
    fmt.Println(num)
  }

  err = rows.Err()
  checkErr(err)
}

func checkErr(err error) {
  if err != nil {
    panic(err)
  }
}