This project aims to create a system that generates information such as n-gram counts, cooccurrence counts, or isolated sentences from a large corpus of webpages for a language of choice. Parallel processing of such tasks can lead to a huge performance benefit over serial processing.
MapReduce provides a programming model for parallel processing. We chose Hadoop as the MapReduce framework. Our system is built as a pipeline of Hadoop MapReduce jobs. The raw data is kindly provided by the FindLinks team at the University of Leipzig.
Job | Before | Transformations | After |
---|---|---|---|
DocumentJob (format) | Raw data with metadata as input | Basic cleanup, normalize whitespace and URL. Format can be "leipzig", "arc" or "warc". | One document per line with URL and metadata |
DeduplicationJob | One document per line. | Deduplication by URL. | URL based duplicates are removed |
DeduplicationByHostJob | One document per line. | Deduplication by host. | Host based duplicates are removed |
UTF8Job | One document per line. | Detect and remove documents that contain unknown glyphs. | Mostly correct encoded documents |
SentenceJob (language) | One document per line. | Sentence split text. | Sentences are wrapped in XML <s> tags. If possible, a language-specific sentence segmentation model is used. For the language, use its two-letter ISO 639-1 code. |
LanguageJob (language) | One document per line with sentence annotation. Parameter giving expected language. | Detects language per sentence. Estimates language for sentences that could not clearly be classified. Removes sentences with unexpected language estimation. | Sentences are annotated with detected and estimated language. |
SentenceAnnotateJob (n) | One sentence per line, with sentence as key. | Runs arbitrary UIMA components and writes serialized CASes as result | One XML-serialized CAS per line. CAS compressed with GZip. |
NGramCountJob (n) | Serialized per-sentence CASes. | Counts n-grams according to parameter | One entry per line: n-gram#TAB#count |
POSNGramCountJob (n) | Serialized per-sentence CASes. | Counts POS-n-grams according to parameter | One entry per line: n-gram#TAB#count |
NGramWithPOSCountJob (n) | Serialized per-sentence CASes. | Counts n-grams with appended POS-tags ("token/<POS>") according to parameter | One entry per line: n-gram#TAB#count |
CooccurrenceJob (n) | One tokenized document per line. Parameter giving maximum cooccurrence distance to be counted. | Counts cooccurrences with distance up to parameter n. | One entry per line: word1@@-distance word2@@+distance#TAB#count |
SentenceExtractJob | One document per line with sentence annotation (and language annotation). | Extract sentences with expected language (specified in LanguageJob run) and maximum length of 512 characters. | One sentence per line with URL and crawl date. |
SentenceExtractCompactJob | Output of SentenceExtractJob | Deduplication and counting of sentences. | One sentence per line with total count, first crawl date and up to ten URLs. |
All jobs are located in the package webcorpus.hadoopjobs.
This job takes crawler archives in multiple possible formats and writes out relevant data in a uniform format, such that the following jobs can process it.
There are two configuration options of special relevance here:
To read only documents of a specific MIME type from the archive, turn on MIME-type filtering with conf.set("webcorpus.common.io.warcinputformat.filter-mimetypes", "true"). The default value is false.
To specify the list of MIME types to be read, use conf.set("webcorpus.documentjob.content-type-whitelist", "type1, type2, ..."), e.g. "text/html" to keep only HTML documents.
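A minimal sketch of how these options might be set in a job driver; only the two configuration keys are taken from this documentation, the surrounding class is purely illustrative:

import org.apache.hadoop.conf.Configuration;

public class DocumentJobConfigExample {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // read only documents whose MIME type is on the whitelist (default: false)
        conf.set("webcorpus.common.io.warcinputformat.filter-mimetypes", "true");
        // keep only HTML documents
        conf.set("webcorpus.documentjob.content-type-whitelist", "text/html");
        return conf;
    }
}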
<source><location>http://document.url/1</location><date>2012-01-01</date><user>Tom</user><original_encoding>utf-8</original_encoding><language>deu</language></source> Dies ist eine Testdatei mit zwei Absätzen. Zweiter Absatz.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.DocumentJob webcorpus_data/raw webcorpus_data/document --input-format <format>
Here, format must be one of "warc", "arc" or "leipzig".
http://document.url/1#TAB#<source><location><![CDATA[http://document.url/1]]></location><date>2012-01-01</date><user>Tom</user><original_encoding>utf-8</original_encoding><language>deu</language></source>#TAB#<process><length>83</length></process>#TAB#<p>Dies ist eine Testdatei mit zwei Absätzen.</p><p>Zweiter Absatz.</p>
The complete document is printed in one line with normalized whitespace. #TAB# is used in this example to illustrate the tab character.
Deduplication is performed to reduce the amount of redundant data.
To minimize the false positive rate of the Bloom filter, an adequate bit vector size (dedupBloomVectorSize) and number of hashes (dedupBloomNbHash) must be chosen and set in jobconf.txt. To do this, choose the number n of entries at which the Bloom filter should deliver its smallest false positive rate. Then, for a given vector size, use the following equation to calculate the optimal number of hashes:
dedupBloomNbHash = (dedupBloomVectorSize/n) * ln(2)
For example, a Bloom filter with a vector size of dedupBloomVectorSize=1024 bits that contains about n=100 entries delivers its best (theoretical) false positive rate of 0.0073 at that point with dedupBloomNbHash=7 hashes.
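The calculation can be reproduced with a short, self-contained sketch (not part of the pipeline); it uses the standard Bloom filter formulas with the values from the example above:

public class BloomFilterParams {
    public static void main(String[] args) {
        int vectorSize = 1024;  // dedupBloomVectorSize (m)
        int entries = 100;      // expected number of entries (n)

        // optimal number of hash functions: k = (m / n) * ln(2)
        int nbHash = (int) Math.round((double) vectorSize / entries * Math.log(2));

        // theoretical false positive rate: (1 - e^(-k*n/m))^k
        double fpRate = Math.pow(1 - Math.exp(-(double) nbHash * entries / vectorSize), nbHash);

        // prints: dedupBloomNbHash = 7, false positive rate = 0.0073
        System.out.printf("dedupBloomNbHash = %d, false positive rate = %.4f%n", nbHash, fpRate);
    }
}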
http://document.url/1 <source/> <process/> Same content.
http://document.url/1 <source/> <process/> Same content.
http://document.url/1/print <source/> <process/> Same content.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.DeduplicationJob webcorpus_data/document webcorpus_data/deduplication
http://document.url/1 <source/> <process/> Same content.
http://document.url/1/print <source/> <process/> Same content.
Same as DeduplicationJob, but it looks for items with recurring hosts by deconstructing the URLs.
http://document.url/1 <source/> <process/> Same content.
http://document.url/1/print <source/> <process/> Same content.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.DeduplicationByHostJob webcorpus_data/deduplication webcorpus_data/deduplicationByHost
http://document.url/1 <source/> <process/> Same content.
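As an illustration of the URL deconstruction, the host part of a document URL can be obtained as follows. This is a hypothetical sketch; how the job actually normalizes hosts is not specified here:

import java.net.URI;
import java.net.URISyntaxException;

public class HostKeyExample {
    // derive the host-based deduplication key from a document URL
    static String hostOf(String url) throws URISyntaxException {
        return new URI(url).getHost();
    }

    public static void main(String[] args) throws URISyntaxException {
        System.out.println(hostOf("http://document.url/1"));       // document.url
        System.out.println(hostOf("http://document.url/1/print")); // document.url
    }
}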
Encoding errors can lead to undesirable behaviour in language technology. This job tries to filter out at least the obvious encoding errors.
... <p>Document cont[UNKNOWN GLYPH]ins encoding error.</p> ...
hadoop jar webcorpus.jar webcorpus.hadoopjobs.UTF8Job webcorpus_data/deduplicationByHost webcorpus_data/utf8
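One plausible check of this kind, shown here only as an assumption about what "unknown glyphs" means, is to flag documents containing the Unicode replacement character, which typically marks decoding errors:

public class Utf8Check {
    // true if the text contains the Unicode replacement character U+FFFD
    static boolean looksBroken(String text) {
        return text.indexOf('\uFFFD') >= 0;
    }

    public static void main(String[] args) {
        System.out.println(looksBroken("Document contains no encoding error."));   // false
        System.out.println(looksBroken("Document cont\uFFFDins encoding error.")); // true
    }
}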
For further processing, and as a project goal in itself, isolated sentences are needed. This job splits paragraphs into sentences.
// Add the segmentizer archive to the distributed cache with the symlink "segmentizer"
// in public int run(String[] args):
DistributedCache.addCacheArchive(new URI("webcorpus_conf/segmentizer.zip#segmentizer"), conf);
DistributedCache.createSymlink(conf);

// The segmentizer expects a URL for every config file as parameter.
// Create a HashMap in the Mapper to store all the URLs:
private Map<String, URL> urlMap = new HashMap<String, URL>();

// Retrieve the config files from the cache in the Mapper:
private void getResources() {
    URL resource = job.getResource("segmentizer");
    File inputPath = new File(resource.getPath());
    for (File f : inputPath.listFiles()) {
        if (f.exists()) {
            try {
                urlMap.put(f.getName(), f.toURI().toURL());
            } catch (MalformedURLException e) {
                // ignore files that cannot be converted to a URL
            }
        }
    }
}
... <p>Paragraph contains two sentences. This is the second sentence.</p> ...
hadoop jar webcorpus.jar webcorpus.hadoopjobs.SentenceJob webcorpus_data/utf8 webcorpus_data/sentence --lang en
... <p><s>Paragraph contains two sentences.</s><s>This is the second sentence.</s></p> ...
Text that is not in the language of choice should be omitted. Therefore, we perform sentence-based language detection using jlani.
... <p><s>Paragraph contains three sentences.</s><s>One english, one gemischtsprachig, one Монгол.</s><s>элдэв гажиг мэдээлэл агуулсан бичлэгүүдийг аль болохоор хурдан хугацаанд устгах юмуу өөрчилнө.</s></p> ...
hadoop jar webcorpus.jar webcorpus.hadoopjobs.LanguageJob webcorpus_data/sentence webcorpus_data/language --lang en
... <p><s lang="en" lani="en">Paragraph contains three sentences.</s><s lang="en" lani="unknown">One english, one gemischtsprachig, one Монгол.</s></p> ...
The second sentence is considered to be an English sentence, as it follows an English sentence and is short enough. The last sentence is removed.
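The estimation heuristic can be pictured roughly as follows. This is an assumption based on the example above and on the maximumUnknownLanguageLength option in jobconf.txt, not the actual implementation:

public class LanguageEstimateExample {
    // keep a sentence if it is clearly in the expected language, or if it is short
    // enough and follows a sentence estimated to be in the expected language
    static boolean keep(String detectedLang, String expectedLang, String previousEstimate,
                        int sentenceLength, int maximumUnknownLanguageLength) {
        if (expectedLang.equals(detectedLang)) {
            return true;
        }
        return expectedLang.equals(previousEstimate)
                && sentenceLength <= maximumUnknownLanguageLength;
    }
}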
Runs arbitrary UIMA components on the deduplicated sentences.
This job counts n-grams of tokens.
Expects the output of SentenceAnnotateJob as input.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.NGramCountJob webcorpus_data/token webcorpus_data/ngram -n 3
Calculate some 3-grams.
...
und diffusionsgeschlossene Dämmungen 2
und digitale Programme 1
und digitalen Kartensammlungen 2
...
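For clarity, this is what counting token n-grams amounts to for n=3. The sketch below is only an illustration; the real job reads its tokens from the serialized CASes and aggregates the counts across mappers and reducers:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class NGramCountExample {
    public static void main(String[] args) {
        String[] tokens = {"und", "digitale", "Programme", "und", "digitalen", "Kartensammlungen"};
        int n = 3;
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        for (int i = 0; i + n <= tokens.length; i++) {
            String ngram = String.join(" ", Arrays.copyOfRange(tokens, i, i + n));
            counts.merge(ngram, 1, Integer::sum);
        }
        // one entry per line: n-gram, tab, count
        counts.forEach((ngram, count) -> System.out.println(ngram + "\t" + count));
    }
}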
This job counts n-grams of tokens with appended POS tags ("token/<POS>").
Expects the output of SentenceAnnotateJob as input.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.NGramWithPOSCountJob webcorpus_data/token webcorpus_data/ngram-with-pos -n 3
Calculate some 3-grams.
...
und/<KON> diffusionsgeschlossene/<ADJA> Dämmungen/<NN> 2
und/<KON> digitale/<ADJA> Programme/<NN> 1
und/<KON> digitalen/<ADJA> Kartensammlungen/<NN> 2
...
This job counts POS-n-grams of tokens, i.e. n-grams in which each position holds either the token or its POS tag:
TOK TOK TOK
TOK TOK POS
TOK POS TOK
TOK POS POS
...
Expects the output of SentenceAnnotateJob as input.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.POSNGramCountJob webcorpus_data/token webcorpus_data/pos-ngram -n 3
Calculate some 3-grams.
...
und diffusionsgeschlossene Dämmungen 2
und diffusionsgeschlossene <NN> 3
und <ADJA> Dämmungen 2
und <ADJA> <NN> 15
...
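The pattern expansion can be sketched as follows. Whether the job emits all combinations of token and POS per position or only a subset is an assumption based on the example output above:

import java.util.ArrayList;
import java.util.List;

public class PosNGramPatternExample {
    // for each bit mask, position i contributes the token (bit = 0) or the POS tag (bit = 1)
    static List<String> expand(String[] tokens, String[] tags) {
        List<String> ngrams = new ArrayList<String>();
        int n = tokens.length;
        for (int mask = 0; mask < (1 << n); mask++) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < n; i++) {
                if (i > 0) sb.append(' ');
                sb.append((mask & (1 << i)) == 0 ? tokens[i] : tags[i]);
            }
            ngrams.add(sb.toString());
        }
        return ngrams;
    }

    public static void main(String[] args) {
        expand(new String[]{"und", "digitale", "Programme"},
               new String[]{"<KON>", "<ADJA>", "<NN>"})
            .forEach(System.out::println);
    }
}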
This job counts cooccurrences of tokens.
CooccurrenceJob expects the output of TokenJob as input.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.CooccurrenceJob webcorpus_data/token webcorpus_data/cooccurrence -n 5
Calculate cooccurrences up to distance 5.
...
üpp@@-1 ’@@+1 1
üppig@@-2 Bewuchses@@+2 1
üppig@@-2 Buchs@@+2 1
üppig@@-2 Garten@@+2 3
üppig@@-2 Getreidefelder@@+2 1
üppig@@-3 Garten@@+3 1
üppig@@-3 Hmmmmm@@+3 1
üppig@@-3 in@@+3 9
...
The line format is as follows:
{word at p}@@-i {word at p+i}@@+i count
where p is the position of the left word and i is the distance between the left and the right word.
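For illustration, the cooccurrence pairs of one tokenized sentence could be generated like this (a sketch only; the real job additionally aggregates the counts over the whole corpus, and the example tokens are made up):

public class CooccurrenceExample {
    public static void main(String[] args) {
        String[] tokens = {"ein", "üppig", "bewachsener", "Garten"};
        int maxDistance = 2; // corresponds to the -n parameter
        for (int p = 0; p < tokens.length; p++) {
            for (int i = 1; i <= maxDistance && p + i < tokens.length; i++) {
                // left word with negative distance, right word with positive distance
                System.out.println(tokens[p] + "@@-" + i + " " + tokens[p + i] + "@@+" + i);
            }
        }
    }
}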
Preprocessing step for SentenceExtractCompactJob.
Expects output of LanguageJob as input.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.SentenceExtractJob webcorpus_data/language webcorpus_data/sentenceExtract
...
1560 wurde dem Markte Zwiesel ein Wappen zugesprochen. http://zwiesel.de/content.php?content=d_3_9_3 2011-02-27
Die Wappenverleihungsurkunde vom 11. Sept. dieses Jahres lautet wörtlich: http://zwiesel.de/content.php?content=d_3_9_3 2011-02-27
...
Extracted and deduplicated sentences are a project goal. This job performs the deduplication and provides the desired sentences as output.
Expects output of SentenceExtractJob as input.
hadoop jar webcorpus.jar webcorpus.hadoopjobs.SentenceExtractCompactJob webcorpus_data/sentenceExtract webcorpus_data/sentenceExtractCompact
... Hier steht ein Satz. 3 2011-02-24 http://occurrence1.com/index.php?id=42 http://occurrence2.com ...
Note that the total count is higher than the number of URLs. This means that the sentence occurs multiple times on at least one page.
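The per-sentence aggregation can be pictured with the following sketch; the field names and the handling of the ten-URL cut-off are illustrative, not taken from the actual reducer:

import java.util.LinkedHashSet;
import java.util.Set;

public class SentenceRecord {
    int totalCount = 0;
    String firstCrawlDate = null;                   // e.g. "2011-02-24"
    Set<String> urls = new LinkedHashSet<String>(); // at most ten distinct URLs are kept

    // called once per occurrence of the sentence
    void add(String url, String crawlDate) {
        totalCount++;
        if (firstCrawlDate == null || crawlDate.compareTo(firstCrawlDate) < 0) {
            firstCrawlDate = crawlDate;
        }
        if (urls.size() < 10) {
            urls.add(url);
        }
    }
}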
HDFS_DIRECTORY=webcorpus
JAR=../webcorpus.jar
LANGUAGE=de

# run
hadoop jar ${JAR} webcorpus.hadoopjobs.DocumentJob ${HDFS_DIRECTORY}/raw ${HDFS_DIRECTORY}/document
hadoop jar ${JAR} webcorpus.hadoopjobs.DeduplicationJob ${HDFS_DIRECTORY}/document ${HDFS_DIRECTORY}/deduplication
hadoop jar ${JAR} webcorpus.hadoopjobs.DeduplicationByHostJob ${HDFS_DIRECTORY}/deduplication ${HDFS_DIRECTORY}/deduplicationByHost
hadoop jar ${JAR} webcorpus.hadoopjobs.UTF8Job ${HDFS_DIRECTORY}/deduplicationByHost ${HDFS_DIRECTORY}/utf8
hadoop jar ${JAR} webcorpus.hadoopjobs.SentenceJob ${HDFS_DIRECTORY}/utf8 ${HDFS_DIRECTORY}/sentence --lang ${LANGUAGE}
hadoop jar ${JAR} webcorpus.hadoopjobs.LanguageJob ${HDFS_DIRECTORY}/sentence ${HDFS_DIRECTORY}/language --lang ${LANGUAGE}
hadoop jar ${JAR} webcorpus.hadoopjobs.TokenJob ${HDFS_DIRECTORY}/language ${HDFS_DIRECTORY}/token
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/1gram -n 1
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/2gram -n 2
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/3gram -n 3
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/4gram -n 4
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/5gram -n 5
hadoop jar ${JAR} webcorpus.hadoopjobs.CooccurrenceJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/cooccurrence -n 5
hadoop jar ${JAR} webcorpus.hadoopjobs.SentenceExtractJob ${HDFS_DIRECTORY}/language ${HDFS_DIRECTORY}/sentenceExtract
hadoop jar ${JAR} webcorpus.hadoopjobs.SentenceExtractCompactJob ${HDFS_DIRECTORY}/sentenceExtract ${HDFS_DIRECTORY}/sentenceExtractCompact
Optionally, extract the n-gram counts and cooccurrence counts from HDFS and sort them:
hadoop dfs -cat ${HDFS_DIRECTORY}/1gram/part* > 1gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/2gram/part* > 2gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/3gram/part* > 3gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/4gram/part* > 4gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/5gram/part* > 5gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/cooccurrence/part* > cooccurrence.txt

sort -k 2 -t " " -r -n < 1gram.txt > 1gram_sorted.txt
sort -k 2 -t " " -r -n < 2gram.txt > 2gram_sorted.txt
sort -k 2 -t " " -r -n < 3gram.txt > 3gram_sorted.txt
sort -k 2 -t " " -r -n < 4gram.txt > 4gram_sorted.txt
sort -k 2 -t " " -r -n < 5gram.txt > 5gram_sorted.txt
sort -k 2 -t " " -r -n < cooccurrence.txt > cooccurrence_sorted.txt
Note the tab characters between the quotation marks.
All configuration is done per project in a dedicated configuration file called "jobconf.txt".
# Name of job queue.
# string, {default, quick} - default: default
queueName=default

# Length of tested String (each at start and end) for deduplication.
# int, (0, MAX_STRING_LENGTH / 2] - default: 1000
dedupTestLength=1000

# The number of bits in the vector. => |Expected Values| * log_2(1 / FP-Rate)
# int, (0, MAX_INT] - default: 256
dedupBloomVectorSize=1024

# Bloom filter: number of hashes to consider
# int - default: 7
dedupBloomNbHash=7

# Deduplication Bloom filter hash function
# {murmur, jenkins} - default: jenkins
dedupBloomHashFunction=jenkins

# Maximum length of text with a detected language other than the expected one.
# int - default: 200
maximumUnknownLanguageLength=200

# Drop filtered documents, instead of labeling them.
# bool - default: false (for testing, use true for production)
dropFilteredItems=true

# Use compression for map-reduce output. Always true for intermediate output (mappers).
# bool - default: false (for testing, use true for production)
useCompression=false

# Number of map tasks for the job
# int - default: 80
numberOfMapTasks=80

# Number of reduce tasks for the job
# int - default: 80
numberOfReduceTasks=80

# Options for Stanford Tokenizer
# string - default: ""
#tokenizerOptions=

# Separator for multiple keys (e.g. for CooccurrenceJob)
# string - default: "@@"
#keySeparator=@@

# Sentence separator (e.g. for n-grams)
# string - default: "<s>"
#sentenceSeparator=<s>