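Why rework XXL-JOB's original log file generation
XXL-JOB's original client-side strategy generates one file per log record: for every log entry (logId) in the database, the client creates a corresponding file. With many scheduled batch jobs, some of them triggered at very short intervals such as every few seconds, the client ends up with a huge number of files, each containing very little. Many small files take up more disk space than the same content in fewer files, which strains disk resources and also hurts file system performance; over time this triggers resource alarms. So unless you want to clean up log files frequently, consolidating the fragmented files in some way becomes an urgent need.
This article is fairly long and contains a lot of code.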
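Log file generation strategy after the rework
Basic description
Reducing the number of log files really means merging the scattered log files, while maintaining external index files that record where the log content of each logId starts. To read a log, the index file is read first, and then the actual log content.
The diagram below describes the log files:
- For an executor, all scheduled jobs under it share one logId_jobId_index.log index file per day, which maintains the mapping from log Id to job Id;
- For a scheduled job, only one jobId.log content file is generated per day, holding all of that day's log content;
- For the log index file jobId_index.log, it maintains the index into the log content, recording which lines in jobId.log belong to which logId so they can be looked up.
That is the general idea. The number of log files should drop sharply, and disk usage should improve accordingly.
I have had this feature running in production for close to a year with no problems so far. If you need it, adjust it to your own business and test it carefully to guard against unforeseen errors.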
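To make the index format concrete, here is a minimal sketch of how one line of jobId_index.log can be written and read back. The class IndexLineSketch and its methods are hypothetical and are not part of XXL-JOB or of the code below; the line layout "logId->(startLine,lineCount)" follows the "345->(577,10)" example used in the appender, with the count zero-padded to three digits.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of XXL-JOB): formats and parses one index line. */
final class IndexLineSketch {
    // logId->(startLine,lineCount), for example "345->(577,010)"
    private static final Pattern LINE = Pattern.compile("^(\\d+)->\\((\\d+),(\\d+)\\)$");

    /** Builds an index line; the line count is zero-padded to three digits. */
    static String format(int logId, long startLine, int lineCount) {
        return String.format(Locale.ROOT, "%d->(%d,%03d)", logId, startLine, lineCount);
    }

    /** Returns {logId, startLine, lineCount}, or null if the line is malformed. */
    static long[] parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            return null;
        }
        return new long[] {
            Long.parseLong(m.group(1)),
            Long.parseLong(m.group(2)),
            Long.parseLong(m.group(3))
        };
    }

    public static void main(String[] args) {
        String line = format(345, 577, 10);
        long[] parsed = parse(line);
        System.out.println(line);
        System.out.println(parsed[0] + " starts at line " + parsed[1] + ", " + parsed[2] + " lines");
    }
}
```

Anchoring the pattern at the start of the line matters here: a lookup for logId 45 should not accept the line that belongs to logId 345.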
Hands-on code
This rework is based on XXL-JOB 1.8.2; other versions have not been tested.
Open the xxl-job-core module. The changes mainly involve the following files:
- XxlJobFileAppender.java
- XxlJobLogger.java
- JobThread.java
- ExecutorBizImpl.java
- LRUCacheUtil.java
XxlJobFileAppender.java
Methods from the original code that were not changed are not pasted here again.
package com.xxl.job.core.log;
import com.xxl.job.core.biz.model.LogResult;
import com.xxl.job.core.util.LRUCacheUtil;
import org.apache.commons.io.FilenameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.*;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
* store trigger log in each log-file
* @author xuxueli 2016-3-12 19:25:12
*/
public class XxlJobFileAppender {
private static Logger logger = LoggerFactory.getLogger(XxlJobFileAppender.class);
// for JobThread (support log for child thread of job handler)
//public static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
public static final InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<String>();
// for logId: holds the log Id of the current trigger
public static final InheritableThreadLocal<Integer> contextHolderLogId = new InheritableThreadLocal<>();
// for jobId: holds the Id of the scheduled job
public static final InheritableThreadLocal<Integer> contextHolderJobId = new InheritableThreadLocal<>();
// a cache map that stores the index offset information per logId
public static final LRUCacheUtil<Integer, Map<String ,Long>> indexOffsetCacheMap = new LRUCacheUtil<>(80);
private static final String DATE_FOMATE = "yyyy-MM-dd";
private static final String UTF_8 = "utf-8";
// file name suffixes
private static final String FILE_SUFFIX = ".log";
private static final String INDEX_SUFFIX = "_index";
private static final String LOGID_JOBID_INDEX_SUFFIX = "logId_jobId_index";
private static final String jobLogIndexKey = "jobLogIndexOffset";
private static final String indexOffsetKey = "indexOffset";
/**
* log base path
*
* structure like:
* ---/
* ---/gluesource/
* ---/gluesource/10_1514171108000.js
* ---/gluesource/10_1514171108000.js
* ---/2017-12-25/
* ---/2017-12-25/639.log
* ---/2017-12-25/821.log
*
*/
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
public static void initLogPath(String logPath){
// init
if (logPath!=null && logPath.trim().length()>0) {
logBasePath = logPath;
}
// mk base dir
File logPathDir = new File(logBasePath);
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();
// mk glue dir
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}
public static String getLogPath() {
return logBasePath;
}
public static String getGlueSrcPath() {
return glueSrcPath;
}
/**
* Reworked method that creates the log directory and builds the log file name:
* log filename,like "logPath/yyyy-MM-dd/jobId.log"
* @param triggerDate
* @param jobId
* @return
*/
public static String makeLogFileNameByJobId(Date triggerDate, int jobId) {
// filePath/yyyy-MM-dd
// avoid concurrent problem, can not be static
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FOMATE);
File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
if (!logFilePath.exists()) {
logFilePath.mkdir();
}
// create the per-job log index file
String logIndexFileName = logFilePath.getPath()
.concat("/")
.concat(String.valueOf(jobId))
.concat(INDEX_SUFFIX)
.concat(FILE_SUFFIX);
File logIndexFilePath = new File(logIndexFileName);
if (!logIndexFilePath.exists()) {
try {
logIndexFilePath.createNewFile();
logger.debug("created log index file, path: {}", logIndexFilePath);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
// create the day's global logId->jobId index under the yyyy-MM-dd folder, which keeps changes to the admin console to a minimum
String logIdJobIdIndexFileName = logFilePath.getPath()
.concat("/")
.concat(LOGID_JOBID_INDEX_SUFFIX)
.concat(FILE_SUFFIX);
File logIdJobIdIndexFileNamePath = new File(logIdJobIdIndexFileName);
if (!logIdJobIdIndexFileNamePath.exists()) {
try {
logIdJobIdIndexFileNamePath.createNewFile();
logger.debug("created logId->jobId index file, path: {}", logIdJobIdIndexFileNamePath);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
// filePath/yyyy-MM-dd/jobId.log: the log content file
String logFileName = logFilePath.getPath()
.concat("/")
.concat(String.valueOf(jobId))
.concat(FILE_SUFFIX);
return logFileName;
}
/**
* Builds the file name used when the admin console reads a detailed log.
* admin read log, generate logFileName by logId
* @param triggerDate
* @param logId
* @return
*/
public static String makeFileNameForReadLog(Date triggerDate, int logId) {
// filePath/yyyy-MM-dd
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FOMATE);
File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
if (!logFilePath.exists()) {
logFilePath.mkdir();
}
String logIdJobIdFileName = logFilePath.getPath().concat("/")
.concat(LOGID_JOBID_INDEX_SUFFIX)
.concat(FILE_SUFFIX);
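// find logId->jobId mapping in the index file
String infoLine = readIndex(logIdJobIdFileName, logId);
String[] arr = infoLine.split("->");
if (arr.length < 2) {
// readIndex returns an empty string when no mapping line exists for this logId
logger.error("makeFileNameForReadLog: no logId->jobId mapping found for logId {}", logId);
throw new RuntimeException("no logId->jobId mapping found for logId " + logId);
}
int jobId;
try {
jobId = Integer.parseInt(arr[1].trim());
} catch (NumberFormatException e) {
logger.error("makeFileNameForReadLog: malformed index line: {}", infoLine, e);
throw new RuntimeException("malformed logId->jobId index line: " + infoLine, e);
}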
String logFileName = logFilePath.getPath().concat("/")
.concat(String.valueOf(jobId)).concat(FILE_SUFFIX);
return logFileName;
}
/**
* Appends content to the log file and appends the matching entry to the index file.
* append log
* @param logFileName
* @param appendLog
*/
public static void appendLogAndIndex(String logFileName, String appendLog) {
// log file
if (logFileName == null || logFileName.trim().length() == 0) {
return;
}
File logFile = new File(logFileName);
if (!logFile.exists()) {
try {
logFile.createNewFile();
} catch (Exception e) {
logger.error(e.getMessage(), e);
return;
}
}
// start append, count line num
long startLineNum = countFileLineNum(logFileName);
logger.debug("start appending to log file, start line: {}", startLineNum);
// log
if (appendLog == null) {
appendLog = "";
}
appendLog += "\r\n";
// append file content
try {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(logFile, true);
fos.write(appendLog.getBytes(UTF_8));
fos.flush();
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
// end append: count the lines again to get the number of lines just written
long endLineNum = countFileLineNum(logFileName);
// Long.intValue() never throws, so a plain cast is enough here
int length = (int) (endLineNum - startLineNum);
logger.debug("finished appending to log file, end line: {}, length: {}", endLineNum, length);
Map<String, Long> indexOffsetMap = new HashMap<>();
appendIndexLog(logFileName, startLineNum, length, indexOffsetMap);
appendLogIdJobIdFile(logFileName, indexOffsetMap);
}
/**
* Creates the mapping between log Id and job Id.
* @param logFileName
* @param indexOffsetMap
*/
public static void appendLogIdJobIdFile(String logFileName, Map<String, Long> indexOffsetMap) {
// read the values stored in the thread-local variables
int logId = XxlJobFileAppender.contextHolderLogId.get();
int jobId = XxlJobFileAppender.contextHolderJobId.get();
File file = new File(logFileName);
// get the parent directory to locate the index file in the same folder
String parentDirName = file.getParent();
// logId_jobId_index fileName
String logIdJobIdIndexFileName = parentDirName.concat("/")
.concat(LOGID_JOBID_INDEX_SUFFIX)
.concat(FILE_SUFFIX);
// look up the logId in the cache
boolean jobLogIndexOffsetExist = indexOffsetCacheMap.exists(logId);
Long jobLogIndexOffset = null;
if (jobLogIndexOffsetExist) {
jobLogIndexOffset = indexOffsetCacheMap.get(logId).get(jobLogIndexKey);
}
if (jobLogIndexOffset == null) {
// not cached yet: append the mapping line
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(logId).append("->").append(jobId).append("\r\n");
Long currentPoint = getAfterAppendIndexLog(logIdJobIdIndexFileName, stringBuffer.toString());
indexOffsetMap.put(jobLogIndexKey, currentPoint);
indexOffsetCacheMap.save(logId, indexOffsetMap);
}
// otherwise the mapping is already cached and nothing else needs to be done
}
/**
* Appends content to an index file and returns the offset at which it was written.
* @param fileName
* @param content
* @return
*/
private static Long getAfterAppendIndexLog(String fileName, String content) {
RandomAccessFile raf = null;
Long point = null;
try {
raf = new RandomAccessFile(fileName, "rw");
long end = raf.length();
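// we are appending, so move the pointer to the end of the file
raf.seek(end);
raf.writeBytes(content);
// Record the offset at which this entry starts. It must be the position before
// the write; reading the pointer after the write would give the end of the entry.
point = end;
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
// raf is still null if the constructor threw
if (raf != null) {
try {
raf.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}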
return point;
}
/**
* Appends an index entry, like "345->(577,10)"
* @param logFileName
* @param from
* @param length
* @param indexOffsetMap
*/
public static void appendIndexLog(String logFileName, Long from, int length, Map<String, Long> indexOffsetMap) {
// derive the index file name by stripping the ".log" suffix
String prefixFilePath = logFileName.substring(0, logFileName.length() - FILE_SUFFIX.length());
String logIndexFilePath = prefixFilePath.concat(INDEX_SUFFIX).concat(FILE_SUFFIX);
File logIndexFile = new File(logIndexFilePath);
if (!logIndexFile.exists()) {
try {
logIndexFile.createNewFile();
} catch (IOException e) {
logger.error(e.getMessage(), e);
return;
}
}
int logId = XxlJobFileAppender.contextHolderLogId.get();
StringBuffer stringBuffer = new StringBuffer();
// decide whether to append a new entry or update an existing one
boolean indexOffsetExist = indexOffsetCacheMap.exists(logId);
Long indexOffset = null;
if (indexOffsetExist) {
indexOffset = indexOffsetCacheMap.get(logId).get(indexOffsetKey);
}
if (indexOffset == null) {
// append
String lengthStr = getFormatNum(length);
stringBuffer.append(logId).append("->(")
.append(from).append(",").append(lengthStr).append(")\r\n");
// append the new index entry and record its offset
Long currentIndexPoint = getAfterAppendIndexLog(logIndexFilePath, stringBuffer.toString());
indexOffsetMap.put(indexOffsetKey, currentIndexPoint);
} else {
String infoLine = getIndexLineIsExist(logIndexFilePath, logId);
// rewrite the existing index line with the accumulated length
int startTmp = infoLine.indexOf("(");
int endTmp = infoLine.indexOf(")");
if (startTmp < 0 || endTmp < startTmp) {
// the cached offset did not lead to a valid index line
logger.error("appendIndexLog: no valid index line found for logId {}", logId);
return;
}
String[] lengthTmp = infoLine.substring(startTmp + 1, endTmp).split(",");
int lengthTmpInt = 0;
try {
lengthTmpInt = Integer.parseInt(lengthTmp[1]);
from = Long.valueOf(lengthTmp[0]);
} catch (Exception e) {
logger.error("appendIndexLog: malformed index line: {}", infoLine, e);
throw new RuntimeException("StringArrayException");
}
int modifyLength = length + lengthTmpInt;
String lengthStr2 = getFormatNum(modifyLength);
stringBuffer.append(logId).append("->(")
.append(from).append(",").append(lengthStr2).append(")\r\n");
modifyIndexFileContent(logIndexFilePath, infoLine, stringBuffer.toString());
}
}
/**
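* Zero-pad a number to at least three digits, like 5 to 005.
* The fixed width keeps an index line the same length when it is rewritten in place;
* a value of 1000 or more needs four digits and no longer fits that assumption.
* @return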
*/
private static String getFormatNum(int num) {
DecimalFormat df = new DecimalFormat("000");
String str1 = df.format(num);
return str1;
}
/**
* Looks up whether an index entry already exists.
* @param filePath
* @param logId
* @return
*/
private static String getIndexLineIsExist(String filePath, int logId) {
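// Read the index file to check whether an entry exists. This is called once for every
// log line written, so all lines of the same logId are merged into one index entry.
String prefix = logId + "->";
// anchor at the start of the line so that logId 45 does not match the line of logId 345
Pattern pattern = Pattern.compile("^" + prefix);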
String indexInfoLine = "";
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(filePath, "rw");
String tmpLine = null;
// cached byte offset of this logId's index entry
boolean indexOffsetExist = indexOffsetCacheMap.exists(logId);
Long cachePoint = null;
if (indexOffsetExist) {
cachePoint = indexOffsetCacheMap.get(logId).get(indexOffsetKey);
}
if (null == cachePoint) {
cachePoint = Long.valueOf(0);
}
raf.seek(cachePoint);
while ((tmpLine = raf.readLine()) != null) {
final long point = raf.getFilePointer();
boolean matchFlag = pattern.matcher(tmpLine).find();
if (matchFlag) {
indexInfoLine = tmpLine;
break;
}
cachePoint = point;
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
// raf is still null if the constructor threw
if (raf != null) {
try {
raf.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return indexInfoLine;
}
/**
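* Fetches the index information when the admin console page queries an execution log.
* This cannot share code with the method above: on the read path there is no map holding offsets, so the two stay separate.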
* @param filePath
* @param logId
* @return
*/
private static String readIndex(String filePath, int logId) {
filePath = FilenameUtils.normalize(filePath);
String prefix = logId + "->";
// anchor at the start of the line so that logId 45 does not match the line of logId 345
Pattern pattern = Pattern.compile("^" + prefix);
String indexInfoLine = "";
BufferedReader bufferedReader = null;
try {
bufferedReader = new BufferedReader(new FileReader(filePath));
String tmpLine = null;
while ((tmpLine = bufferedReader.readLine()) != null) {
boolean matchFlag = pattern.matcher(tmpLine).find();
if (matchFlag) {
indexInfoLine = tmpLine;
break;
}
}
bufferedReader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return indexInfoLine;
}
/**
* Modifies the content of the logIndexFile.
* @param indexFileName
* @param oldContent
* @param newContent
* @return
*/
private static boolean modifyIndexFileContent(String indexFileName, String oldContent, String newContent) {
RandomAccessFile raf = null;
int logId = contextHolderLogId.get();
try {
raf = new RandomAccessFile(indexFileName, "rw");
String tmpLine = null;
// cached byte offset of this logId's index entry
boolean indexOffsetExist = indexOffsetCacheMap.exists(logId);
Long cachePoint = null;
if (indexOffsetExist) {
cachePoint = indexOffsetCacheMap.get(logId).get(indexOffsetKey);
}
if (null == cachePoint) {
cachePoint = Long.valueOf(0);
}
raf.seek(cachePoint);
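while ((tmpLine = raf.readLine()) != null) {
final long point = raf.getFilePointer();
if (tmpLine.contains(oldContent)) {
String str = tmpLine.replace(oldContent, newContent);
// go back to the start of this line and overwrite it in place;
// this is only safe while the new line has the same length as the old one
raf.seek(cachePoint);
raf.writeBytes(str);
// each logId has a single index line, so stop after updating it
break;
}
cachePoint = point;
}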
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
// raf is still null if the constructor threw
if (raf != null) {
try {
raf.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return true;
}
/**
* Counts the lines in a file.
* @param logFileName
* @return
*/
private static long countFileLineNum(String logFileName) {
File file = new File(logFileName);
if (file.exists()) {
try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(file))) {
lineNumberReader.skip(Long.MAX_VALUE);
// getLineNumber() starts counting from 0, so add 1
long totalLines = lineNumberReader.getLineNumber() + 1;
return totalLines;
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return 0;
}
/**
* Reworked log reading: 1. read the logIndexFile; 2. read the logFile
* @param logFileName
* @param logId
* @param fromLineNum
* @return
*/
public static LogResult readLogByIndex(String logFileName, int logId, int fromLineNum) {
// validate the log file name first, because the index file name is derived from it
if (StringUtils.isEmpty(logFileName)) {
return new LogResult(fromLineNum, 0, "readLogByIndex fail, logFile not found", true);
}
// strip the ".log" suffix to get the file name prefix
String prefixFilePath = logFileName.substring(0, logFileName.length() - FILE_SUFFIX.length());
String logIndexFilePath = prefixFilePath.concat(INDEX_SUFFIX).concat(FILE_SUFFIX);
// valid logIndex file
logIndexFilePath = FilenameUtils.normalize(logIndexFilePath);
File logIndexFile = new File(logIndexFilePath);
if (!logIndexFile.exists()) {
return new LogResult(fromLineNum, 0, "readLogByIndex fail, logIndexFile not exists", true);
}
// valid log file
logFileName = FilenameUtils.normalize(logFileName);
File logFile = new File(logFileName);
if (!logFile.exists()) {
return new LogResult(fromLineNum, 0, "readLogByIndex fail, logFile not exists", true);
}
// read logIndexFile
String indexInfo = readIndex(logIndexFilePath, logId);
int startNum = 0;
int endNum = 0;
if (!StringUtils.isEmpty(indexInfo)) {
int startTmp = indexInfo.indexOf("(");
int endTmp = indexInfo.indexOf(")");
String[] fromAndTo = indexInfo.substring(startTmp + 1, endTmp).split(",");
try {
startNum = Integer.parseInt(fromAndTo[0]);
endNum = Integer.parseInt(fromAndTo[1]) + startNum;
} catch (Exception e) {
logger.error("readLogByIndex StringArrayException,{},{}", e.getMessage(), e);
throw new RuntimeException("StringArrayException");
}
}
// read File
StringBuffer logContentBuffer = new StringBuffer();
int toLineNum = 0;
LineNumberReader reader = null;
try {
reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), UTF_8));
String line = null;
while ((line = reader.readLine()) != null) {
// [from, to], start as fromNum(logIndexFile)
toLineNum = reader.getLineNumber();
if (toLineNum >= startNum && toLineNum < endNum) {
logContentBuffer.append(line).append("\n");
}
// break when read over
if (toLineNum >= endNum) {
break;
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);
return logResult;
}
}
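The update path above rewrites an index line at its cached offset, which only works while the line keeps the same byte length. The following is a minimal sketch of that idea in isolation, assuming fixed-width entries; InPlaceIndexUpdateSketch and its methods are hypothetical names, not part of XXL-JOB or of the class above.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Locale;

/** Hypothetical sketch: append fixed-width index entries and overwrite one in place. */
final class InPlaceIndexUpdateSketch {

    /** Builds one entry; the length is zero-padded so the width stays fixed below 1000. */
    static String entry(int logId, long from, int length) {
        return String.format(Locale.ROOT, "%d->(%d,%03d)\r\n", logId, from, length);
    }

    /** Appends an entry and returns the byte offset at which it starts. */
    static long append(File f, String entry) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            long start = raf.length();
            raf.seek(start);
            raf.write(entry.getBytes(StandardCharsets.US_ASCII));
            return start;
        }
    }

    /** Overwrites the entry at the offset; safe only if the byte length is unchanged. */
    static void overwrite(File f, long offset, String entry) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            raf.seek(offset);
            raf.write(entry.getBytes(StandardCharsets.US_ASCII));
        }
    }

    /** Writes two entries, grows the first one in place, and returns the file content. */
    static String demo() {
        try {
            File f = File.createTempFile("index-sketch", ".log");
            try {
                long offset = append(f, entry(345, 1, 1));
                append(f, entry(346, 4, 2));
                overwrite(f, offset, entry(345, 1, 3));
                return new String(Files.readAllBytes(f.toPath()), StandardCharsets.US_ASCII);
            } finally {
                f.delete();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

Because both versions of the 345 entry are 14 bytes long, the 346 entry that follows is left untouched. An entry whose length reached 1000 would be one byte longer and would overwrite the start of the next line, so a wider padding may be worth considering for jobs that log heavily.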
XxlJobLogger.java
package com.xxl.job.core.log;
import com.xxl.job.core.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
/**
* Created by xuxueli on 17/4/28.
*/
public class XxlJobLogger {
private static Logger logger = LoggerFactory.getLogger("xxl-job logger");
/**
* append log
*
* @param callInfo
* @param appendLog
*/
private static void logDetail(StackTraceElement callInfo, String appendLog) {
/*// "yyyy-MM-dd HH:mm:ss [ClassName]-[MethodName]-[LineNumber]-[ThreadName] log";
StackTraceElement[] stackTraceElements = new Throwable().getStackTrace();
StackTraceElement callInfo = stackTraceElements[1];*/
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(DateUtil.formatDateTime(new Date())).append(" ")
.append("["+ callInfo.getClassName() + "#" + callInfo.getMethodName() +"]").append("-")
.append("["+ callInfo.getLineNumber() +"]").append("-")
.append("["+ Thread.currentThread().getName() +"]").append(" ")
.append(appendLog!=null?appendLog:"");
String formatAppendLog = stringBuffer.toString();
// appendlog
String logFileName = XxlJobFileAppender.contextHolder.get();
if (logFileName!=null && logFileName.trim().length()>0) {
// XxlJobFileAppender.appendLog(logFileName, formatAppendLog);
// the method call is changed here:
// use appendLogAndIndex so that the index files are updated together with the log
XxlJobFileAppender.appendLogAndIndex(logFileName, formatAppendLog);
} else {
logger.info(">>>>>>>>>>> {}", formatAppendLog);
}
}
}
JobThread.java
@Override
public void run() {
......
// execute
while(!toStop){
running = false;
idleTimes++;
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try {
// to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
// String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
// modify: name the generated log file after the jobId instead
String logFileName = XxlJobFileAppender.makeLogFileNameByJobId(new Date(triggerParam.getLogDateTim()), triggerParam.getJobId());
XxlJobFileAppender.contextHolderJobId.set(triggerParam.getJobId());
// adjust this line according to your XXL-JOB version
XxlJobFileAppender.contextHolderLogId.set(Integer.parseInt(String.valueOf(triggerParam.getLogId())));
XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
......
}
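The logId and jobId are handed to the appender through InheritableThreadLocal fields, so that threads started by a job handler see the same values. Here is a minimal sketch of that behaviour; InheritableContextSketch is a hypothetical class used only for illustration.

```java
/** Hypothetical sketch: a child thread inherits the value set by its parent. */
final class InheritableContextSketch {
    static final InheritableThreadLocal<Integer> LOG_ID = new InheritableThreadLocal<>();

    /** Sets the logId on the calling thread and returns what a new child thread sees. */
    static Integer readFromChild(int logId) {
        LOG_ID.set(logId);
        final Integer[] seen = new Integer[1];
        // the value is copied when the child thread is created
        Thread child = new Thread(() -> seen[0] = LOG_ID.get());
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            LOG_ID.remove();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(readFromChild(345));
    }
}
```

The value is inherited at thread creation time, so a thread taken from a pool that was created earlier will not see a logId set afterwards.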
ExecutorBizImpl.java
package com.xxl.job.core.biz.impl;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.LogResult;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.glue.GlueFactory;
import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.impl.GlueJobHandler;
import com.xxl.job.core.handler.impl.ScriptJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.thread.JobThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* Created by xuxueli on 17/3/1.
*/
public class ExecutorBizImpl implements ExecutorBiz {
private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class);
/**
* Reworked method for reading the log
* @param logDateTim
* @param logId
* @param fromLineNum
* @return
*/
@Override
public ReturnT<LogResult> log(long logDateTim, long logId, int fromLineNum) {
// log filename: logPath/yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeFileNameForReadLog(new Date(logDateTim), (int)logId);
LogResult logResult = XxlJobFileAppender.readLogByIndex(logFileName, Integer.parseInt(String.valueOf(logId)), fromLineNum);
return new ReturnT<LogResult>(logResult);
}
}
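Once the index entry for a logId is known, reading the log comes down to returning a range of lines from the shared jobId.log. The sketch below shows that step on an in-memory string, assuming 1-based line numbers and a half-open range as in the index entries; LineRangeSketch is a hypothetical name and reads from a String rather than a file to stay self-contained.

```java
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: pick the lines [start, start + count) out of shared content. */
final class LineRangeSketch {

    /** Returns the lines whose 1-based number lies in [start, start + count). */
    static List<String> readRange(String content, int start, int count) {
        List<String> out = new ArrayList<>();
        int end = start + count;
        try (LineNumberReader reader = new LineNumberReader(new StringReader(content))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // getLineNumber() is 1 after the first line has been read
                int n = reader.getLineNumber();
                if (n >= end) {
                    break; // past the range, no need to read further
                }
                if (n >= start) {
                    out.add(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        String content = "a\r\nb\r\nc\r\nd\r\n";
        System.out.println(readRange(content, 2, 2));
    }
}
```

Lines before the start still have to be read and skipped, so the cost of a lookup grows with the position of the entry inside the day's file.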
LRUCacheUtil.java
A cache container implemented on top of LinkedHashMap.
package com.xxl.job.core.util;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @Author: liangxuanhao
* @Description: a fixed-size cache implemented with LinkedHashMap
* @Date:
*/
public class LRUCacheUtil<K, V> extends LinkedHashMap<K, V> {
// default maximum cache capacity
private static final int CACHE_MAX_SIZE = 100;
private int limit;
public LRUCacheUtil() {
this(CACHE_MAX_SIZE);
}
public LRUCacheUtil(int cacheSize) {
// accessOrder = true: an accessed entry is moved to the end
super(cacheSize, 0.75f, true);
this.limit = cacheSize;
}
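/**
* Synchronized to avoid thread-safety problems under concurrent access.
* With accessOrder = true even get() reorders the map, so reads are synchronized as well.
*/
public synchronized V save(K key, V val) {
return put(key, val);
}
@Override
public synchronized V get(Object key) {
return super.get(key);
}
public synchronized V getOne(K key) {
return get(key);
}
public synchronized boolean exists(K key) {
return containsKey(key);
}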
/**
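* Decides whether the size limit has been exceeded.
* @param eldest
* @return true if the limit is exceeded, otherwise false
*/
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
// called after put or putAll; once the limit is exceeded, the least recently used entry is removed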
return size() > limit;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<K, V> entry : entrySet()) {
sb.append(String.format("%s:%s ", entry.getKey(), entry.getValue()));
}
return sb.toString();
}
}
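The eviction behaviour that this cache relies on can be seen in a small standalone sketch. LruSketch is a hypothetical, stripped-down variant of the class above, shown only to illustrate how access order and removeEldestEntry work together in LinkedHashMap.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: a tiny LRU map built on LinkedHashMap access order. */
final class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private final int limit;

    LruSketch(int limit) {
        // accessOrder = true moves an entry to the end whenever it is read or written
        super(16, 0.75f, true);
        this.limit = limit;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // evict the least recently used entry once the limit is exceeded
        return size() > limit;
    }

    /** Fills a cache of size 2, touches key 1, adds key 3, and returns the remaining keys. */
    static String demo() {
        LruSketch<Integer, String> cache = new LruSketch<>(2);
        cache.put(1, "a");
        cache.put(2, "b");
        cache.get(1);      // key 1 becomes the most recently used
        cache.put(3, "c"); // key 2 is now the eldest and is evicted
        return cache.keySet().toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3]
    }
}
```

In the appender this means that a logId which is still being written can be evicted once more than 80 other logIds have been cached after its last access, in which case its next log line is treated as a new entry.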
Result
The outcome is shown in the figure: it meets expectations and works well.