Leetcode 443 String Compression 的 map reduce followup

狗家实习面出现的

先是问 https://leetcode.com/problems/string-compression/description/

然后 followup 是 input 是很大的 string 怎么办
主要思路是 map reduce
分配给 每个slave 的具体任务该是什么? 也就是具体怎么切割 string 然后 组装?

补充一下,先是要求实现两个方法,单机版。
即 compress 和 restore。后者是 把compress 以后的 string 还原成 original 长的string。

那么在大数据的情况下,多机版,如何把很长的string compress,这是第一步。

第二步,如何拿到compress 以后的string,将其还原?

目前想到的用map reduce compress的思路:

1.将input s分成多个substring, 分给每个mapper slave

2: mapper slave得到 input: (s, begIdx, endIdx), 扫一遍s,output key-value pairs: (char, idx of char in original s)

3: 用char 做key 把key-value pair 分组成 (char, list of indices of char in original s)。
例如 s = “applebeee”, slave1拿到apple, slave2拿到bee的话,其中一种可能的output: {a:[0], b:[5], e:[6,7,8,4], l:[3], p:[1,2]}。注意list of indices未必是升序,因为两个slave是parallel processes

4: reducer slave拿到(char, list of indices of char in original s)的input, 用LC 128. Longest Consecutive Sequence的思路,拼凑出每个char的consecutive intervals。

  1. 最后把intervals按beg index 排序,得到 a:[0,0], p:[1,2], l:[3,3], e:[4,4], b:[5,5], e:[6,8],return ap2lebe3

这里是根据什么切分?

可以直接给slave substring。加个sequence number用于reduce就可以了。

这个想的太复杂了吧

补充一下单机版String压缩和还原的method,以及调用主函数

import java.util.*;
public class StringCompression{
    public static String compress(String original) {
		if (original == null || original.length() == 0) {
			return original;
		}
		int n = original.length();

		StringBuilder compressed = new StringBuilder();
		int count = 1; // record char count
		int len = 1; // count total length
		Character c1 = original.charAt(0);
		while (len < n) {
		    // increment character count if no new character occur
			if (c1 == original.charAt(len)) {
				count ++;
			} else { //otherwise update character and refresh count
				compressed.append(c1);
				compressed.append(Integer.toString(count));
				c1 = original.charAt(len);
				count = 1;
			}
			len ++;
		}
		//add last character and count to the end
		compressed.append(c1);
		compressed.append(Integer.toString(count));
		return compressed.toString();
	}

	public static String restore(String compressed) {
		if (compressed == null || compressed.length() == 0) {
			return compressed;
		}
		int n = compressed.length();

		StringBuilder original = new StringBuilder(); // store result string
		StringBuilder count = new StringBuilder(); //temporary store character count
		int len = 1;
		Character c1 = compressed.charAt(0);
		while (len <= n) {
		    // if meet new character or end of string
			if((len < n && !Character.isDigit(compressed.charAt(len))) || len == n) {
				if (count.length() > 0) {
					int cnt = Integer.parseInt(count.toString());
					for (int i = 0; i < cnt; i++) {
						original.append(c1);
					}
				}
				// if not finished, refresh count and character cache
				if (len < n) {
				    c1 = compressed.charAt(len);
				    count = new StringBuilder();
				}
			} else {
			    //increment character count otherwise
				count.append(compressed.charAt(len));
			}
			len ++;
		}

		return original.toString();
	}

	public static void main(String[] args)
  {

    String input1 = "aabbbbbbbbbbbbbbbbbbbccc";
    String compressed1 = compress(input1);
    String original1 = restore(compressed1);
    System.out.println("input string: " + input1);
    System.out.println("compressed string: " + compressed1);
    System.out.println("restored original string: " + original1);
    
    String input2 = "a";
    String compressed2 = compress(input2);
    String original2 = restore(compressed2);
    System.out.println("input string: " + input2);
    System.out.println("compressed string: " + compressed2);
    System.out.println("restored original string: " + original2);
    
    String input3 = "abbbbbbbbbbbbbbbb";
    String compressed3 = compress(input3);
    String original3 = restore(compressed3);
    System.out.println("input string: " + input3);
    System.out.println("compressed string: " + compressed3);
    System.out.println("restored original string: " + original3);
  }
}

