博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java多线程】写入同一文件,自定义线程池与线程回收利用2
阅读量:6228 次
发布时间:2019-06-21

本文共 8234 字,大约阅读时间需要 27 分钟。

hot3.png

原始版地址:

起初为了方便快捷,只为实现功能,写了很多垃圾的代码. 造成性能不高,可读性,可维护性不强。

朋友们提了很多意见,我都吸取了经验,于是将代码又改动了一下。

经过测试,运行效率显著提升:

任务完成时间:30508 ms

任务完成时间:30735 ms

任务完成时间:31167 ms

package test.com.linapex.room;import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.FileReader;import java.io.OutputStreamWriter;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.locks.ReentrantReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;import com.linapex.common.util.ZhengzeValidate;public class TBuilderRoomSqlFileTool2{	final static int BSIZE = 1024 * 1024;	final static int DATACACHENUM = 10000;	static int currThreadCount = 0;	static int maxThreadCount = 9;	static File roomFilterLogFile = new File("roomFilter.log");	static File sqlFile = new File("roomSql.sql");	static File csvFile = new File("D:\\baiduyundownload\\如家汉庭等酒店2000W开房数据\\2000W\\1-200W.csv");	final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id ,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";	public static BufferedWriter initSQLWrite(File sqlFile) throws Exception	{		if (!sqlFile.exists())		{			if (!sqlFile.createNewFile())			{				System.err.println("创建文件失败,已存在:" + sqlFile.getAbsolutePath());			}		}		return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));	}	public static void loadCSV(CallBack3
 callBack) throws Exception { // BufferedReader reader = null; // try // { // reader = new BufferedReader(new FileReader(csvFile)); // String str = null; // // int num = 0; // while ((str = reader.readLine()) != null) // { // num++; // callBack.call(num, str); // } // } finally // { // reader.close(); // } FileChannel inChannel = null; try { String enterStr = "\n"; inChannel = new FileInputStream(csvFile).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(BSIZE); StringBuilder newlinesBui = new StringBuilder(); int num = 0; while (inChannel.read(buffer) != -1) { buffer.flip(); //数据组合. String content = new String(buffer.array()); newlinesBui.append(content).toString(); int fromIndex = 0; int endIndex = -1; //循环找到 \n while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1) { //得到一行 String line = newlinesBui.substring(fromIndex, endIndex); num++; callBack.call(num, line); fromIndex = endIndex + 1; } newlinesBui.delete(0, fromIndex); buffer.clear(); } } finally { if (inChannel != null) { inChannel.close(); } } } public static void main(String[] args) throws Exception { final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount); final List
> threadResultList = new ArrayList
>(); final BufferedWriter bw = initSQLWrite(sqlFile); //主要的buffer对象. final WriteSqlHandle2 writeSqlFile = new WriteSqlHandle2(DATACACHENUM); StopWatch2 stopWatch = new StopWatch2(); stopWatch.start(); loadCSV(new CallBack3
() { @Override public Void call(int num, String str) { String[] strs = str.split(","); if (strs.length < 8) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return null; } String name = strs[0].trim(); if (!ZhengzeValidate.isChina(name)) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return null; } try { String card = strs[4]; String gender = strs[5]; String birthday = strs[6]; String address = strs[7]; String zip = strs[8]; String mobile = strs[20]; String email = strs[22]; String version = strs[31]; //生成sql语句 final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version); //添加数据,如果超出了缓存数据,则 开始写入文件系统 if (writeSqlFile.add(tempSql)) { currThreadCount++; //如果提交的线程过多,则取回之后再提交. if (currThreadCount >= maxThreadCount) { // System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); for (Future
 fs : threadResultList) { String tempSqlName = fs.get(); currThreadCount--; // System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount) + "  线程返回的值:" + tempSqlName); } threadResultList.clear(); //清空 currThreadCount = threadResultList.size(); // System.out.println(String.format("重新开始提交线程   当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); } Future
 future = threadPool.submit(new TaskWithResult(writeSqlFile, bw)); threadResultList.add(future); // System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num)); } } catch (Exception e) { writeLog("录入错误的数据::0", Arrays.toString(strs)); writeLog("错误的原因::0", e.getMessage()); } return null; } }); writeSqlFile.flush(bw); threadPool.shutdown(); stopWatch.stop(); System.out.println(String.format("任务完成时间:%s ms", stopWatch.getTime())); } public static void writeLog(String str, Object... values) { //FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false); } public static String tm(String strSource, Object... values) { if (strSource == null) { return null; } StringBuilder builder = new StringBuilder(strSource); final String prefix = ":"; for (int index = 0; index < values.length; index++) { String value = values[index].toString(); if (value == null) { continue; } String key = new StringBuilder(prefix).append(index).toString(); int i = -1; if ((i = builder.indexOf(key, i)) > -1) { int len = key.length(); builder.replace(i, i + len, value); } } return builder.toString(); }}class TaskWithResult implements Callable
{ WriteSqlHandle2 handle2; BufferedWriter bufferedWriter; public TaskWithResult(WriteSqlHandle2 handle2, BufferedWriter bufferedWriter) { this.handle2 = handle2; this.bufferedWriter = bufferedWriter; } @Override public String call() throws Exception { String fileName = Thread.currentThread().getName(); handle2.save(bufferedWriter); return fileName; }}class WriteSqlHandle2{ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); WriteLock writeLock = readWriteLock.writeLock(); List
 cacheList; int currItemCount = 0; int dataCacheNum; public WriteSqlHandle2() { cacheList = new ArrayList
(); } public WriteSqlHandle2(int dataCacheNum) { this.dataCacheNum = dataCacheNum; cacheList = new ArrayList
(dataCacheNum); } public boolean isCacheExpires() { return currItemCount >= dataCacheNum; } public boolean add(String sqlStr) { try { writeLock.lock(); cacheList.add(sqlStr); currItemCount++; return isCacheExpires(); } finally { writeLock.unlock(); } } public void save(BufferedWriter bw) throws Exception { try { writeLock.lock(); //如果数据没有超出缓存.则返回. if (!isCacheExpires()) { return; } StopWatch2 stopWatch = new StopWatch2(); stopWatch.start(); // System.out.println(String.format("%s,准备消费   需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); currItemCount--; } stopWatch.stop(); System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), stopWatch.getTime(), cacheList.size())); cacheList.clear(); //清空数据. } finally { writeLock.unlock(); } } public void flush(BufferedWriter bw) throws Exception { System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); } System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size())); cacheList.clear(); //清空数据 closeWrite(bw); } private void closeWrite(BufferedWriter bw) throws Exception { bw.flush(); bw.close(); }}class StopWatch2{ long begin; long end; public void start() { begin = System.currentTimeMillis(); } public void stop() { end = System.currentTimeMillis(); } public long getTime() { return end - begin; }}interface CallBack3
{ T call(int num, String str);}

转载于:https://my.oschina.net/linapex/blog/195508

你可能感兴趣的文章
微服务架构的设计模式
查看>>
【C++】继承时构造函数和析构函数
查看>>
python风味之大杂烩
查看>>
NSDate & NSDateFormatter
查看>>
android 点击屏幕关闭 软键盘
查看>>
相似图片搜索的原理(转)
查看>>
钟南山:高收入群体往往老得快
查看>>
Linux Kernel(Android) 加密算法汇总(三)-应用程序调用内核加密算法接口
查看>>
开发中三个经典的原则
查看>>
logging日志管理-将日志写入文件
查看>>
Hibernate 、Hql查询和Criteria查询
查看>>
[saiku] 配置spring-security 允许 iframe加载saiku首页
查看>>
AJAX 页面数据传递
查看>>
滚动条滚动到底部触发事件
查看>>
『SharePoint 2010』Sharepoint 2010 Form 身份认证的实现(基于SQL)
查看>>
python之模块pydoc
查看>>
ASP.NET MVC 下拉列表使用小结
查看>>
nodejs基础 -- NPM 使用介绍
查看>>
Loadrunner中关联的作用:
查看>>
动态创建Fragment
查看>>