Without Hive, Spark can read multiple text files from HDFS and transform them into DataFrames, which makes them convenient to analyze.
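As background (not part of the original listing, just a sketch of Spark's documented behavior): textFile accepts a directory, a glob pattern, or a comma-separated list of paths, so reading multiple text files needs no extra code. The paths here are hypothetical placeholders:

JavaRDD<String> fromDir  = sc.textFile("/user/XXX/sample/contact_alerts");  // every file in a directory
JavaRDD<String> fromGlob = sc.textFile("/user/XXX/sample/*.txt");           // wildcard pattern
JavaRDD<String> fromList = sc.textFile("/user/XXX/a.txt,/user/XXX/b.txt");  // comma-separated paths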
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>edu.berkeley</groupId>
    <artifactId>simple-project</artifactId>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- spark-core and spark-sql must use the same Spark version -->
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
    </dependencies>
</project>
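To build the jar with Maven (a standard invocation, assuming the Java files below sit under src/main/java):

$ mvn package

This produces target/simple-project-1.0-SNAPSHOT.jar. The spark-submit command at the end of this post points at an IDE-built artifact instead, so adjust the jar path to whichever build you use.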
----------------------------------------------
Alert.java
import java.io.Serializable;

// JavaBean for one alert record; Spark SQL infers the DataFrame schema
// (columns id, alert, created) from these getters.
public class Alert implements Serializable {
    private String id;
    private String alert;
    private String created;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getAlert() {
        return alert;
    }
    public void setAlert(String alert) {
        this.alert = alert;
    }
    public String getCreated() {
        return created;
    }
    public void setCreated(String created) {
        this.created = created;
    }
}
--------------------------------------------------
AlertMore.java
import java.io.Serializable;

// JavaBean linking an alert id to a contact id (columns id, contactId).
public class AlertMore implements Serializable {
    private String id;
    private String contactId;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getContactId() {
        return contactId;
    }
    public void setContactId(String contactId) {
        this.contactId = contactId;
    }
}
----------------------------------------------------
SimpleJava.java
/* SimpleJava.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.api.java.function.Function;

public class SimpleJava {
    public static void main(String[] args) {
        String logFile = "/user/XXX/sample/contact_alerts"; // Should be some file on your system
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        // Parse each CSV line into an Alert bean (column 0 = id, 3 = alert, 7 = created).
        JavaRDD<Alert> alerts = logData.map(new Function<String, Alert>() {
            public Alert call(String line) throws Exception {
                Alert alert = new Alert();
                String[] tokens = line.split(",");
                for (int i = 0; i < tokens.length; i++) {
                    if (i == 0) alert.setId(tokens[i]);
                    if (i == 3) alert.setAlert(tokens[i]);
                    if (i == 7) alert.setCreated(tokens[i]);
                }
                return alert;
            }
        });
        DataFrame alertDF = sqlContext.createDataFrame(alerts, Alert.class);
        alertDF.registerTempTable("alerts");

        // Parse the same lines into an AlertMore bean (column 0 = id, 1 = contactId).
        JavaRDD<AlertMore> alertsMore = logData.map(new Function<String, AlertMore>() {
            public AlertMore call(String line) throws Exception {
                AlertMore alertMore = new AlertMore();
                String[] tokens = line.split(",");
                for (int i = 0; i < tokens.length; i++) {
                    if (i == 0) alertMore.setId(tokens[i]);
                    if (i == 1) alertMore.setContactId(tokens[i]);
                }
                return alertMore;
            }
        });
        DataFrame alertMoreDF = sqlContext.createDataFrame(alertsMore, AlertMore.class);
        alertMoreDF.registerTempTable("alerts_more");

        System.out.println("-----------------------------------------------------------------------");
        System.out.println("DataFrame - query from alerts");
        DataFrame totalAlerts = sqlContext.sql("SELECT * FROM alerts")
                .join(alertMoreDF, alertDF.col("id").equalTo(alertMoreDF.col("id")));
        totalAlerts.show();
        // id is stored as a String; Spark casts it for the numeric comparison.
        System.out.println(alertDF.filter(alertDF.col("id").gt(911111)).count());

        /* DataFrame from JSON:
        DataFrame dfFromJson = sqlContext.jsonFile("/user/XXXXX/people.json");
        dfFromJson.show();
        dfFromJson.select("name").show();
        dfFromJson.select(dfFromJson.col("name"), dfFromJson.col("age").plus(1)).show();
        dfFromJson.filter(dfFromJson.col("age").gt(21)).show();
        dfFromJson.groupBy("age").count().show();
        */
    }
}
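Because both beans are registered as temp tables, the join can also be written entirely in SQL. A minimal sketch (column names follow the beans above):

DataFrame totalAlertsSql = sqlContext.sql(
        "SELECT a.id, a.alert, a.created, m.contactId " +
        "FROM alerts a JOIN alerts_more m ON a.id = m.id");
totalAlertsSql.show();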
Run:
$ ./bin/spark-submit --class "SimpleJava" --master local[4] ~/work/dev/bigdata/SimpleJava/out/artifacts/SimpleJava_jar/SimpleJava.jar
If you hit java.lang.OutOfMemoryError: GC overhead limit exceeded, give the job more memory, e.g. add --conf spark.executor.memory=6g to the spark-submit command (in local mode it is --driver-memory 6g that actually raises the heap, since the executors run inside the driver JVM).
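The memory setting can also be baked into the application instead of the command line; a minimal sketch using the same 6g figure:

SparkConf conf = new SparkConf()
        .setAppName("Simple Application")
        .set("spark.executor.memory", "6g"); // read when the SparkContext is created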