Java并发 -- Fork/Join框架

 Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务拆解成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。可以利用所有可用的处理能力来增强应用程序的性能。与任何ExecutorService实现一样,Fork/Join框架将任务分配给线程池中的工作线程。不同之处在于,因为它使用工作窃取算法,处理完任务的工作线程可以从其他忙碌的线程中窃取任务。

注:在并行计算中,窃取工作(work-stealing)是多线程计算机程序的调度策略。它解决了在具有固定数量的处理器(或内核)的静态多线程计算机上执行动态多线程计算的问题,该计算可以产生新的执行线程。它在执行时间,内存使用和处理器间通信方面都非常有效。在工作窃取调度程序中,计算机系统中的每个处理器都有一个要执行的工作任务(计算任务,线程)队列。每个工作任务都包含一系列要顺序执行的指令,但是在其执行过程中,一个工作任务也可能产生新的工作任务,这些新工作任务可以与其他工作并行执行。这些新项目最初放在执行工作任务的处理器的队列中。当处理器的工作处理完毕时,它将查看其他处理器的队列并“窃取”其工作项。实际上,窃取工作会将调度工作分配给空闲的处理器,并且只要所有处理器都有工作要做,就不会发生调度开销。

单线程使用递归遍历maven仓库的文件和文件夹的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.io.File;

public class RecursiveStatistics {
private static int fileCount = 0;
private static int dirCount = 0;

public static void main(String[] args) {
long begin = System.currentTimeMillis();

File p = new File("E:\\Repository");
statistics(p);

long end = System.currentTimeMillis();
System.out.println("begin:" + begin);
System.out.println("end:" + end);

float spendTime = ((float) (end - begin)) / 1000;
System.out.println("用时" + spendTime + "秒");

System.out.println("文件数量是:" + fileCount);
System.out.println("文件夹数量是:" + dirCount);
}

private static void statistics(File path) {
String[] fileList = path.list();
for (String s : fileList) {
File f = new File(path + "//" + s);
if (f.isDirectory()) {
System.out.println("文件夹:" + f.getName());
dirCount++;
statistics(f);
} else if (f.isFile()) {
System.out.println("文件:" + f.getName());
fileCount++;
}
}
}

}

文件统计

运行结果

使用递归遍历maven仓库的文件和文件夹的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinStatistics {

public static void main(String[] args) {
long begin = System.currentTimeMillis();

File file = new File("E:\\Repository");

ForkJoinTask forkJoinTask = new ForkJoinTask(file);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Map<String, Integer> result = (Map<String, Integer>) forkJoinPool.invoke(forkJoinTask);

long end = System.currentTimeMillis();
System.out.println("begin:" + begin);
System.out.println("end:" + end);

float spendTime = ((float) (end - begin)) / 1000;
System.out.println("用时" + spendTime + "秒");

System.out.println("文件夹数量是:" + result.get("dirCount"));
System.out.println("文件数量是:" + result.get("fileCount"));
}

}

//分治统计文件夹和文件数量
class ForkJoinTask extends RecursiveTask<Map> {
private File path;

public ForkJoinTask(File path) {
this.path = path;
}

@Override
protected Map<String, Integer> compute() {
int fileCount = 0;
int dirCount = 0;
Map<String, Integer> map = new ConcurrentHashMap<>();
map.put("fileCount", fileCount);
map.put("dirCount", dirCount);

List<ForkJoinTask> taskList = new LinkedList<>();

if (path.isDirectory()) {
System.out.println("文件夹:" + path.getName());

dirCount++;
map.put("dirCount", dirCount);

String[] fileList = path.list();

if (fileList != null && fileList.length != 0) {
for (String s : fileList) {
File f = new File(path + "//" + s);
ForkJoinTask task = new ForkJoinTask(f);
taskList.add(task);
task.fork();
}

for (ForkJoinTask task : taskList) {
Map<String, Integer> returnMap = task.join();
map.put("fileCount", map.get("fileCount") + returnMap.get("fileCount"));
map.put("dirCount", map.get("dirCount") + returnMap.get("dirCount"));
}

}
} else if (path.isFile()) {
System.out.println("文件:" + path.getName());
fileCount++;
map.put("fileCount", fileCount);
}

return map;
}
}

使用fork-join遍历速度明显提升