简介
prometheus一般都是采用pull方式获取数据,但是有一些情况下,不方便配置exporter,就希望能通过push的方式上传指标数据。
1、可以采用pushgateway的方式,推送到pushgateway,然后prometheus通过pushgateway拉取数据。
2、在新版本中增加了一个参数:--enable-feature=remote-write-receiver,允许远程通过接口/api/v1/write,直接写数据到prometheus里面。
pushgateway在高并发的情况下还是比较消耗资源的,特别是开启一致性检查,高并发写入的时候特别慢。
第二种方式少了一层转发,速度应该比较快。
接口
可以通过prometheus的http接口/api/v1/write提交数据,这个接口的数据格式有要求:
- 使用POST方式提交
- 需要经过protobuf编码,依赖github.com/gogo/protobuf/proto
- 可以使用snappy进行压缩,依赖github.com/golang/snappy
步骤:
- 收集指标名称,时间戳,值和标签
- 将数据转换成prometheus需要的数据格式
- 使用proto对数据进行编码,并用snappy进行压缩
- 通过httpClient提交数据
packageprome import( "bufio" "bytes" "context" "io" "io/ioutil" "net/http" "net/url" "regexp" "time" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing"github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" ) typeRecoverableErrorstruct{ error } typeHttpClientstruct{ url*url.URL Client*http.Client timeouttime.Duration } varMetricNameRE=regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`) typeMetricPointstruct{ Metricstring`json:"metric"`//指标名称 TagsMapmap[string]string`json:"tags"`//数据标签 Timeint64`json:"time"`//时间戳,单位是秒 Valuefloat64`json:"value"`//内部字段,最终转换之后的float64数值 } func(c*HttpClient)remoteWritePost(req[]byte)error{ httpReq,err:=http.NewRequest("POST",c.url.String(),bytes.NewReader(req)) iferr!=nil{ returnerr } httpReq.Header.Add("Content-Encoding","snappy") httpReq.Header.Set("Content-Type","application/x-protobuf") httpReq.Header.Set("User-Agent","opcai") httpReq.Header.Set("X-Prometheus-Remote-Write-Version","0.1.0") ctx,cancel:=context.WithTimeout(context.Background(),c.timeout) defercancel() httpReq=httpReq.WithContext(ctx) ifparentSpan:=opentracing.SpanFromContext(ctx);parentSpan!=nil{ varht*nethttp.Tracer httpReq,ht=nethttp.TraceRequest( parentSpan.Tracer(), httpReq, nethttp.OperationName("RemoteStore"), nethttp.ClientTrace(false), ) deferht.Finish() } httpResp,err:=c.Client.Do(httpReq) iferr!=nil{ //ErrorsfromClient.Doarefrom(forexample)networkerrors,soare //recoverable. 
returnRecoverableError{err} } deferfunc(){ io.Copy(ioutil.Discard,httpResp.Body) httpResp.Body.Close() }() ifhttpResp.StatusCode/100!=2{ scanner:=bufio.NewScanner(io.LimitReader(httpResp.Body,512)) line:="" ifscanner.Scan(){ line=scanner.Text() } err=errors.Errorf("serverreturnedHTTPstatus%s:%s",httpResp.Status,line) } ifhttpResp.StatusCode/100==5{ returnRecoverableError{err} } returnerr } funcbuildWriteRequest(samples[]*prompb.TimeSeries)([]byte,error){ req:=&prompb.WriteRequest{ Timeseries:samples, } data,err:=proto.Marshal(req) iferr!=nil{ returnnil,err } compressed:=snappy.Encode(nil,data) returncompressed,nil } typesamplestruct{ labelslabels.Labels tint64 vfloat64 } const( LABEL_NAME="__name__" ) funcconvertOne(item*MetricPoint)(*prompb.TimeSeries,error){ pt:=prompb.TimeSeries{} pt.Samples=[]prompb.Sample{{}} s:=sample{} s.t=item.Time s.v=item.Value //name if!MetricNameRE.MatchString(item.Metric){ return&pt,errors.New("invalidmetricsname") } nameLs:=labels.Label{ Name:LABEL_NAME, Value:item.Metric, } s.labels=append(s.labels,nameLs) fork,v:=rangeitem.TagsMap{ ifmodel.LabelNameRE.MatchString(k){ ls:=labels.Label{ Name:k, Value:v, } s.labels=append(s.labels,ls) } } pt.Labels=labelsToLabelsProto(s.labels,pt.Labels) //时间赋值问题,使用毫秒时间戳 tsMs:=time.Unix(s.t,0).UnixNano()/1e6 pt.Samples[0].Timestamp=tsMs pt.Samples[0].Value=s.v return&pt,nil } funclabelsToLabelsProto(labelslabels.Labels,buf[]*prompb.Label)[]*prompb.Label{ result:=buf[:0] ifcap(buf)<len(labels){ result=make([]*prompb.Label,0,len(labels)) } for_,l:=rangelabels{ result=append(result,&prompb.Label{ Name:l.Name, Value:l.Value, }) } returnresult } func(c*HttpClient)RemoteWrite(items[]MetricPoint)(errerror){ iflen(items)==0{ return } ts:=make([]*prompb.TimeSeries,len(items)) fori:=rangeitems{ ts[i],err=convertOne(&items[i]) iferr!=nil{ return } } data,err:=buildWriteRequest(ts) iferr!=nil{ return } err=c.remoteWritePost(data) return } funcNewClient(urstring,timeouttime.Duration)(c*HttpClient,errerror){ 
u,err:=url.Parse(ur) iferr!=nil{ return } c=&HttpClient{ url:u, Client:&http.Client{}, timeout:timeout, } return }
测试
prometheus启动的时候记得加参数--enable-feature=remote-write-receiver
packageprome import( "testing" "time" ) funcTestRemoteWrite(t*testing.T){ c,err:=NewClient("http://localhost:9090/api/v1/write",10*time.Second) iferr!=nil{ t.Fatal(err) } metrics:=[]MetricPoint{ {Metric:"opcai1", TagsMap:map[string]string{"env":"testing","op":"opcai"}, Time:time.Now().Add(-1*time.Minute).Unix(), Value:1}, {Metric:"opcai2", TagsMap:map[string]string{"env":"testing","op":"opcai"}, Time:time.Now().Add(-2*time.Minute).Unix(), Value:2}, {Metric:"opcai3", TagsMap:map[string]string{"env":"testing","op":"opcai"}, Time:time.Now().Unix(), Value:3}, {Metric:"opcai4", TagsMap:map[string]string{"env":"testing","op":"opcai"}, Time:time.Now().Unix(), Value:4}, } err=c.RemoteWrite(metrics) iferr!=nil{ t.Fatal(err) } t.Log("end...") }
使用go test进行测试
go test -v
总结
这个方法也是在看夜莺v5的代码的时候发现的,刚好有需要统一收集redis的监控指标,刚好可以用上,之前用pushgateway写的实在是慢。
转载请注明:IT运维空间 » 运维技术 » 远程写入prometheus存储
发表评论