如何實現LoggingMetricsConsumer將指標值輸出到metric.log日志文件

今天就跟大家聊聊有關如何實現 LoggingMetricsConsumer將指標值輸出到metric.log日志文件,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

公司主營業務:成都做網站、成都網站制作、移動網站開發等業務。幫助企業客戶真正實現互聯網宣傳,提高企業的競爭能力。成都創新互聯是一支青春激揚、勤奮敬業、活力青春激揚、勤奮敬業、活力澎湃、和諧高效的團隊。公司秉承以“開放、自由、嚴謹、自律”為核心的企業文化,感謝他們對我們的高要求,感謝他們從不同領域給我們帶來的挑戰,讓我們激情的團隊有機會用頭腦與智慧不斷的給客戶帶來驚喜。成都創新互聯推出濂溪免費做網站回饋大家。

前提說明:

          storm從0.9.0開始,增加了指標統計框架,用來收集應用程序的特定指標,并將其輸出到外部系統。

           一般來說,您只需要去實現 LoggingMetricsConsumer,統計將指標值輸出到metric.log日志文件之中。

當然,您也可以自定義一個監聽的類:只需要去實現IMetricsConsumer接口就可以了。這些類可以在代碼里注冊(registerMetricsConsumer),也可以在 storm.yaml配置文件中注冊:

package com.digitalpebble.storm.crawler;

import backtype.storm.Config;
import backtype.storm.metric.MetricsConsumerBolt;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectWriter;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Enno Shioji (enno.shioji@peerindex.com)
 */
public class DebugMetricConsumer implements IMetricsConsumer {
	private static final Logger log = LoggerFactory
			.getLogger(DebugMetricConsumer.class);
	private IErrorReporter errorReporter;
	private Server server;

	// Make visible to servlet threads
	private volatile TopologyContext context;
	private volatile ConcurrentMap<String, Number> metrics;
	private volatile ConcurrentMap<String, Map<String, Object>> metrics_metadata;

	public void prepare(Map stormConf, Object registrationArgument,
			TopologyContext context, IErrorReporter errorReporter) {
		this.context = context;
		this.errorReporter = errorReporter;
		this.metrics = new ConcurrentHashMap<String, Number>();
		this.metrics_metadata = new ConcurrentHashMap<String, Map<String, Object>>();

		try {
			// TODO Config file not tested
			final String PORT_CONFIG_STRING = "topology.metrics.consumers.debug.servlet.port";
			Integer port = (Integer) stormConf.get(PORT_CONFIG_STRING);
			if (port == null) {
				log.warn("Metrics debug servlet's port not specified, defaulting to 7070. You can specify it via "
						+ PORT_CONFIG_STRING + " in storm.yaml");
				port = 7070;
			}
			server = startServlet(port);
		} catch (Exception e) {
			log.error("Failed to start metrics server", e);
			throw new AssertionError(e);
		}
	}

	private static final Joiner ON_COLONS = Joiner.on("::");

	public void handleDataPoints(TaskInfo taskInfo,
			Collection<DataPoint> dataPoints) {
		// In order
		String componentId = taskInfo.srcComponentId;
		Integer taskId = taskInfo.srcTaskId;
		Integer updateInterval = taskInfo.updateIntervalSecs;
		Long timestamp = taskInfo.timestamp;
		for (DataPoint point : dataPoints) {
			String metric_name = point.name;
			try {
				Map<String, Number> metric = (Map<String, Number>) point.value;
				for (Map.Entry<String, Number> entry : metric.entrySet()) {
					String metricId = ON_COLONS.join(componentId, taskId,
							metric_name, entry.getKey());
					Number val = entry.getValue();
					metrics.put(metricId, val);
					metrics_metadata.put(metricId, ImmutableMap
							.<String, Object> of("updateInterval",
									updateInterval, "lastreported", timestamp));
				}
			} catch (RuntimeException e) {
				// One can easily send something else than a Map<String,Number>
				// down the __metrics stream and make this part break.
				// If you ask me either the message should carry type
				// information or there should be different stream per message
				// type
				// This is one of the reasons why I want to write a further
				// abstraction on this facility
				errorReporter.reportError(e);
				metrics_metadata
						.putIfAbsent("ERROR_METRIC_CONSUMER_"
								+ e.getClass().getSimpleName(), ImmutableMap
								.of("offending_message_sample", point.value));
			}
		}
	}

