• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

java nginx监控服务程序调度算法实现

JAVA相关 水墨上仙 2837次浏览

监控服务程序实现调度算法
完成nginx服务监控(从nginx配置解析出对应的服务作为监控对象之五,还有可以从数据库里读出待监控的服务)与更新服务后的监控算法:
处理休眠队列———将所有的待监控服务记录放入一个优先级队列里(休眠队列,最小堆的数据结构,堆顶为绝对间隔时间最小的,优先执行),每次只需要检查堆顶就可以了,需要执行的放进执行队列里,删除的不加入执行队列
执行线程———将执行列里的记录抛给异步执行的池里,每一个都是异步调用运行
回收线程———-运行完成的请求回收休眠队列,不回收已删除的。
更新线程———定时加载新的数据,设置好绝对间隔时间,放入休眠队列
来源:http://blog.csdn.net/duck_genuine/article/details/8276169

package com.wole.monitor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.wole.monitor.dao.ServiceDao;
import com.wole.servicemonitor.util.ServiceUtils;
/**
 * 管理并调度某一个服务数据源的监控池
 * @author yzygenuine
 *
 */
public class MonitorsManage {
	private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);
	private ServiceDao dao;
	/**
	 * 执行的一个并发池
	 */
	private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>());
	/**
	 * 
	 */
	private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);
	/**
	 * 正在执行中的MonitorService集合
	 */
	private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();
	/**
	 * 等待优先级队列
	 */
	private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();
	/**
	 * 执行队列
	 */
	private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();
	/**
	 * 是否关闭
	 */
	private AtomicBoolean isClose = new AtomicBoolean(false);
	/**
	 * 生产者启动时间
	 */
	private AtomicLong startTime = new AtomicLong(0);
	/**
	 * 相对于启动的间隔时间
	 */
	private AtomicLong intervalTime = new AtomicLong(0);
	public void close() {
		logger.info("closing................");
		isClose.compareAndSet(false, true);
	}
	
	public void init() {
		logger.info("初始化");
	}
	public void work() {
		logger.info("开始工作");
		// 生产者启动工作
		Thread productThread = new Thread(new ProductMonitor(1000));
		// 消费者启动工作
		Thread consumerThread = new Thread(new ConsumerMonitor(1000));
		// 回收者启动工作
		Thread recoverThread = new Thread(new RecoverMonitor(1000));
		// 启动定时加载数据工作
		Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
		productThread.start();
		consumerThread.start();
		recoverThread.start();
		refreshThread.start();
	}
	/**
	 * 生产者
	 * 
	 * @author yzygenuine
	 * 
	 */
	class ProductMonitor implements Runnable {
		long sleepTime = 1000;
		public ProductMonitor(long sleepTime) {
			this.sleepTime = sleepTime;
		}
		@Override
		public void run() {
			logger.info("生产者开启工作");
			// 开始进行定时监控
			long now = System.currentTimeMillis();
			long lastTime = now;
			startTime.addAndGet(now);
			try {
				do {
					Thread.sleep(sleepTime);
					logger.debug("生产者休息{}ms", sleepTime);
					now = System.currentTimeMillis();
					intervalTime.addAndGet(now - lastTime);
					while (sleepQueue.size() > 0) {
						MonitorService service = sleepQueue.peek();
						if (service.getCurrentTime() - intervalTime.get() < 1) {
							service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列
							if (!currentSet.contains(service)) {
								logger.info("service {} 已被删除,不加入执行队列了", service.toString());
								continue;
							}
							executeQueue.add(service);
						} else {
							logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get());
							break;
						}
					}
					if (sleepQueue.size() <= 0) {
						logger.debug("生产队列为空");
					}
					lastTime = now;
				} while (!isClose.get());
			} catch (Exception e) {
				logger.error("", e);
			}
		}
	}
	/**
	 * 消费者
	 * 
	 * @author yzygenuine
	 * 
	 */
	class ConsumerMonitor implements Runnable {
		long sleepTime = 1000;
		public ConsumerMonitor(long sleepTime) {
			this.sleepTime = sleepTime;
			if (sleepTime < 1000) {
				throw new RuntimeException("请配置sleepTime值大一些");
			}
		}
		@Override
		public void run() {
			logger.info("消费者开启工作");
			try {
				do {
					Thread.sleep(sleepTime);
					logger.debug("消费者休息{}ms", sleepTime);
					while (executeQueue.size() > 0) {
						final MonitorService service = executeQueue.poll();
						completionService.submit(new ExecuteCallable(service));
					}
					logger.debug("消费队列为空");
				} while (!isClose.get());
			} catch (Exception e) {
				logger.error("", e);
			}
		}
	}
	/**
	 * 执行回调类
	 * 
	 * @author yzygenuine
	 * 
	 */
	class ExecuteCallable implements Callable<Response> {
		final MonitorService service;
		public ExecuteCallable(MonitorService service) {
			this.service = service;
		}
		@Override
		public Response call() throws Exception {
			logger.debug("执行");
			Map<String, String> r = new HashMap<String, String>();
			Response response = new Response();
			response.service = service;
			response.response = r;
			Monitor m = MonitorFactory.getMonitor(service);
			response.isNeedWarn = m.isNeedWarnging(service, r);
			if (response.isNeedWarn) {
				response.isSucToNotify = m.sendNotify(service, r);
			}
			return response;
		}
	}
	/**
	 * 回收者
	 * 
	 * @author yzygenuine
	 * 
	 */
	class RecoverMonitor implements Runnable {
		private long sleepTime = 1000;
		private long count = 0;
		public RecoverMonitor(long sleepTime) {
			this.sleepTime = sleepTime;
			if (sleepTime < 1000) {
				throw new RuntimeException("请配置sleepTime值大一些");
			}
		}
		@Override
		public void run() {
			logger.info("回收者开启工作");
			try {
				do {
					// Thread.sleep(sleepTime);
					Future<Response> response = completionService.take();
					// 重置后进入休眠队列
					MonitorService s = response.get().service;
					if (!currentSet.contains(s)) {
						logger.info("service {} 已被删除,不回收了", s.toString());
						continue;
					}
					// 当前程序已运动的时间+相对间隔时间=绝对的间隔时间
					s.setCurrentTime(s.getIntervalTime() + intervalTime.get());
					sleepQueue.add(s);
					count++;
					logger.info("回收,当前回收数量:" + count);
				} while (!isClose.get());
			} catch (Exception e) {
				logger.error("", e);
			}
		}
	}
	/**
	 * 加载新的数据
	 * 
	 * @author yzygenuine
	 * 
	 */
	class RefreshMonitorService implements Runnable {
		private long sleepTime = 1000;
		private ServiceDao dao;
		public RefreshMonitorService(long sleepTime, ServiceDao dao) {
			this.sleepTime = sleepTime;
			if (sleepTime < 60000) {
				logger.warn("刷新加载数据的间隔时间不能太短");
				throw new RuntimeException("刷新加载数据的间隔时间不能太短");
			}
			this.dao = dao;
		}
		private void firstLoad() {
			List<MonitorService> monitorService = dao.getService();
			logger.info("加载记录:" + monitorService.size());
			// 将被监控服务加入优先级队列里
			for (int j = 0; j < monitorService.size(); j++) {
				MonitorService service = monitorService.get(j);
				// 初始化好时间
				service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
				currentSet.add(service);
				sleepQueue.add(service);
			}
		}
		@Override
		public void run() {
			logger.info("读取新的service开启工作");
			firstLoad();
			try {
				do {
					logger.info("定时加载新的数据监听者休息{}ms", sleepTime);
					Thread.sleep(sleepTime);
					logger.info("##########开始执行更新数据############");
					// 加载新的所有所数据 ,与当前的数据比较
					List<MonitorService> deleteList = dao.deleteService();
					List<MonitorService> addList = dao.incrementalService();
					logger.info("删除旧的数据共:{}", deleteList.size());
					currentSet.removeAll(deleteList);
					logger.info("增加新的数据共:{}", addList.size());
					currentSet.addAll(addList);
					logger.info("更新后的currentSet size:{}", currentSet.size());
					for (MonitorService service : addList) {
						// 初始化绝对间隔时间
						service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
						sleepQueue.add(service);
					}
					logger.info("########这一轮更新结束");
				} while (!isClose.get());
			} catch (Exception e) {
				logger.error("", e);
			}
		}
	}
	/**
	 * 响应的封装类
	 * 
	 * @author yzygenuine
	 * 
	 */
	class Response {
		public Map<String, String> response;
		public MonitorService service;
		public boolean isNeedWarn;
		public boolean isSucToNotify;
	}
	public void setDao(ServiceDao dao) {
		this.dao = dao;
	}
}


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明java nginx监控服务程序调度算法实现
喜欢 (0)
加载中……