主函数输出结果如下

input string: aabbbbbbbbbbbbbbbbbbbccc
compressed string: a2b19c3
restored original string: aabbbbbbbbbbbbbbbbbbbccc
input string: a
compressed string: a1
restored original string: a
input string: abbbbbbbbbbbbbbbb
compressed string: a1b16
restored original string: abbbbbbbbbbbbbbbb

followup的第一部分(压缩),input切分可以按固定长度切分(比如len = 8),对substring调用 compress方法,存储输出string,最后reduce的时候按照subsequence number遇到前一段substring 结尾和 后一段substring 开头是相同char的情况下,对输出的string进行修改再拼接(比如前一段为a1b7 ,后一段为 b8, 拼接成a1b15)

1 Like

我目前想法,对于compress,如果input char array是海量数据,但还在HDFS能处理的范围内,比如100TB,我们把input char array存到HDFS这样的分布式文件系统里,用MapReduce。我们用一个Mapper,一个Reducer。设Mapper input为<K1, V1>, Mapper output和Reducer input都是<K2, V2>(Mapper的output就是Reducer的input),Reducer output为<K3, V3>。等下我会解释K1 V1 K2 V2 K3 V3具体都是什么。

举个例子帮助理解:
假设input char array是:
0123456789 (input char array index)
aaabbbbccc (input char array content)

首先把input char array做等长切割,每个切片的长度在一个mapper能处理的范围内。假设切成长度为5的等长片段:
01234 56789
aaabb | bbccc
分成aaabb和bbccc两个array segment。

这里借用Hadoop MapReduce的类型:
LongWritable对应Long,Text对应String,V2Object就是Java的Object。

Mapper input <K1, V1> 的type为<LongWritable, Text>
K1: Array segment offset (a.k.a. start index of array segment)
V1: Array segment content
于是aaabb变成<0L, “aaabb”>输入给Mapper1,bbccc变成<5L, “bbccc”>输入给Mapper2。

Mapper1拿到<0L, “aaabb”>,把aaabb按连续相同字符切分成aaa和bb。
aaa在original input char array的offset是从0到2,可以对字符a压缩成a3。我们把a作为K2,V2Object {compressedStr: “a3”, startIndex: 0, endIndex: 2}作为V2。
同样的算法,bb在original input char array的offset是从3到4,可以对字符b压缩成b2。把b作为K2,V2Object {compressedStr: “b2”, startIndex: 3, endIndex: 4}作为V2。

你会发现这里Mapper output <K2, V2> 的type为<Text, V2Object>

    K2: Current character in this continuous array segment with same char repeating
    V2: V2Object {
        // compressed string of this array segment, for example,
        // "a3" (compress of "aaa"), "b2" (compress of "b2")
        String compressedStr;
        long startIndex;
        long endIndex;
    }

这里的V2Object需要继承Comparable以便后面sort:

    public static class V2Object implements Comparable<V2Object> {
        ...
        @Override
        public int compareTo(V2Object o) {
            // 注意这里按startIndex排序
            return Long.compare(this.startIndex, o.startIndex);
        }
        ...
    }

这些<K2, V2>经过shuffle和sort传给Reducer。所有相同K2的Mapper输出结果都会去到同一个Reducer上。

回到我们的例子,Mapper1输入<0L, “aaabb”>,输出:
<“a”, {compressedStr: “a3”, startIndex: 0, endIndex: 2}>
<“b”, {compressedStr: “b2”, startIndex: 3, endIndex: 4}>
Mapper2输入<5L, “bbccc”>, 输出:
<“b”, {compressedStr: “b2”, startIndex: 5, endIndex: 6}>
<“c”, {compressedStr: “c3”, startIndex: 7, endIndex: 9}>

经过shuffle和sort,Reducer1输入:
<“a”, [{compressedStr: “a3”, startIndex: 0, endIndex: 2}]>
<“b”, [{compressedStr: “b2”, startIndex: 3, endIndex: 4}, {compressedStr: “b2”, startIndex: 5, endIndex: 6}]>
Reducer2输入:
<"c", [{compressedStr: "c3", startIndex: 7, endIndex: 9}]>

