Documentation

This project aims to create a system that generates data such as n-gram counts, cooccurrence counts, or isolated sentences from a large corpus of web pages for a language of choice. Parallel processing of such tasks can lead to a huge performance benefit over serial processing.

MapReduce provides a programming model for parallel processing. We chose Hadoop as the MapReduce framework. Our system is built as a pipeline of Hadoop MapReduce jobs. The raw data is kindly provided by the FindLinks team at the University of Leipzig.

Project Subtasks

  • Extract documents from the raw data and provide them for further tasks in a standardized format, together with metadata such as document URL and crawl date. We will refer to this job as DocumentJob.
  • Web crawling usually leads to a lot of noise, so some basic cleanup tasks need to be performed:
    • Deduplication of documents, as a document might occur multiple times due to recrawling or context variations, such as a print version of a page that already occurred as a normal page. (DeduplicationJob, DeduplicationByHostJob)
    • Filtering of documents with malformed encodings. (UTF8Job)
  • Inner segmentation of documents into sentences for further processing. (SentenceJob)
  • Filtering of sentences with a language other than the chosen language. (LanguageJob)
  • Generate corpus of n-grams. (NGramCountJob, POSNGramCountJob, NGramWithPOSCountJob)
  • Generate corpus of cooccurrences. (CooccurrenceJob)
  • Extract sentences with clearly detected language in a standardized format. (SentenceExtractJob)

Overview

Job | Before | Transformations | After
--- | --- | --- | ---
DocumentJob (format) | Raw data with metadata as input | Basic cleanup, normalize whitespace and URL. Format can be "warc", "arc" or "leipzig". | One document per line with URL and metadata
DeduplicationJob | One document per line | Deduplication by URL. | URL-based duplicates are removed
DeduplicationByHostJob | One document per line | Deduplication by host. | Host-based duplicates are removed
UTF8Job | One document per line | Detect and remove documents that contain unknown glyphs. | Mostly correctly encoded documents
SentenceJob (language) | One document per line | Sentence-split text. If possible, a language-specific sentence segmentation model is used. For the language, use its two-letter ISO 639-1 code. | Sentences are wrapped with XML s-tags
LanguageJob (language) | One document per line with sentence annotation. Parameter giving expected language. | Detects language per sentence. Estimates language for sentences that could not clearly be classified. Removes sentences with unexpected language estimation. | Sentences are annotated with detected and estimated language
SentenceAnnotateJob | One sentence per line, with sentence as key | Runs arbitrary UIMA components and writes serialized CASes as result. | One XML-serialized CAS per line, compressed with GZip
NGramCountJob (n) | Serialized per-sentence CASes | Counts n-grams according to parameter n. | One entry per line: n-gram#TAB#count
POSNGramCountJob (n) | Serialized per-sentence CASes | Counts POS-n-grams according to parameter n. | One entry per line: n-gram#TAB#count
NGramWithPOSCountJob (n) | Serialized per-sentence CASes | Counts n-grams with appended POS tags ("token/<POS>") according to parameter n. | One entry per line: n-gram#TAB#count
CooccurrenceJob (n) | One tokenized document per line. Parameter giving maximum cooccurrence distance to be counted. | Counts cooccurrences with distance up to parameter n. | One entry per line: word1@@-distance word2@@+distance#TAB#count
SentenceExtractJob | One document per line with sentence annotation (and language annotation) | Extracts sentences with expected language (specified in the LanguageJob run) and maximum length of 512 characters. | One sentence per line with URL and crawl date
SentenceExtractCompactJob | Output of SentenceExtractJob | Deduplication and counting of sentences. | One sentence per line with total count, first crawl date and up to ten URLs

Pipeline

Visualization


Hadoop Jobs

All jobs are located in the package webcorpus.hadoopjobs.

DocumentJob

This job takes crawler archives in multiple possible formats and writes out relevant data in a uniform format, such that the following jobs can process it.

  • Filters documents with missing URL, or content=null or content="null".
  • Wraps URL entries in source metadata (sm) with CDATA markup.
  • Normalizes encoding to UTF-8 if another source encoding is given in the source metadata.
  • Adds processing metadata (pm) and adds document length as entry "length".
  • Detects paragraphs by multiple line breaks and wraps them with p-tags.
  • Normalizes whitespace (replaces all line breaks, tabs and multiple whitespaces with a single whitespace) and trims the text.
  • Detects duplicates by same URL, same length and same content in first and last n characters.
  • Outputs one document per line. Format (replace #TAB# with tab): URL#TAB#sm#TAB#pm#TAB#document

Configuration

There are two configuration options of special relevance here:

To only read documents from the archive that have a specific mime type, you can turn on mime-type filtering using
conf.setBoolean("webcorpus.common.io.warcinputformat.filter-mimetypes", true). The default value is false.
To specify the list of mime types to be read, use conf.set("webcorpus.documentjob.content-type-whitelist", "type1, type2, ..."), e.g. "text/html" to keep only HTML documents.
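
For illustration, a minimal driver sketch that sets both options (the class and method names are ours; only the two property keys are taken from the documentation above):

import org.apache.hadoop.conf.Configuration;

public class DocumentJobConfigExample {

    // Sketch: enable mime-type filtering and restrict DocumentJob to HTML pages.
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Only read archive records whose mime type is on the whitelist.
        conf.setBoolean("webcorpus.common.io.warcinputformat.filter-mimetypes", true);
        // Comma-separated whitelist of mime types to keep.
        conf.set("webcorpus.documentjob.content-type-whitelist", "text/html");
        return conf;
    }
}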

Data example

Before
<source><location>http://document.url/1</location><date>2012-01-01</date><user>Tom</user><original_encoding>utf-8</original_encoding><language>deu</language></source>

              Dies ist eine Testdatei       mit zwei Absätzen.


Zweiter Absatz.


Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.DocumentJob webcorpus_data/raw webcorpus_data/document --input-format <format>

Here, format must be one of "warc", "arc" or "leipzig".

After
http://document.url/1#TAB#<source><location><![CDATA[http://document.url/1]]></location><date>2012-01-01</date><user>Tom</user><original_encoding>utf-8</original_encoding><language>deu</language></source>#TAB#<process><length>83</length></process>#TAB#<p>Dies ist eine Testdatei mit zwei Absätzen.</p><p>Zweiter Absatz.</p> 

The complete document is printed in one line with normalized whitespace. #TAB# is used in this example to illustrate the tab character.

DeduplicationJob

Deduplication is performed to reduce the amount of redundant data.

  • Creates a key by concatenating the document length and the first and last n characters.
  • Uses the BloomFilter class (Bloom filters explained by example) to test whether a document has already appeared.
  • If Conf.dropFilteredItems is set to false, every item with a recurring URL will be labeled with is_duplicate:=[true|false].

To minimize the false positive rate of the bloom filter, an adequate bit vector size (dedupBloomVectorSize) and number of hashes (dedupBloomNbHash) must be chosen and set in jobconf.txt. For this you have to choose a number n of entries at which the bloom filter should deliver the smallest false positive rate. Then, with a given vector size, use the following equation to calculate the optimal number of hashes:

dedupBloomNbHash = (dedupBloomVectorSize/n) * ln(2)

For example, a bloom filter with a vector size of dedupBloomVectorSize=1024 bits that already contains n~=100 entries delivers the best (theoretical) false positive rate of 0.0073 at this point with dedupBloomNbHash=7 hashes.
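
For illustration, the same calculation in plain Java (the class is ours; the false positive formula is the standard (1 - e^(-k*n/m))^k):

public class BloomFilterParams {

    public static void main(String[] args) {
        int dedupBloomVectorSize = 1024; // bits in the vector (m)
        int expectedEntries = 100;       // entries at which the false positive rate should be minimal (n)

        // Optimal number of hashes: k = (m / n) * ln(2)
        int dedupBloomNbHash = (int) Math.round((double) dedupBloomVectorSize / expectedEntries * Math.log(2));

        // Theoretical false positive rate: (1 - e^(-k * n / m))^k
        double falsePositiveRate = Math.pow(
                1.0 - Math.exp(-(double) dedupBloomNbHash * expectedEntries / dedupBloomVectorSize),
                dedupBloomNbHash);

        // Prints: k = 7, false positive rate = 0.0073
        System.out.printf("k = %d, false positive rate = %.4f%n", dedupBloomNbHash, falsePositiveRate);
    }
}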

Data example

Before
http://document.url/1   <source/>   <process/>   Same content.
http://document.url/1   <source/>   <process/>   Same content.
http://document.url/1/print   <source/>   <process/>   Same content.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.DeduplicationJob webcorpus_data/document webcorpus_data/deduplication
After
http://document.url/1   <source/>   <process/>   Same content.
http://document.url/1/print   <source/>   <process/>   Same content.

DeduplicationByHostJob

Same as DeduplicationJob, but looks for items with recurring hosts by deconstructing URLs.

Data example

Before
http://document.url/1   <source/>   <process/>   Same content.
http://document.url/1/print   <source/>   <process/>   Same content.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.DeduplicationByHostJob webcorpus_data/deduplication webcorpus_data/deduplicationByHost
After
http://document.url/1   <source/>   <process/>   Same content.

UTF8Job

Encoding errors can lead to undesirable behaviour in language technology. This job tries to filter out at least the obvious occurrences of encoding errors.

  • Labels (encoding_error:=[true|false]) or removes documents with defective encoding.
  • Detects defective encoding simply by looking for "�" (the Unicode replacement character for an unknown glyph), as sketched below.
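
A minimal sketch of this check (the class and method names are ours; it assumes the document text has already been decoded to a Java String):

public class EncodingCheck {

    // Flags a document as defectively encoded if it contains the Unicode
    // replacement character U+FFFD ("unknown glyph").
    public static boolean hasEncodingError(String documentText) {
        return documentText.indexOf('\uFFFD') >= 0;
    }
}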

Data example

Before
... <p>Document cont[UNKNOWN GLYPH]ins encoding error.</p> ...
Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.UTF8Job webcorpus_data/deduplicationByHost webcorpus_data/utf8
After
The document is removed.

SentenceJob

For further processing and as a project goal itself, isolated sentences are needed. This job splits paragraphs to sentences.

  • Wraps sentences with "s"-tags.
  • The sentence splitter from ASV Toolbox is used.
  • webcorpus_conf/segmentizer.zip
    • boundariesFile.txt
    • postList.txt
    • postRules.txt
    • preList.txt
    • preRules.txt

Adding these files to the distributed cache as single files has been problematic (the reason is still unclear), but adding them as an archive file worked as expected.
// Add to the cache with symlink "segmentizer" in public int run(String[] args):
DistributedCache.addCacheArchive(new URI("webcorpus_conf/segmentizer.zip#segmentizer"), conf);
DistributedCache.createSymlink(conf);

// The segmentizer expects a URL for every config file as parameter,
// so we keep a map from file name to URL in the Mapper.
private Map<String, URL> urlMap = new HashMap<String, URL>();

// Retrieve the config files from the distributed cache in the mapper.
private void getResources() {
   // "segmentizer" is the symlink created above; it points to the unpacked archive.
   URL resource = job.getResource("segmentizer");
   File inputPath = new File(resource.getPath());
   for (File file : inputPath.listFiles()) {
      if (file.exists()) {
         try {
            urlMap.put(file.getName(), file.toURI().toURL());
         } catch (MalformedURLException e) {
            // Should not happen for files on the local file system; skip the file.
         }
      }
   }
}

Data example

Before
... <p>Paragraph contains two sentences. This is the second sentence.</p> ...

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.SentenceJob webcorpus_data/utf8 webcorpus_data/sentence --lang en
After
... <p><s>Paragraph contains two sentences.</s><s>This is the second sentence.</s></p> ...

LanguageJob

Text that is not in the language of choice should be omitted. Therefore we perform sentence-based language detection using jlani from ASV Toolbox.

  • Takes an optional parameter [language_name] (ISO 639-1) to set the language to look for.
    • hadoop jar webcorpus.jar webcorpus.hadoopjobs.LanguageJob webcorpus_data/sentence webcorpus_data/language --lang [language_name]
  • Sentences whose detected language (lani) matches language_name will be labeled with: lang=language_name, lani=language_name
  • Where lani does not match language_name for a sequence of sentences, the following rules apply (see the sketch after this list):
    • if the sequence is at the beginning of a paragraph: lang=lani, lani=lani
    • if the sequence is at the end of a paragraph: lang=lani, lani=lani
    • else if the sequence is not longer than Conf.maximumUnknownLanguageLength: lang=language_name, lani=lani
    • else: lang=lani, lani=lani
  • If Conf.dropFilteredItems is true, only sentences with lang=language_name will be kept.
  • jlani expects some files. In order to work with hadoop, they are packed into jlani.zip, which remains in the hdfs (webcorpus_conf/jlani.zip):
    • blacklist_utf8.txt
    • mappings.txt
    • de.ser.gz
    • en.ser.gz
    • ... (models for more languages)
  • The files are loaded from the distributed cache in the same way SentenceJob loads its files.
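
The sequence rules above can be condensed into a small sketch (class, method and parameter names are ours, not the project's; the sequence length is measured against Conf.maximumUnknownLanguageLength):

public class LanguageLabelRules {

    // Returns the value assigned to the "lang" attribute for a sequence of sentences
    // whose detected language (lani) differs from the expected languageName.
    // The "lani" attribute always keeps the detected language.
    public static String assignLang(String languageName, String lani,
                                    boolean atParagraphStart, boolean atParagraphEnd,
                                    int sequenceLength, int maximumUnknownLanguageLength) {
        if (atParagraphStart || atParagraphEnd) {
            // Sequences at the beginning or end of a paragraph keep their detected language.
            return lani;
        }
        if (sequenceLength <= maximumUnknownLanguageLength) {
            // Short foreign-looking stretches inside a paragraph are assigned the expected language.
            return languageName;
        }
        return lani;
    }
}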

Data example

Before
... <p><s>Paragraph contains three sentences.</s><s>One english, one gemischtsprachig, one Монгол.</s><s>элдэв гажиг мэдээлэл агуулсан бичлэгүүдийг аль болохоор хурдан хугацаанд устгах юмуу өөрчилнө.</s></p> ...

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.LanguageJob webcorpus_data/sentence webcorpus_data/language --lang en
After
... <p><s lang="en" lani="en">Paragraph contains three sentences.</s><s lang="en" lani="unknown">One english, one gemischtsprachig, one Монгол.</s></p> ...

The second sentence is considered to be an English sentence, as it follows an English sentence and is short enough. The last sentence is removed.

SentenceAnnotateJob

Runs arbitrary UIMA components on deduplicated sentences.

  • Expects the output of SentenceExtractCompactJob as input.
  • Runs the configured UIMA components on each sentence.
  • Writes one XML-serialized CAS per line; each CAS is compressed with GZip (see the sketch after this list).
  • hadoop jar webcorpus.jar webcorpus.hadoopjobs.SentenceAnnotateJob webcorpus_data/sentenceExtractCompact webcorpus_data/sentenceAnnotate
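
The documentation does not spell out how a GZip-compressed CAS fits onto a single text line; one plausible sketch (our assumption, not necessarily the project's implementation) is to Base64-encode the compressed bytes so they contain no tabs or line breaks:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.codec.binary.Base64;

public class CasLineEncoder {

    // Compresses an XMI-serialized CAS and encodes it so that it fits on one line
    // of a text output file. Producing the XMI string from the CAS is omitted here.
    public static String toLine(String xmiSerializedCas) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bytes);
        gzip.write(xmiSerializedCas.getBytes("UTF-8"));
        gzip.close();
        return new String(Base64.encodeBase64(bytes.toByteArray()), "US-ASCII");
    }
}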

NGramCountJob

This job counts n-grams of tokens.

  • Counts n-grams based on tokens.
  • Expects a parameter to set n:
    • hadoop jar webcorpus.jar webcorpus.hadoopjobs.NGramCountJob webcorpus_data/token webcorpus_data/ngram -n <n>
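
The core of the map step is ordinary n-gram extraction over the token sequence of a sentence. A minimal sketch (the class is ours; the real job reads the tokens from the serialized CASes):

import java.util.ArrayList;
import java.util.List;

public class NGramExtractor {

    // Builds all n-grams of a token list by joining n adjacent tokens with single spaces.
    public static List<String> extractNGrams(List<String> tokens, int n) {
        List<String> ngrams = new ArrayList<String>();
        for (int i = 0; i + n <= tokens.size(); i++) {
            StringBuilder ngram = new StringBuilder();
            for (int j = 0; j < n; j++) {
                if (j > 0) {
                    ngram.append(' ');
                }
                ngram.append(tokens.get(i + j));
            }
            ngrams.add(ngram.toString());
        }
        return ngrams;
    }
}

Each extracted n-gram is then emitted with count 1, and the reducer sums the counts, as in the classic word count example.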

Data example

Before

Expects the output of SentenceAnnotateJob as input.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.NGramCountJob webcorpus_data/token webcorpus_data/ngram -n 3

Calculate some 3-grams.

After
...
und diffusionsgeschlossene Dämmungen   2
und digitale Programme   1
und digitalen Kartensammlungen   2
...

NGramWithPOSCountJob

This job counts n-grams of tokens with appended POS tags.

  • Counts n-grams based on tokens.
  • Expects a parameter to set n:
    • hadoop jar webcorpus.jar webcorpus.hadoopjobs.NGramWithPOSCountJob webcorpus_data/token webcorpus_data/ngram-with-pos -n <n>

Data example

Before

Expects the output of SentenceAnnotateJob as input.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.NGramWithPOSCountJob webcorpus_data/token webcorpus_data/ngram-with-pos -n 3

Calculate some 3-grams.

After
...
und/<KON> diffusionsgeschlossene/<ADJA> Dämmungen/<NN>   2
und/<KON> digitale/<ADJA> Programme/<NN>   1
und/<KON> digitalen/<ADJA> Kartensammlungen/<NN>   2
...

POSNGramCountJob

This job counts POS-n-grams of tokens.

  • POS-n-grams are created by partially substituting tokens with their POS tags
  • All possible combinations are counted (see the sketch after this list):
TOK TOK TOK
TOK TOK POS
TOK POS TOK
TOK POS POS
...
  • Expects a parameter to set n:
    • hadoop jar webcorpus.jar webcorpus.hadoopjobs.POSNGramCountJob webcorpus_data/token webcorpus_data/pos-ngram -n <n>
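
For n = 3 this yields 2^3 = 8 variants per n-gram. A sketch of the substitution step (the class is ours; it assumes the tokens and their POS tags are available as parallel arrays, with the tags not yet wrapped in angle brackets):

import java.util.ArrayList;
import java.util.List;

public class PosNGramVariants {

    // For one n-gram, emits every combination of token vs. POS tag by interpreting
    // a bitmask over the n positions (bit set = substitute the POS tag).
    public static List<String> variants(String[] tokens, String[] posTags) {
        int n = tokens.length;
        List<String> result = new ArrayList<String>();
        for (int mask = 0; mask < (1 << n); mask++) {
            StringBuilder variant = new StringBuilder();
            for (int i = 0; i < n; i++) {
                if (i > 0) {
                    variant.append(' ');
                }
                variant.append((mask & (1 << i)) != 0 ? "<" + posTags[i] + ">" : tokens[i]);
            }
            result.add(variant.toString());
        }
        return result;
    }
}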

Data example

Before

Expects the output of SentenceAnnotateJob as input.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.POSNGramCountJob webcorpus_data/token webcorpus_data/pos-ngram -n 3

Calculate some 3-grams.

After
...
und diffusionsgeschlossene Dämmungen   2
und diffusionsgeschlossene <NN>   3
und <ADJA> Dämmungen   2
und <ADJA> <NN>   15
...

CooccurrenceJob

This job counts cooccurrences of tokens.

  • Counts cooccurrences based on tokens with distance up to n.
  • Outputs all cooccurrences for all distances (1, 2, ..., n) at once.
  • Expects a parameter to set n:
    • hadoop jar webcorpus.jar webcorpus.hadoopjobs.CooccurrenceJob webcorpus_data/token webcorpus_data/cooccurrence -n <n>

Data example

Before

CooccurrenceJob expects the output of TokenJob as input.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.CooccurrenceJob webcorpus_data/token webcorpus_data/cooccurrence -n 5

Calculate cooccurrences up to distance 5.

After
...
üpp@@-1 ’@@+1   1
üppig@@-2 Bewuchses@@+2   1
üppig@@-2 Buchs@@+2   1
üppig@@-2 Garten@@+2   3
üppig@@-2 Getreidefelder@@+2   1
üppig@@-3 Garten@@+3   1
üppig@@-3 Hmmmmm@@+3   1
üppig@@-3 in@@+3   9
...

Line format is as follows:

{word at p}@@-i {word at p+i}@@+i   count

where p is the position of the left word and i is the distance between the left and the right word.
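
A sketch of how such keys can be produced from a token sequence (the class is ours, not the project's mapper; keySeparator corresponds to the jobconf option with default "@@"):

import java.util.ArrayList;
import java.util.List;

public class CooccurrenceKeys {

    // Emits one key per ordered token pair with distance 1..n,
    // in the documented format {word at p}@@-i {word at p+i}@@+i.
    public static List<String> keys(List<String> tokens, int n, String keySeparator) {
        List<String> result = new ArrayList<String>();
        for (int p = 0; p < tokens.size(); p++) {
            for (int i = 1; i <= n && p + i < tokens.size(); i++) {
                result.add(tokens.get(p) + keySeparator + "-" + i + " "
                         + tokens.get(p + i) + keySeparator + "+" + i);
            }
        }
        return result;
    }
}

Each key is emitted with count 1; the reducer sums the counts per key.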

SentenceExtractJob

Preprocessing step for SentenceExtractCompactJob.

  • Extracts sentences along with their corresponding dates and URLs.
  • Sentences longer than 512 characters are omitted.
  • Sentences for which lani detected "unknown" are omitted.

Data example

Before

Expects output of LanguageJob as input.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.SentenceExtractJob webcorpus_data/language webcorpus_data/sentenceExtract
After
...
1560 wurde dem Markte Zwiesel ein Wappen zugesprochen.   http://zwiesel.de/content.php?content=d_3_9_3   2011-02-27
Die Wappenverleihungsurkunde vom 11. Sept. dieses Jahres lautet wörtlich:   http://zwiesel.de/content.php?content=d_3_9_3   2011-02-27
...

SentenceExtractCompactJob

Extracted and deduplicated sentences are a project goal. This job performs the deduplication and provides the desired sentences as output.

  • Reduces the output of SentenceExtractJob.
  • Counts the number of total occurrences of a sentence.
  • Outputs up to 10 URLs per sentence (see the sketch below).
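
A sketch of the aggregation performed in the reduce step (the class is ours; it assumes each occurrence of a sentence carries a URL and a crawl date, as produced by SentenceExtractJob):

import java.util.LinkedHashSet;
import java.util.Set;

public class SentenceAggregate {

    private int totalCount = 0;
    private String firstCrawlDate = null;                         // smallest ISO date seen so far
    private final Set<String> urls = new LinkedHashSet<String>(); // keeps at most ten distinct URLs

    // Called once per occurrence of the sentence.
    public void add(String url, String crawlDate) {
        totalCount++;
        if (firstCrawlDate == null || crawlDate.compareTo(firstCrawlDate) < 0) {
            firstCrawlDate = crawlDate;
        }
        if (urls.size() < 10) {
            urls.add(url);
        }
    }

    // Output line: sentence TAB total count TAB first crawl date TAB up to ten URLs.
    public String toLine(String sentence) {
        StringBuilder line = new StringBuilder(sentence)
                .append('\t').append(totalCount)
                .append('\t').append(firstCrawlDate);
        for (String url : urls) {
            line.append('\t').append(url);
        }
        return line.toString();
    }
}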

Data example

Before

Expects output of SentenceExtractJob as input.

Call
hadoop jar webcorpus.jar webcorpus.hadoopjobs.SentenceExtractCompactJob webcorpus_data/sentenceExtract webcorpus_data/sentenceExtractCompact
After
...
Hier steht ein Satz.   3   2011-02-24   http://occurrence1.com/index.php?id=42   http://occurrence2.com
...

Note that in this example the total count is higher than the number of URLs. This means that the sentence occurs multiple times on at least one page.


Example shell script to run the jobs

HDFS_DIRECTORY=webcorpus
JAR=../webcorpus.jar
LANGUAGE=de

# run
hadoop jar ${JAR} webcorpus.hadoopjobs.DocumentJob ${HDFS_DIRECTORY}/raw ${HDFS_DIRECTORY}/document
hadoop jar ${JAR} webcorpus.hadoopjobs.DeduplicationJob ${HDFS_DIRECTORY}/document ${HDFS_DIRECTORY}/deduplication
hadoop jar ${JAR} webcorpus.hadoopjobs.DeduplicationByHostJob ${HDFS_DIRECTORY}/deduplication ${HDFS_DIRECTORY}/deduplicationByHost
hadoop jar ${JAR} webcorpus.hadoopjobs.UTF8Job ${HDFS_DIRECTORY}/deduplicationByHost ${HDFS_DIRECTORY}/utf8
hadoop jar ${JAR} webcorpus.hadoopjobs.SentenceJob ${HDFS_DIRECTORY}/utf8 ${HDFS_DIRECTORY}/sentence --lang ${LANGUAGE}
hadoop jar ${JAR} webcorpus.hadoopjobs.LanguageJob ${HDFS_DIRECTORY}/sentence ${HDFS_DIRECTORY}/language --lang ${LANGUAGE}
hadoop jar ${JAR} webcorpus.hadoopjobs.TokenJob ${HDFS_DIRECTORY}/language ${HDFS_DIRECTORY}/token
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/1gram -n 1
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/2gram -n 2
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/3gram -n 3
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/4gram -n 4
hadoop jar ${JAR} webcorpus.hadoopjobs.NGramCountJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/5gram -n 5 
hadoop jar ${JAR} webcorpus.hadoopjobs.CooccurrenceJob ${HDFS_DIRECTORY}/token ${HDFS_DIRECTORY}/cooccurrence -n 5
hadoop jar ${JAR} webcorpus.hadoopjobs.SentenceExtractJob ${HDFS_DIRECTORY}/language ${HDFS_DIRECTORY}/sentenceExtract
hadoop jar ${JAR} webcorpus.hadoopjobs.SentenceExtractCompactJob ${HDFS_DIRECTORY}/sentenceExtract ${HDFS_DIRECTORY}/sentenceExtractCompact

And optionally extract n-gram counts and cooccurrence counts from the HDFS and sort them.

hadoop dfs -cat ${HDFS_DIRECTORY}/1gram/part* > 1gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/2gram/part* > 2gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/3gram/part* > 3gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/4gram/part* > 4gram.txt
hadoop dfs -cat ${HDFS_DIRECTORY}/5gram/part* > 5gram.txt

hadoop dfs -cat ${HDFS_DIRECTORY}/cooccurrence/part* > cooccurrence.txt

sort -k 2 -t "   " -r -n < 1gram.txt > 1gram_sorted.txt
sort -k 2 -t "   " -r -n < 2gram.txt > 2gram_sorted.txt
sort -k 2 -t "   " -r -n < 3gram.txt > 3gram_sorted.txt
sort -k 2 -t "   " -r -n < 4gram.txt > 4gram_sorted.txt
sort -k 2 -t "   " -r -n < 5gram.txt > 5gram_sorted.txt

sort -k 2 -t "   " -r -n < cooccurrence.txt > cooccurrence_sorted.txt

Note the tab characters between the quotation marks.

Configuration

All configuration is done per project in a dedicated configuration file called "jobconf.txt".

Documented example configuration file

# Name of job queue.
# string, {default, quick} - default: default
queueName=default

# Length of tested String (each at start and end)  for deduplication.
# int, (0, MAX_STRING_LENGTH / 2] - default: 1000 
dedupTestLength=1000

# The number of bits in the vector. => |Expected Values| * log_2(1 / FP-Rate)
# int, (0, MAX_INT] - default: 256 
dedupBloomVectorSize=1024

# Bloom filter: number of hashes to consider
# int - default: 7
dedupBloomNbHash=7

# Deduplication Bloom Filter hash function
# {murmur, jenkins} - default: jenkins
dedupBloomHashFunction=jenkins

# Maximum length of text, with other language detected than expected.
# int - default: 200
maximumUnknownLanguageLength=200

# Drop filtered documents, instead of labeling them.
# bool - default: false (for testing, use true for production)
dropFilteredItems=true

# Use compression for map-reduce output. Always true for intermediate output (mappers).
# bool - default: false (for testing, use true for production)
useCompression=false

# Number of map tasks for the job
# int - default: 80
numberOfMapTasks=80

# Number of reduce tasks for the job
# int - default: 80
numberOfReduceTasks=80

# Options for Stanford Tokenizer
# string - default: ""
#tokenizerOptions=

# Separator for multiple keys (e.g. for CooccurrenceJob)
# string - default: "@@"
#keySeparator=@@

# Sentence separator (e.g. for n-gram)
# string - default: "<s>"
#sentenceSeparator=<s>