	private static final ObjectMapper OM = new ObjectMapper();

	private Server startServlet(int serverPort) throws Exception {
		// Setup HTTP server
		Server server = new Server(serverPort);
		Context root = new Context(server, "/");
		server.start();

		HttpServlet servlet = new HttpServlet() {
			@Override
			protected void doGet(HttpServletRequest req,
					HttpServletResponse resp) throws ServletException,
					IOException {
				SortedMap<String, Number> metrics = ImmutableSortedMap
						.copyOf(DebugMetricConsumer.this.metrics);
				SortedMap<String, Map<String, Object>> metrics_metadata = ImmutableSortedMap
						.copyOf(DebugMetricConsumer.this.metrics_metadata);

				Map<String, Object> toplevel = ImmutableMap
						.of("retrieved",
								new Date(),

								// TODO this call fails with mysterious
								// exception
								// "java.lang.IllegalArgumentException: Could not find component common for __metrics"
								// Mailing list suggests it's a library version
								// issue but couldn't find anything suspicious
								// Need to eventually investigate
								// "sources",
								// context.getThisSources().toString(),

								"metrics", metrics, "metric_metadata",
								metrics_metadata);

				ObjectWriter prettyPrinter = OM
						.writerWithDefaultPrettyPrinter();
				prettyPrinter.writeValue(resp.getWriter(), toplevel);
			}
		};

		root.addServlet(new ServletHolder(servlet), "/metrics");

		log.info("Started metric server...");
		return server;

	}

	public void cleanup() {
		try {
			server.stop();
		} catch (Exception e) {
			throw new AssertionError(e);
		}
	}

}

看完上述內容,你們對如何實現 LoggingMetricsConsumer將指標值輸出到metric.log日志文件有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注創新互聯行業資訊頻道,感謝大家的支持。

網頁名稱:如何實現LoggingMetricsConsumer將指標值輸出到metric.log日志文件
標題網址:http://m.kartarina.com/article18/pgojdp.html

成都網站建設公司_創新互聯,為您提供網頁設計公司軟件開發App設計響應式網站ChatGPT虛擬主機

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

小程序開發
主站蜘蛛池模板: 无码日韩人妻av一区免费| 免费看无码特级毛片| 久久久久久国产精品无码超碰| 无码精品人妻一区二区三区免费看 | 中文AV人妻AV无码中文视频 | 亚洲爆乳大丰满无码专区| 日韩美无码五月天| 中文字幕无码亚洲欧洲日韩| 免费无遮挡无码视频网站| 无码一区二区三区老色鬼| HEYZO无码综合国产精品| 无码少妇A片一区二区三区| 免费无码VA一区二区三区| 亚洲精品高清无码视频| 一本大道久久东京热无码AV| 亚洲国产精品无码久久久秋霞1| 午夜人性色福利无码视频在线观看 | 日韩精品无码一区二区三区四区| 亚洲AV无码AV吞精久久| 无码人妻精品中文字幕免费东京热 | 最新无码A∨在线观看| 韩国精品一区二区三区无码视频 | 亚洲中文字幕无码爆乳| 无码国产精品一区二区免费式芒果| 亚洲高清无码综合性爱视频| 无码精品A∨在线观看无广告| 亚洲爆乳大丰满无码专区| 亚洲精品无码成人片久久不卡| 亚洲AV中文无码字幕色三 | 蜜芽亚洲av无码一区二区三区| 亚洲国产精品无码久久久| 亚洲gv猛男gv无码男同短文| 亚洲av无码乱码国产精品| 一本一道av中文字幕无码 | 亚洲a∨无码男人的天堂| 久久亚洲精品无码aⅴ大香| 精品人妻系列无码一区二区三区| 秋霞鲁丝片Av无码少妇| 中文字幕久久久人妻无码| a级毛片无码免费真人| 国产精品va在线观看无码|