拿Reducer1输入<“b”, [{compressedStr: “b2”, startIndex: 3, endIndex: 4}, {compressedStr: “b2”, startIndex: 5, endIndex: 6}]>做栗子,reducer拿到V2 array后对array做一遍merge interval, 每个V2Object的startIndex和endIndex作为这个interval的start和end,相邻区间合并起来,并且把comressedStr也更新成合并后的compressedStr,举个栗子,{compressedStr: “b2”, startIndex: 3, endIndex: 4}和{compressedStr: “b2”, startIndex: 5, endIndex: 6}是相邻区间,合并成一个{compressedStr: “b4”, startIndex: 3, endIndex: 6}。reducer经过merge interval这一步之后得到一个新的区间数组,数组中的每个区间其实就是一个V2Object。所以[{compressedStr: “b2”, startIndex: 3, endIndex: 4}, {compressedStr: “b2”, startIndex: 5, endIndex: 6}]变成[{compressedStr: “b4”, startIndex: 3, endIndex: 6}]。然后对新的数组中每个V2Object emit一个<K3, V3> pair,对{compressedStr: “b4”, startIndex: 3, endIndex: 6}来说<K3, V3>就是<3, “b4”>。

你会发现Reducer output <K3, V3> 的type为<LongWritable, Text>:
K3: startIndex of current interval (V2Object) in the V2Object array after merge interval step
V3: compressedStr, e.g. “b4”

这里Reducer1会输出:
<0, “a3”>
<3, “b4”>
Reducer2会输出:
<7, “c3”>

最后所有reducer输出的结果会在final output排序成:
0 a3
3 b4
7 c3

你会发现每行的第二个token连起来就是compress的结果。如果需要可以把这个final output file再处理一下得到compress后的char array。

这里有一个问题,就是假设b开头的<K2, V2>结果数据量太大了,导致一个reducer装不下,这里可以提前sample一下original input array的数据特征,如果发现某个字符开头的<K2, [V2] Arrays>数据量太大,可能可以把K2变成字符和startIndex的组合,Customized Partitioner可以先读字符,然后把不同range的startIndex的相同字符的K2分到不同的reducer上,类似b开头,startIndex < 1000000 的去Reducer1,b开头1000000 <= startIndex && startIndex < 2000000 的去Reducer2,b开头2000000 <= startIndex && startIndex < 3000000的去Reducer3,以此类推这样。不知道对不对。

好难啊,求答案。

这是给实习生考的题目,仁者见仁吧

其实没必要存index,你是一块块切,最后reduce时候就保证顺序即可。
另外你思考一下,分配给 slave substring 的时候, 根据什么决定分配,还是说完全随机的。

这个思路是可行的 :+1:

你可以把每个 substring 一个sequence id,即 1、2、3、4 。。。。

在 reduce 的时候就按照顺序拼,碰到 前 substring 的尾 和 后 substring 的首 字母相同的情况时需要处理一下。其实没那么难。

请问这是哪一步?Mapper还是Reducer?

这里说的是 Mapper

老师能举个完整的maper和reducer的input output的例子吗?

这里说的是Mapper的话,我能想到的是相邻的substring尽量给同一个mapper,这样mapper处理完先写到本地磁盘,对应的reducer也可以尽量跑在同一个机器上,从本地磁盘读mapper的output,这样data locality比较好。这样想正确吗?和老师的思路一致吗?

就用你的例子好了,aaabbbbccc (input char array content)

首先把input char array做等长切割,假设切成长度为5的等长片段:
01234 56789
aaabb | bbccc
分成aaabb和bbccc两个array segment

假设有两个slave, slave 1 拿到 <aaabb, 1>, slave 2 拿到 <bbccc, 2>。数据是 substring 和 sequence id。
mapper 处理完就变成 <a3b2, 1> 和 <b2c3, 2>。

另外思维不要局限在套 Hadoop 方法。这里只是说用 map reduce,没说用 hadoop。

这里可以挖的就很多了,不是说唯一答案的,看你怎么说的有道理了。但是你说的上路了 :+1:

一种简单做法就是 master 机器自己做 reduce 。什么意思呢?就是说 mapper 处理完后返回给 master, master 根据 sequence 来排序组装即可。因为 master 最终要产生最后拼接的 string,一个 reducer 做就可以了,这步其实再用多个slave 有点多余。

原来如此。