假设已有如下两个 function
•Output Compute(Input in)
•Output Merge(Output o1, Output o2)
要求在最多可以有 K 个 thread 的情况下实现
Output MergeAll(List<Input> inputs)
实际上是个 multi-thread programming 的问题
需要写出比较 detail 的代码
假设已有如下两个 function
•Output Compute(Input in)
•Output Merge(Output o1, Output o2)
要求在最多可以有 K 个 thread 的情况下实现
Output MergeAll(List<Input> inputs)
实际上是个 multi-thread programming 的问题
需要写出比较 detail 的代码
这个Merge是Merge之前两个Compute的结果?
对的
这个思路是对的
这个锁的粒度不够细
贴一下我的代码,基于Executors 线程池(newFixedThreadPool(K))和future实现 。
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
abstract class Input {
abstract Output output();
}
abstract class Output {
abstract Output add(Output o);
}
public class MergeCompute<O extends Output, I extends Input> {
private ExecutorService executor;
private MergeCompute() {}
public MergeCompute(int k) {
executor = Executors.newFixedThreadPool(k);
}
public Output mergeAll(List<Input> inputs) {
List<Future<Output>> futures = new LinkedList<>();
for (Input input : inputs) {
futures.add(executor.submit(new ComputeTask(input)));
}
try {
while (futures.size() > 1) {
futures.add(executor.submit(
new MergeTask(futures.remove(0).get(),
futures.remove(0).get()))
);
}
return futures.get(0).get();
} catch (InterruptedException | ExecutionException e) {
}
return null;
}
}
class ComputeTask implements Callable {
private Input input;
public ComputeTask(Input input) {
this.input = input;
}
@Override
public Output call() {
return compute(this.input);
}
private static Output compute(Input in) {
return in.output();
}
}
class MergeTask implements Callable {
private Output output1;
private Output output2;
public MergeTask(Output output1, Output output2) {
this.output1 = output1;
this.output2 = output2;
}
@Override
public Output call() {
return merge(this.output1, this.output2);
}
private static Output merge(Output o1, Output o2) {
return o1.add(o2);
}
}
这个 class 是否有点多余?直接 lamda 可以搞定吧?
这个方法应该是 某个 utilitly 的 class 里,不应该是 MergeTask 的
总体思路完全正确,写的不错
Callable 的call()不带参数的,这里需要传一个input作为compute的参数,我不知道lamda怎么搞。
public interface Callable<V> {
V call() throws Exception;
}
大概是
for (Input input : inputs) {
final source = input;
futures.add(executor.submit(() -> compute(source)));
}
class MergeTask implements Callable<Output>
这里给下 generic 的 output
最好有个 close 的方法把 executor shutdown,资源需要回收
谢谢,这样的确要简洁很多。lamda我要好好学学
改了下你的代码
package linkedin;
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
abstract class Input {
abstract Output output();
}
abstract class Output {
abstract Output add(Output o);
}
public class MergeCompute<O extends Output, I extends Input> implements Closeable {
private ExecutorService executor;
public MergeCompute(int k) {
executor = Executors.newFixedThreadPool(k);
}
public Output mergeAll(List<Input> inputs) {
List<Future<Output>> futures = new LinkedList<>();
for (Input input : inputs) {
final Input source = input;
futures.add(executor.submit(() -> Util.compute(source)));
}
try {
while (futures.size() > 1) {
futures.add(executor.submit(new MergeTask(futures.remove(0).get(), futures.remove(0).get())));
}
return futures.get(0).get();
} catch (InterruptedException | ExecutionException e) {
}
return null;
}
@Override
public void close() throws IOException {
this.executor.shutdown();
}
}
class Util {
public static Output compute(Input in) {
return in.output();
}
public static Output merge(Output o1, Output o2) {
return o1.add(o2);
}
}
class MergeTask implements Callable<Output> {
private Output output1;
private Output output2;
public MergeTask(Output output1, Output output2) {
this.output1 = output1;
this.output2 = output2;
}
@Override
public Output call() {
return Util.merge(this.output1, this.output2);
}
}
这里有个比较严重的bug是 LinkedList 并不是 thread safe 的,凡是对 futures 的操作必须加锁
又看了一下,因为代码必须执行 remove 以后才能执行 add,所以应该没事
改进了一下code,用lamda 更简洁
public class MergeComputeWithLamda<O extends Output, I extends Input> implements Closeable {
private ExecutorService executor;
public MergeComputeWithLamda(int k) {
executor = Executors.newFixedThreadPool(k);
}
public Output mergeAll(List<Input> inputs) {
List<Future<Output>> futures = new LinkedList<>();
for (Input input : inputs) {
final Input source = input;
futures.add(executor.submit(() -> Util.compute(source)));
}
try {
while (futures.size() > 1) {
final Output o1 = futures.remove(0).get();
final Output o2 = futures.remove(0).get();
futures.add(executor.submit(() -> Util.merge(o1, o2)));
}
return futures.get(0).get();
} catch (InterruptedException | ExecutionException e) {
}
return null;
}
@Override
public void close() throws IOException {
this.executor.shutdown();
}
}
futures 是本地变量, 没有race condition, 不需要加锁