gtxyzz

远程写入prometheus存储

gtxyzz 运维技术 2022-11-14 461浏览 0

远程写入prometheus存储

简介

prometheus一般都是采用pull方式获取数据,但是有一些情况下,不方便配置exporter,就希望能通过push的方式上传指标数据。

1、可以采用pushgateway的方式,推送到pushgateway,然后prometheus通过pushgateway拉取数据。

2、在新版本中增加了一个参数:--enable-feature=remote-write-receiver,允许远程通过接口/api/v1/write,直接写数据到prometheus里面。

pushgateway在高并发的情况下还是比较消耗资源的,特别是开启一致性检查,高并发写入的时候特别慢。

第二种方式少了一层转发,速度应该比较快。

远程写入prometheus存储

接口

可以通过prometheus的http接口/api/v1/write提交数据,这个接口对数据格式有要求:

  • 使用POST方式提交
  • 需要经过protobuf编码,依赖github.com/gogo/protobuf/proto
  • 可以使用snappy进行压缩,依赖github.com/golang/snappy

步骤:

  1. 收集指标名称,时间戳,值和标签
  2. 将数据转换成prometheus需要的数据格式
  3. 使用proto对数据进行编码,并用snappy进行压缩
  4. 通过httpClient提交数据
    package prome

    import (
    	"bufio"
    	"bytes"
    	"context"
    	"io"
    	"io/ioutil"
    	"net/http"
    	"net/url"
    	"regexp"
    	"sort"
    	"time"

    	"github.com/gogo/protobuf/proto"
    	"github.com/golang/snappy"
    	"github.com/opentracing-contrib/go-stdlib/nethttp"
    	opentracing "github.com/opentracing/opentracing-go"
    	"github.com/pkg/errors"
    	"github.com/prometheus/common/model"
    	"github.com/prometheus/prometheus/pkg/labels"
    	"github.com/prometheus/prometheus/prompb"
    )
    
    // RecoverableError marks an error as one the caller may retry.
    //
    // remoteWritePost wraps errors returned by Client.Do and 5xx responses in
    // this type; other failures are returned unwrapped.
    type RecoverableError struct {
    	error
    }

    // Unwrap returns the wrapped error so that errors.Is and errors.As can
    // inspect the underlying cause.
    func (e RecoverableError) Unwrap() error {
    	return e.error
    }
    
    // HttpClient posts remote-write payloads to a single endpoint.
    type HttpClient struct {
    	// url is the endpoint every request is POSTed to.
    	url *url.URL
    	// Client is the HTTP client used to execute requests.
    	Client *http.Client
    	// timeout bounds each request through a context deadline.
    	timeout time.Duration
    }
    
    // MetricNameRE matches metric names accepted by convertOne: a letter,
    // underscore or colon followed by any number of letters, digits,
    // underscores or colons.
    var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)
    
    // MetricPoint is one sample to be written: a metric name, its tags, a
    // timestamp and a value.
    type MetricPoint struct {
    	Metric  string            `json:"metric"` // metric name
    	TagsMap map[string]string `json:"tags"`   // data labels
    	Time    int64             `json:"time"`   // timestamp, in seconds
    	Value   float64           `json:"value"`  // internal field: the final float64 value after conversion
    }
    
    func(c*HttpClient)remoteWritePost(req[]byte)error{
    httpReq,err:=http.NewRequest("POST",c.url.String(),bytes.NewReader(req))
    iferr!=nil{
    returnerr
    }
    httpReq.Header.Add("Content-Encoding","snappy")
    httpReq.Header.Set("Content-Type","application/x-protobuf")
    httpReq.Header.Set("User-Agent","opcai")
    httpReq.Header.Set("X-Prometheus-Remote-Write-Version","0.1.0")
    ctx,cancel:=context.WithTimeout(context.Background(),c.timeout)
    defercancel()
    
    httpReq=httpReq.WithContext(ctx)
    
    ifparentSpan:=opentracing.SpanFromContext(ctx);parentSpan!=nil{
    varht*nethttp.Tracer
    httpReq,ht=nethttp.TraceRequest(
    parentSpan.Tracer(),
    httpReq,
    nethttp.OperationName("RemoteStore"),
    nethttp.ClientTrace(false),
    )
    deferht.Finish()
    }
    
    httpResp,err:=c.Client.Do(httpReq)
    iferr!=nil{
    //ErrorsfromClient.Doarefrom(forexample)networkerrors,soare
    //recoverable.
    returnRecoverableError{err}
    }
    deferfunc(){
    io.Copy(ioutil.Discard,httpResp.Body)
    httpResp.Body.Close()
    }()
    
    ifhttpResp.StatusCode/100!=2{
    scanner:=bufio.NewScanner(io.LimitReader(httpResp.Body,512))
    line:=""
    ifscanner.Scan(){
    line=scanner.Text()
    }
    err=errors.Errorf("serverreturnedHTTPstatus%s:%s",httpResp.Status,line)
    }
    ifhttpResp.StatusCode/100==5{
    returnRecoverableError{err}
    }
    returnerr
    }
    
    funcbuildWriteRequest(samples[]*prompb.TimeSeries)([]byte,error){
    
    req:=&prompb.WriteRequest{
    Timeseries:samples,
    }
    data,err:=proto.Marshal(req)
    iferr!=nil{
    returnnil,err
    }
    compressed:=snappy.Encode(nil,data)
    returncompressed,nil
    }
    
    typesamplestruct{
    labelslabels.Labels
    tint64
    vfloat64
    }
    
    // LABEL_NAME is the label name under which the metric name is stored.
    const LABEL_NAME = "__name__"
    
    funcconvertOne(item*MetricPoint)(*prompb.TimeSeries,error){
    pt:=prompb.TimeSeries{}
    pt.Samples=[]prompb.Sample{{}}
    s:=sample{}
    s.t=item.Time
    s.v=item.Value
    //name
    if!MetricNameRE.MatchString(item.Metric){
    return&pt,errors.New("invalidmetricsname")
    }
    nameLs:=labels.Label{
    Name:LABEL_NAME,
    Value:item.Metric,
    }
    s.labels=append(s.labels,nameLs)
    fork,v:=rangeitem.TagsMap{
    ifmodel.LabelNameRE.MatchString(k){
    ls:=labels.Label{
    Name:k,
    Value:v,
    }
    s.labels=append(s.labels,ls)
    }
    }
    
    pt.Labels=labelsToLabelsProto(s.labels,pt.Labels)
    //时间赋值问题,使用毫秒时间戳
    tsMs:=time.Unix(s.t,0).UnixNano()/1e6
    pt.Samples[0].Timestamp=tsMs
    pt.Samples[0].Value=s.v
    return&pt,nil
    }
    
    funclabelsToLabelsProto(labelslabels.Labels,buf[]*prompb.Label)[]*prompb.Label{
    result:=buf[:0]
    ifcap(buf)<len(labels){
    result=make([]*prompb.Label,0,len(labels))
    }
    for_,l:=rangelabels{
    result=append(result,&prompb.Label{
    Name:l.Name,
    Value:l.Value,
    })
    }
    returnresult
    }
    
    func(c*HttpClient)RemoteWrite(items[]MetricPoint)(errerror){
    iflen(items)==0{
    return
    }
    ts:=make([]*prompb.TimeSeries,len(items))
    fori:=rangeitems{
    ts[i],err=convertOne(&items[i])
    iferr!=nil{
    return
    }
    }
    data,err:=buildWriteRequest(ts)
    iferr!=nil{
    return
    }
    err=c.remoteWritePost(data)
    return
    }
    
    funcNewClient(urstring,timeouttime.Duration)(c*HttpClient,errerror){
    u,err:=url.Parse(ur)
    iferr!=nil{
    return
    }
    c=&HttpClient{
    url:u,
    Client:&http.Client{},
    timeout:timeout,
    }
    return
    }
    

    测试

    prometheus启动的时候记得加参数--enable-feature=remote-write-receiver

    packageprome
    
    import(
    "testing"
    "time"
    )
    
    funcTestRemoteWrite(t*testing.T){
    c,err:=NewClient("http://localhost:9090/api/v1/write",10*time.Second)
    iferr!=nil{
    t.Fatal(err)
    }
    metrics:=[]MetricPoint{
    {Metric:"opcai1",
    TagsMap:map[string]string{"env":"testing","op":"opcai"},
    Time:time.Now().Add(-1*time.Minute).Unix(),
    Value:1},
    {Metric:"opcai2",
    TagsMap:map[string]string{"env":"testing","op":"opcai"},
    Time:time.Now().Add(-2*time.Minute).Unix(),
    Value:2},
    {Metric:"opcai3",
    TagsMap:map[string]string{"env":"testing","op":"opcai"},
    Time:time.Now().Unix(),
    Value:3},
    {Metric:"opcai4",
    TagsMap:map[string]string{"env":"testing","op":"opcai"},
    Time:time.Now().Unix(),
    Value:4},
    }
    err=c.RemoteWrite(metrics)
    iferr!=nil{
    t.Fatal(err)
    }
    t.Log("end...")
    }
    

    使用go test进行测试

    go test -v
    

    总结

    这个方法也是在看夜莺v5的代码的时候发现的,刚好有需要统一收集redis的监控指标,刚好可以用上,之前用pushgateway写的实在是慢。

继续浏览有关 系统运维 的文章
发表评论