领英多线程题目

假设已有如下两个 function

•Output Compute(Input in)

•Output Merge(Output o1, Output o2)

要求在最多可以有 K 个 thread 的情况下实现

Output MergeAll(List<Input> inputs)

实际上是个 multi-thread programming 的问题

需要写出比较 detail 的代码

这个Merge是Merge之前两个Compute的结果?

对的

shared resources 是List inputs的 话直接用 Collections.synchronizedList() ?

其他写法和 merge k sorted list 类似?

这个思路是对的

这个锁的粒度不够细

贴一下我的代码,基于Executors 线程池(newFixedThreadPool(K))和future实现 。

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;

abstract class Input {
    abstract Output output();
}
abstract class Output {
    abstract Output add(Output o);
}
public class MergeCompute<O extends Output, I extends Input> {
    private ExecutorService executor;
    private MergeCompute() {}
    public MergeCompute(int k) {
        executor = Executors.newFixedThreadPool(k);
    }

    public Output mergeAll(List<Input> inputs) {
        List<Future<Output>> futures = new LinkedList<>();
        for (Input input : inputs) {
            futures.add(executor.submit(new ComputeTask(input)));
        }
        try {
            while (futures.size() > 1) {
                futures.add(executor.submit(
                        new MergeTask(futures.remove(0).get(),
                                futures.remove(0).get()))
                );
            }
            return futures.get(0).get();
        } catch (InterruptedException | ExecutionException e) {
        }
        return null;
    }
}

class ComputeTask implements Callable {
    private Input input;
    public ComputeTask(Input input) {
        this.input = input;
    }

    @Override
    public Output call() {
        return compute(this.input);
    }

    private static Output compute(Input in) {
        return in.output();
    }
}

class MergeTask implements Callable {
    private Output output1;
    private Output output2;
    public MergeTask(Output output1, Output output2) {
        this.output1 = output1;
        this.output2 = output2;
    }

    @Override
    public Output call() {
        return merge(this.output1, this.output2);
    }

    private static Output merge(Output o1, Output o2) {
        return o1.add(o2);
    }
}
1 个赞

这个 class 是否有点多余?直接 lamda 可以搞定吧?

这个方法应该是 某个 utilitly 的 class 里,不应该是 MergeTask 的

总体思路完全正确,写的不错

Callable 的call()不带参数的,这里需要传一个input作为compute的参数,我不知道lamda怎么搞。

public interface Callable<V> {
    V call() throws Exception;
}

大概是

      for (Input input : inputs) {
           final source = input;
           futures.add(executor.submit(() -> compute(source)));
       }

class MergeTask implements Callable<Output>

这里给下 generic 的 output

最好有个 close 的方法把 executor shutdown,资源需要回收

谢谢,这样的确要简洁很多。lamda我要好好学学

改了下你的代码

package linkedin;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;

abstract class Input {
	abstract Output output();
}

abstract class Output {
	abstract Output add(Output o);
}

public class MergeCompute<O extends Output, I extends Input> implements Closeable {
	private ExecutorService executor;

	public MergeCompute(int k) {
		executor = Executors.newFixedThreadPool(k);
	}

	public Output mergeAll(List<Input> inputs) {
		List<Future<Output>> futures = new LinkedList<>();
		for (Input input : inputs) {
			final Input source = input;
			futures.add(executor.submit(() -> Util.compute(source)));
		}
		try {
			while (futures.size() > 1) {
				futures.add(executor.submit(new MergeTask(futures.remove(0).get(), futures.remove(0).get())));
			}
			return futures.get(0).get();
		} catch (InterruptedException | ExecutionException e) {
		}
		return null;
	}

	@Override
	public void close() throws IOException {
		this.executor.shutdown();
	}
}

class Util {
	public static Output compute(Input in) {
		return in.output();
	}
	
	public static Output merge(Output o1, Output o2) {
		return o1.add(o2);
	}
}

class MergeTask implements Callable<Output> {
	private Output output1;
	private Output output2;

	public MergeTask(Output output1, Output output2) {
		this.output1 = output1;
		this.output2 = output2;
	}

	@Override
	public Output call() {
		return Util.merge(this.output1, this.output2);
	}

}


1 个赞

这里有个比较严重的bug是 LinkedList 并不是 thread safe 的,凡是对 futures 的操作必须加锁

又看了一下,因为代码必须执行 remove 以后才能执行 add,所以应该没事

改进了一下code,用lamda 更简洁

public class MergeComputeWithLamda<O extends Output, I extends Input> implements Closeable {
    private ExecutorService executor;

    public MergeComputeWithLamda(int k) {
        executor = Executors.newFixedThreadPool(k);
    }

    public Output mergeAll(List<Input> inputs) {
        List<Future<Output>> futures = new LinkedList<>();
        for (Input input : inputs) {
            final Input source = input;
            futures.add(executor.submit(() -> Util.compute(source)));
        }
        try {
            while (futures.size() > 1) {
                final Output o1 = futures.remove(0).get();
                final Output o2 = futures.remove(0).get();
                futures.add(executor.submit(() -> Util.merge(o1, o2)));
            }
            return futures.get(0).get();
        } catch (InterruptedException | ExecutionException e) {
        }
        return null;
    }

    @Override
    public void close() throws IOException {
        this.executor.shutdown();
    }
}

1 个赞

futures 是本地变量, 没有race condition, 不需要加锁

1 个赞