| File | Date | Author | Commit |
|---|---|---|---|
| project | 2023-05-30 | | [58d712] Initial |
| src | 2023-06-06 | | [5ac724] write3 models |
| .gitignore | 2023-05-31 | | [894dff] - |
| README.md | 2023-06-06 | | [397b48] working on the cluster: LDA for Iranian and Aus... |
| args.txt | 2023-06-06 | | [4676a2] about to test batch |
| gpl-3.0.txt | 2023-05-30 | | [58d712] Initial |
| pplns1.mk | 2023-06-05 | | [b9840e] argument parser see xopts in the makefile and i... |
| submit.properties | 2023-06-01 | | [242e3b] cluster mode working |
weaves
Revised for Spark 3. The Scala version is 2.12 and has been chosen to align with Apache Spark.
This follows the article titled "Distributed Topic Modelling using Spark NLP and Spark MLlib (LDA)".
The Spark LDA implementation is described in the MLlib guide, in the Clustering section.
This package makes use of the John Snow Labs Spark NLP components.
Spark can load these via Ivy if the spark.jars.packages configuration parameter is set to
"com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.2".
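Spark can fetch the package at start-up; a minimal spark-defaults.conf fragment (a sketch, assuming the standard Ivy resolution mechanism) would be:

```properties
# Pulls Spark NLP (and its transitive dependencies) via Ivy at application start-up.
spark.jars.packages  com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.2
```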
The aim is a configurable package that can be run repeatedly on the same corpus of texts.
The corpus can be stored within the cluster on HDFS or within Hive.
A series of Spark jobs can then be submitted to analyze the texts under different configurations. This would use
cross-validation resampling to find robust topics.
Using Hive to capture the data frames is very slow. This is enabled by the Archiver0 class.
The Natural Language Processing pipeline converts texts into DataFrame entities. There are two pipelines: StopStem
and WordEmbeddings. Both build on the Spark NLP components for their Natural Language Processing stages.
There are some guidelines on quality evaluation.
https://towardsdatascience.com/evaluate-topic-model-in-python-latent-dirichlet-allocation-lda-7d57484bb5d0
Running on the Spark cluster uses the user name hadoop, but locally you should use your host OS username.
When running under YARN, a SparkSession is constructed and the configuration properties are passed on the command
line via spark-submit. To help with that, there is a top-level Makefile and a template in the nlp.git project that
creates the scripts used by this package and others. The script in each package is called submit.sh.
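As a sketch of what such a submit.sh invocation looks like (the main class and JAR names here are assumptions for illustration, not the package's actual entries; the command is only printed, not run):

```shell
# Sketch of a spark-submit invocation under YARN; class and JAR names are
# hypothetical. Built as a string and echoed so the sketch is self-contained.
CMD="spark-submit --master yarn --deploy-mode cluster"
CMD="$CMD --conf spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.2"
CMD="$CMD --class artikus.spark.Main ppln1-assembly.jar"
echo "$CMD"
```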
For local testing in IntelliJ, the Spark configuration properties are added by means of the Customizer. In this
package, ppln1, the configuration is in the test/ directories under site.frdnu, the name of my cluster.
Then the JVM needs to be instructed:
--illegal-access=warn -Dartikus.spark.customizer=artikus.spark.site.frdnu.Site
In other derived packages, the Customizers can be installed in the main/ directories. The same process of activation
is needed.
The cluster is started on k1 using a systemd script.
It is often the case that a nodemanager cannot start. When this is noticed, it can be restarted with
start-yarn.sh. There are warnings that processes are already running, but the missing nodemanager will be started.
Session0.make will instantiate a Spark session for testing. This is placed at the start of each Test (or as a
beforeAll).
Application code should use Session.instance. This simply performs getOrCreate. This means that if the pipeline is
instantiated under YARN, it will use that Spark Session configuration.
It is possible to use artikus.spark.ctl to load jobs to run under YARN; see submit.sh.
The Spark configuration properties contain a key: spark.Loader.Launchable=artikus.spark.ctl.PiLauncher
This is the key used to load the class to parse the command-line arguments.
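The loading step can be sketched in plain Scala. This is not the project's actual loader; a JDK class stands in for artikus.spark.ctl.PiLauncher so the fragment is self-contained:

```scala
// Sketch: resolving the class named by the spark.Loader.Launchable property.
// A JDK class stands in for artikus.spark.ctl.PiLauncher so this runs anywhere.
val conf = Map("spark.Loader.Launchable" -> "java.util.ArrayList")
val launchableClass = Class.forName(conf("spark.Loader.Launchable"))
val launchable = launchableClass.getDeclaredConstructor().newInstance()
println(launchableClass.getName)
```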
BaseLauncher is a Launchable, and Launchable has a method called make which creates a Staging object. The Staging
object holds the sequence of commands to run.
BaseLauncher uses a simple string-parsing mechanism. It also has some maintenance operations to clean
the database and remove object directories: dbclean and ctlclean.
Each Staging object can use the BasicArgsParser to find its parameters. The BasicArgsParser only looks for
archive, overwrite and filename.
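A minimal sketch of such a parser (this is a hypothetical stand-in, not the project's BasicArgsParser; only the three keys named above are recognized):

```scala
// Minimal sketch of an arguments parser recognizing only the three keys
// mentioned above: archive, overwrite and filename. Hypothetical, not the
// project's BasicArgsParser.
def parseArgs(args: Seq[String]): Map[String, String] =
  args.flatMap { arg =>
    arg.split("=", 2) match {
      case Array(k @ ("archive" | "overwrite" | "filename"), v) => Some(k -> v)
      case _ => None  // unknown keys and malformed arguments are ignored
    }
  }.toMap

val parsed = parseArgs(Seq("archive=true", "filename=stage0", "other=ignored"))
println(parsed)
```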
The pipelines are in the package artikus.spark.ppln. They load defaults that are very inaccurate but run quickly.
They can be loaded from a properties file, which must be placed under artikus.spark.ppln with the same name as the
class, so src/test/resources/artikus/spark/ppln/WordEmbedding.properties can be loaded.
This is supposed to produce a great speed up.
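The class-to-resource convention for those properties files can be sketched as follows (a hypothetical helper, not the project's actual lookup code):

```scala
// Sketch: the classpath resource a ppln pipeline class would load its
// properties from, per the convention described above. Hypothetical helper.
def propertiesResource(className: String): String =
  s"artikus/spark/ppln/$className.properties"

println(propertiesResource("WordEmbedding"))
```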
Note: if a table is too large, then the Hive client will stop working. I may have made the same mistake with
the source data.
Resolved a number of issues:
The yarn.resourcemanager.hostname key can be used to set up all three.
The configuration of the cluster uses /home/hadoop/xtra/conf/{spark,hadoop}; the files are all soft-links to the
files under Git control in ~hadoop/etc/{spark,hadoop}. Except for the specializations, these are local to the xtra/
directory. Specializations are only in yarn-site.xml for n1 and j1.
There are still messages that there is not enough space to cache: rdd_16_1 in memory! (computed 34.4 MiB so far).
Relocated the dn directory to /a/l/ for j1 and k1.
System properties
java.io.tmpdir => /tmp
I've added these properties: -Dlocal.hostname=${HOSTNAME} -Djava.io.tmpdir=/a/l/X-image/tmp/hadoop-tmp
It is possible to use Java system properties in Hadoop and Spark properties.
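For example, Hadoop's Configuration class expands ${...} references against Java system properties, so a site file can pick up the tmpdir set above (a sketch; the choice of hadoop.tmp.dir here is illustrative):

```xml
<!-- Sketch: a Hadoop site-file property referencing the java.io.tmpdir
     Java system property, which Hadoop's Configuration expands. -->
<property>
  <name>hadoop.tmp.dir</name>
  <value>${java.io.tmpdir}/hadoop-tmp</value>
</property>
```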
The makefile ppln1.mk provides a means of running jobs on the cluster. Currently, this seems to be the only way
to run the longer pipelines. It needs the fat JAR file to run; this is produced by sbt assembly.
sbt provides the simplest test environment and logs to the file app.log in the project directory.
The LDATest class works. It needs Java 11 and must have --illegal-access=warn added as a VM argument.
The test class gets the right classpath.
Logging goes to the app.log file. This has stopped working for the longer pipelines.
The Jupyter notebook kernels Spylon and Python can access the classes via /misc/build/0/classes. This should be a
soft link, like so: classes -> spark-eg0/ppln1/target/scala-2.12/classes. Unfortunately, the classes trick does not
work; it is better to build a fat JAR and add that to the launcher.jars array.
sbt-assembly is used, as directed in the Baeldung guide.
StopStemTest demonstrates how to run the StopStem pipeline; similarly, WordEmbedding.
It is useful to store the intermediate table in Hive. Hive now runs in multi-user mode. It uses PostgreSQL on
another server, while Hive itself runs on the Hadoop server.
The schema of tables stored in Hive differs from their representation within Spark, specifically in the
representation of vectors. This means there are specialized archive and unarchive methods for the Count
Vectorisation data frame.
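The idea behind those archive/unarchive methods can be sketched without Spark: a vector column is flattened to a plain array for Hive storage and rebuilt on read. The case class and helper names below are hypothetical stand-ins for Spark's ML Vector type and the project's actual methods:

```scala
// Sketch only: DenseVec stands in for Spark's ML Vector; archive/unarchive
// are hypothetical analogues of the project's specialized methods.
case class DenseVec(values: Array[Double])

def archive(v: DenseVec): Array[Double] = v.values      // store as array<double> in Hive
def unarchive(a: Array[Double]): DenseVec = DenseVec(a) // rebuild on read

val v = DenseVec(Array(0.1, 0.7, 0.2))
println(unarchive(archive(v)).values.mkString(","))
```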
Hive maintenance can be done from Jupyter using Spark SQL, but Hive also has a command-line client called Beeline.
Invoked simply as beeline, it supports a ReadLine interface. There is a default username and password of scott
and tiger.
jdbc:hive2://k1:10000> !connect jdbc:hive2://k1:10000 scott tiger
A sample command-line is:
beeline -u jdbc:hive2://k1:10000 -n scott -p tiger --nested-errors=true -e 'select publish_date from stage0 limit 10;'
There is an issue that, when testing, the WARNING Illegal access messages get in the way of the output.
To suppress the messages, this code fragment is necessary in a test file:
val suppressor = {
  System.err.close(); System.setErr(System.out); 0
}
In local mode, the application has access to log4j2.properties
I've made various attempts to configure the logging of the YARN cluster mode.
-Dlog4j.debug=true
-Dlog4j2.Configuration.allowedProtocols=file,jar,http,https
-Dlog4j.configuration=file:///home/weaves/public_html/nlp/log4j2.properties
The HTTP mode does not work. It does seem to load from the file URL. Or the app.log target of the logger has gone
somewhere else. This needs more work, but logging is easier to see in client mode.
Some issues with the Encoder for an object. In the end, it wasn't needed. Here is an example found somewhere
(note this is a circe JSON Encoder, not a Spark one; the imports and case class are added to make it complete):

import io.circe.{Encoder, Json}

case class Employee(name: String, password: String)

implicit val encodeEmployee: Encoder[Employee] = new Encoder[Employee] {
  final def apply(a: Employee): Json = Json.obj(
    ("name", Json.fromString(a.name)),
    ("password", Json.fromString("[REDACTED]"))
  )
}
Markdown reference.
https://daringfireball.net/projects/markdown/syntax
Spark Configuration
https://spark.apache.org/docs/latest/configuration.html#spark-properties
Spark ScalaDoc API reference
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html
This file's Emacs file variables
[ Local Variables: ]
[ mode:markdown ]
[ mode:outline-minor ]
[ mode:auto-fill ]
[ fill-column: 75 ]
[ coding: utf-8 ]
[ comment-column:50 ]
[ comment-start: "[ " ]
[ comment-end:"]" ]
[ End: ]