实现prom2click的类似功能,使用java实现。
先调查prometheus查询请求参数等。
http://49.4.48.241:32007/api/v1/query?query=go_memstats_frees_total%20[40d]&time=1580524260.813
http://49.4.48.241:32007/api/v1/query?query=go_memstats_frees_total{instance="pushgateway",job="pushgateway"}[40d]&time=1580524260.813
打印出请求头和参数:
header:host==192.168.2.133:31636
header:user-agent==Prometheus/2.14.0
header:content-length==107
header:accept-encoding==snappy
header:content-encoding==snappy
header:content-type==application/x-protobuf
header:x-prometheus-remote-read-version==0.1.0
url:http://192.168.2.133:31636/read
ip:172.16.0.252
startTime == 1577068260813
endTime == 1580524260813
matcher value==instance
matcher type value==0
matcher type==EQ
matcher value==pushgateway
matcher value==job
matcher type value==0
matcher type==EQ
matcher value==pushgateway
matcher value==__name__
matcher type value==0
matcher type==EQ
matcher value==go_memstats_frees_total
matcher value==instance
matcher type value==0
matcher type==EQ
matcher value==pushgateway
matcher value==job
matcher type value==0
matcher type==EQ
matcher value==pushgateway
matcher value==__name__
matcher type value==0
matcher type==EQ
matcher value==go_memstats_frees_total
使用了snappy压缩和protobuf序列化。所以需要使用proto生成java文件:
下载protoc-3.5.1-win32.zip
使用proto生成java文件:
protoc --java_out=. remote.proto
protoc --java_out=. rpc.proto
protoc --java_out=. types.proto
依赖以下文件:
protoc --java_out=. gogoproto\gogo.proto
protoc --java_out=. google\protobuf\timestamp.proto
protoc --java_out=. google\api\annotations.proto
protoc --java_out=. google\api\http.proto
默认生成了prometheus下Remote.java,Rpc.java,Types.java
com\google\protobuf\GoGoProtos.java
com\google\api\AnnotationsProto.java
com\google\api\CustomHttpPattern.java
com\google\api\CustomHttpPatternOrBuilder.java
com\google\api\Http.java
com\google\api\HttpOrBuilder.java
com\google\api\HttpProto.java
com\google\api\HttpRule.java
com\google\api\HttpRuleOrBuilder.java
修改prometheus配置文件:
remote_write: - url: "http://localhost:9201/write" remote_read: # - url: "http://localhost:9201/read" - url: "http://192.168.2.133:31636/read"
实现代码如下:
package com.chinasoft;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Controller;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.xerial.snappy.Snappy;
import org.xerial.snappy.SnappyInputStream;
import prometheus.Remote;
import prometheus.Remote.Query;
import prometheus.Remote.QueryResult;
import prometheus.Remote.ReadRequest;
import prometheus.Remote.ReadResponse;
import prometheus.Types.Label;
import prometheus.Types.LabelMatcher;
import prometheus.Types.Sample;
import prometheus.Types.TimeSeries;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
@Controller
public class DBController {
@RequestMapping("/read")
public void readDB(HttpServletRequest request, HttpServletResponse response) {
printHeaders(request, response);
try {
ReadRequest readRequest = readParam(request);
List<Query> queryList = readRequest.getQueriesList();
for (Query query : queryList) {
System.out.println("startTime == " + query.getStartTimestampMs());
System.out.println("endTime == " + query.getEndTimestampMs());
String name = "";
for (LabelMatcher labelMatcher : query.getMatchersList()) {
name = labelMatcher.getValue();
System.out.println("matcher value==" + labelMatcher.getName());
System.out.println("matcher type value==" + labelMatcher.getTypeValue());
System.out.println("matcher type==" + labelMatcher.getType());
System.out.println("matcher value==" + labelMatcher.getValue());
}
// ReadResponse readResponse = queryBySql(name, query.getStartTimestampMs(), query.getEndTimestampMs());
ReadResponse readResponse = queryBySqlNew(query);
writeResponse(readResponse, response);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void writeResponse(ReadResponse readResponse, HttpServletResponse response) throws IOException {
byte[] datas = readResponse.toByteArray();
ServletOutputStream outStream = response.getOutputStream();
datas = Snappy.compress(datas);
response.setHeader("content-length", "" + datas.length);
outStream.write(datas);
outStream.flush();
outStream.close();
}
private ReadRequest readParam(HttpServletRequest request) throws IOException {
SnappyInputStream stream = new SnappyInputStream(request.getInputStream());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] data = new byte[1024];
int len = 0;
while ((len = stream.read(data)) > -1) {
baos.write(data, 0, len);
}
stream.close();
ReadRequest readRequest = Remote.ReadRequest.parseFrom(baos.toByteArray());
return readRequest;
}
private void printHeaders(HttpServletRequest request, HttpServletResponse response) {
Enumeration<String> headers = request.getHeaderNames();
for (; headers.hasMoreElements();) {
String header = headers.nextElement();
System.out.println("header:" + header + "==" + request.getHeader(header));
response.setHeader(header, request.getHeader(header));
}
// Enumeration<String> attributeNames = request.getAttributeNames();
// for (; attributeNames.hasMoreElements();) {
// String attributeName = attributeNames.nextElement();
// System.out.println("attributeName:" + attributeName + "==" + request.getAttribute(attributeName));
// }
Map<String, String[]> parameterMap = request.getParameterMap();
for (Entry<String, String[]> entry : parameterMap.entrySet()) {
System.out.println("Parameters:" + entry.getKey() + ",value==" + Arrays.asList(entry.getValue()));
}
System.out.println("url:" + request.getRequestURL().toString());
System.out.println("ip:" + request.getRemoteHost());
}
private ReadResponse queryBySqlNew(Query query) {
ReadResponse readResponse = Remote.ReadResponse.newBuilder().build();
QueryResult queryResult = Remote.QueryResult.newBuilder().build();
readResponse.setResultsList(Arrays.asList(queryResult));
String sql = buildSql(query);
try {
Connection connection = getConn();
Statement statement = connection.createStatement();
System.out.println(sql);
ResultSet rs = statement.executeQuery(sql);
Map<String, TimeSeries> timeSerieMap = new HashMap<String, TimeSeries>();
while (rs.next()) {
String tagStrkey = rs.getString("tags");
TimeSeries timeSeries = null;
if (!timeSerieMap.containsKey(tagStrkey)) {
timeSeries = TimeSeries.newBuilder().build();
timeSerieMap.put(tagStrkey, timeSeries);
List<Sample> sampleList = new ArrayList<Sample>();
List<Label> labelList = new ArrayList<Label>();
Array array = rs.getArray("tags");
setLabelList(array, labelList);
timeSeries.setLabelsList(labelList);
timeSeries.setSamplesList(sampleList);
} else {
timeSeries = timeSerieMap.get(tagStrkey);
}
System.out.println(rs.getTimestamp(2).getTime());
System.out.println(rs.getDouble(1));
Sample sample = Sample.newBuilder().build();
timeSeries.getSamplesList()
.add(sample.setTimestamp(rs.getTimestamp("ts").getTime()).setValue(rs.getDouble("avg")));
}
queryResult.setTimeseriesList(new ArrayList<TimeSeries>(timeSerieMap.values()));
} catch (Exception e) {
e.printStackTrace();
}
return readResponse;
}
private void setLabelList(Array array, List<Label> labelList) {
try {
String[] arrays = (String[]) array.getArray();
for (int j = 1; j < arrays.length; j++) {
String str = arrays[j];
String[] strs = str.split("=");
labelList.add(Label.newBuilder().build().setName(strs[0]).setValue(strs[1]));
}
String[] first = arrays[0].split("=");
labelList.add(Label.newBuilder().build().setName(first[0]).setValue(first[1]));
} catch (SQLException e) {
e.printStackTrace();
}
}
private String buildSql(Query query) {
StringBuilder sb = new StringBuilder("select avg,ts,tags from metrics.samplesnew where");
sb.append(" ts>=toDateTime(");
sb.append(query.getStartTimestampMs() / 1000);
sb.append(") and ts<=toDateTime(");
sb.append(query.getEndTimestampMs() / 1000);
sb.append(")");
for (LabelMatcher labelMatcher : query.getMatchersList()) {
System.out.println("matcher value==" + labelMatcher.getName());
System.out.println("matcher type value==" + labelMatcher.getTypeValue());
System.out.println("matcher type==" + labelMatcher.getType());
System.out.println("matcher value==" + labelMatcher.getValue());
if ("__name__".equals(labelMatcher.getName())) {
appendNameWhere(labelMatcher, sb);
} else {
appendTagsWhere(labelMatcher, sb);
}
}
sb.append(" order by ts");
return sb.toString();
}
private void appendNameWhere(LabelMatcher labelMatcher, StringBuilder sb) {
String tagV = labelMatcher.getValue();
tagV = tagV.replaceAll("‘", "\\‘");
if (StringUtils.isEmpty(tagV)) {
return;
}
switch (labelMatcher.getTypeValue()) {
case LabelMatcher.Type.EQ_VALUE:
sb.append(" and name=‘");
sb.append(tagV);
sb.append("‘");
break;
case LabelMatcher.Type.NEQ_VALUE:
sb.append(" and name!=‘");
sb.append(tagV);
sb.append("‘");
break;
case LabelMatcher.Type.RE_VALUE:
sb.append(" and match(name, ‘");
sb.append(tagV.replaceAll("/", "\\/"));
sb.append("‘) = 1");
break;
case LabelMatcher.Type.NRE_VALUE:
sb.append(" and match(name, ‘");
sb.append(tagV.replaceAll("/", "\\/"));
sb.append("‘) = 0");
break;
default:
sb.append(" and name=‘");
sb.append(tagV);
sb.append("‘");
}
}
private void appendTagsWhere(LabelMatcher labelMatcher, StringBuilder sb) {
String tagK = labelMatcher.getName();
String tagV = labelMatcher.getValue();
tagV = tagV.replaceAll("‘", "\\‘");
if (StringUtils.isEmpty(tagV)) {
return;
}
// 等于 arrayExists(x -> x IN (%s), tags) = 1
// 正则匹配 arrayExists(x -> 1 == match(x, ‘^%s=%s‘),tags) = 1
switch (labelMatcher.getTypeValue()) {
case LabelMatcher.Type.EQ_VALUE:
sb.append(" and arrayExists(x -> x IN (‘");
sb.append(tagK);
sb.append("=");
sb.append(tagV);
sb.append("‘), tags) = 1");
break;
case LabelMatcher.Type.NEQ_VALUE:
sb.append(" and arrayExists(x -> x IN (‘");
sb.append(tagK);
sb.append("=");
sb.append(tagV);
sb.append("‘), tags) = 0");
break;
case LabelMatcher.Type.RE_VALUE:
sb.append(" and arrayExists(x -> 1 == match(x, ‘^");
sb.append(tagK);
sb.append("=");
sb.append(tagV.replaceAll("/", "\\/"));
sb.append("‘),tags) = 1");
break;
case LabelMatcher.Type.NRE_VALUE:
sb.append(" and arrayExists(x -> 1 == match(x, ‘^");
sb.append(tagK);
sb.append("=");
sb.append(tagV.replaceAll("/", "\\/"));
sb.append("‘),tags) = 0");
break;
default:
sb.append(" and arrayExists(x -> x IN (‘");
sb.append(tagK);
sb.append("=");
sb.append(tagV);
sb.append("‘), tags) = 1");
}
}
private ReadResponse queryBySql(String name, long start, long end) {
try {
Connection connection = getConn();
Statement statement = connection.createStatement();
String sql = "select avg,ts,tags from metrics.samplesnew where name=‘" + name
+ "‘ and toUnixTimestamp(ts)>=" + start / 1000 + " and toUnixTimestamp(ts)<=" + end / 1000
+ " order by ts";
ResultSet rs = statement.executeQuery(sql);
System.out.println(sql);
QueryResult queryResult = Remote.QueryResult.newBuilder().build();
TimeSeries timeSeries = TimeSeries.newBuilder().build();
queryResult.setTimeseriesList(Arrays.asList(timeSeries));
ReadResponse readResponse = Remote.ReadResponse.newBuilder().build();
readResponse.setResultsList(Arrays.asList(queryResult));
List<Sample> sampleList = new ArrayList<Sample>();
List<Label> labelList = new ArrayList<Label>();
timeSeries.setLabelsList(labelList);
timeSeries.setSamplesList(sampleList);
Array array = null;
while (rs.next()) {
System.out.println(rs.getTimestamp(2).getTime());
System.out.println(rs.getDouble(1));
Sample sample = Sample.newBuilder().build();
sampleList.add(sample.setTimestamp(rs.getTimestamp("ts").getTime()).setValue(rs.getDouble("avg")));
array = rs.getArray(3);
}
if (array != null) {
String[] arrays = (String[]) array.getArray();
int i = 0;
for (String str : arrays) {
if (i == 0) {
i++;
continue;
}
String[] strs = str.split("=");
labelList.add(Label.newBuilder().build().setName(strs[0]).setValue(strs[1]));
}
String[] first = arrays[0].split("=");
labelList.add(Label.newBuilder().build().setName(first[0]).setValue(first[1]));
}
return readResponse;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private static String clickhouseAddress = "jdbc:clickhouse://117.78.23.187:32123";
private static String clickhouseUsername = "root";
private static String clickhousePassword = "123456";
private static String clickhouseDB = "metrics";
private static Integer clickhouseSocketTimeout = 600000;
private ClickHouseDataSource clickHouseDataSource;
{
ClickHouseProperties properties = new ClickHouseProperties();
properties.setUser(clickhouseUsername);
properties.setPassword(clickhousePassword);
properties.setDatabase(clickhouseDB);
properties.setSocketTimeout(clickhouseSocketTimeout);
properties.setConnectionTimeout(60000);
clickHouseDataSource = new ClickHouseDataSource(clickhouseAddress, properties);
}
private Connection getConn() {
try {
return clickHouseDataSource.getConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
执行命令在环境上运行:
/CloudResetPwdUpdateAgent/depend/jre1.8.0_131/bin/java -jar agent-0.0.1-SNAPSHOT.jar
nohup方式
nohup /CloudResetPwdUpdateAgent/depend/jre1.8.0_131/bin/java -jar agent-0.0.1-SNAPSHOT.jar > agent.log 2>&1 &
在日志中打印出执行的sql语句:
select avg,ts,tags from metrics.samplesnew where ts>=toDateTime(1577068260) and ts<=toDateTime(1580524260) and arrayExists(x -> x IN (‘instance=pushgateway‘), tags) = 1 and arrayExists(x -> x IN (‘job=pushgateway‘), tags) = 1 and name=‘go_memstats_frees_total‘ order by ts
工程路径:
https://files.cnblogs.com/files/yaoyu1983/agent.zip
原文:https://www.cnblogs.com/yaoyu1983/p/12334842.html