Without Hive, Spark can read multiple text files from HDFS and transform them into DataFrames, which makes them convenient to analyze.
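A quick note on the "multiple files" part: textFile accepts a directory, a glob, or a comma-separated list of paths, so several text files land in one RDD. A minimal sketch, with placeholder paths:

JavaRDD<String> fromDir  = sc.textFile("/user/XXX/sample/contact_alerts");   // every file in the directory
JavaRDD<String> fromGlob = sc.textFile("/user/XXX/sample/*.txt");            // glob pattern
JavaRDD<String> fromList = sc.textFile("/user/XXX/a.txt,/user/XXX/b.txt");   // explicit comma-separated list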
pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>edu.berkeley</groupId> <artifactId>simple-project</artifactId> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.1</version> </dependency> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.4.1</version> </dependency> </dependencies> </project>
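With this pom.xml the jar can also be built from the command line with mvn package (the artifact then lands under target/); the Run step below appears to use a jar produced as an IntelliJ artifact instead, judging by its out/artifacts path.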
----------------------------------------------
Alert.java
import java.io.Serializable;

// Plain bean; Spark SQL infers the DataFrame schema from its getters/setters.
public class Alert implements Serializable {
    private String id;
    private String alert;
    private String created;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getAlert() { return alert; }
    public void setAlert(String alert) { this.alert = alert; }
    public String getCreated() { return created; }
    public void setCreated(String created) { this.created = created; }
}
--------------------------------------------------
AlertMore.java
import java.io.Serializable;

// Second bean, carrying the id -> contactId mapping.
public class AlertMore implements Serializable {
    private String id;
    private String contactId;

    public String getContactId() { return contactId; }
    public void setContactId(String contactId) { this.contactId = contactId; }
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
}
----------------------------------------------------
SimpleJava.java
/* SimpleJava.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.api.java.function.Function;

public class SimpleJava {
    public static void main(String[] args) {
        String logFile = "/user/XXX/sample/contact_alerts"; // HDFS directory: all files under it are read
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        // Parse each comma-separated line into an Alert bean
        // (column 0 = id, column 3 = alert text, column 7 = created timestamp).
        JavaRDD<Alert> alerts = logData.map(new Function<String, Alert>() {
            public Alert call(String line) throws Exception {
                Alert alert = new Alert();
                String[] tokens = line.split(",");
                if (tokens.length > 0) alert.setId(tokens[0]);
                if (tokens.length > 3) alert.setAlert(tokens[3]);
                if (tokens.length > 7) alert.setCreated(tokens[7]);
                return alert;
            }
        });
        DataFrame alertDF = sqlContext.createDataFrame(alerts, Alert.class);
        alertDF.registerTempTable("alerts");

        // Parse the same lines into an AlertMore bean
        // (column 0 = id, column 1 = contactId).
        JavaRDD<AlertMore> alertsMore = logData.map(new Function<String, AlertMore>() {
            public AlertMore call(String line) throws Exception {
                AlertMore alertMore = new AlertMore();
                String[] tokens = line.split(",");
                if (tokens.length > 0) alertMore.setId(tokens[0]);
                if (tokens.length > 1) alertMore.setContactId(tokens[1]);
                return alertMore;
            }
        });
        DataFrame alertMoreDF = sqlContext.createDataFrame(alertsMore, AlertMore.class);
        alertMoreDF.registerTempTable("alerts_more");

        System.out.println("-----------------------------------------------------------------------");
        System.out.println("DataFrame - query from alerts");
        // Join the SQL query result against the second DataFrame on the id column.
        DataFrame totalAlerts = sqlContext.sql("SELECT * FROM alerts")
                .join(alertMoreDF, alertDF.col("id").equalTo(alertMoreDF.col("id")));
        totalAlerts.show();
        // Number of alerts with id greater than 911111.
        System.out.println(alertDF.filter(alertDF.col("id").gt(911111)).count());

        /* DataFrame from JSON
        DataFrame dfFromJson = sqlContext.jsonFile("/user/XXXXX/people.json");
        dfFromJson.show();
        dfFromJson.select("name").show();
        dfFromJson.select(dfFromJson.col("name"), dfFromJson.col("age").plus(1)).show();
        dfFromJson.filter(dfFromJson.col("age").gt(21)).show();
        dfFromJson.groupBy("age").count().show();
        */
    }
}
----------------------------------------------------
Run:
$ ./bin/spark-submit --class "SimpleJava" --master local[4] ~/work/dev/bigdata/SimpleJava/out/artifacts/SimpleJava_jar/SimpleJava.jar
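Since both temp tables are registered, the same join can also be expressed entirely in SQL. A minimal sketch (the column names follow the bean properties above):

// Equivalent join written purely in SQL against the registered temp tables.
DataFrame joined = sqlContext.sql(
    "SELECT a.id, a.alert, a.created, m.contactId "
    + "FROM alerts a JOIN alerts_more m ON a.id = m.id");
joined.show();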
If the job fails with java.lang.OutOfMemoryError: GC overhead limit exceeded, give Spark more memory, e.g. by adding --conf spark.executor.memory=6g (or --driver-memory 6g when running with --master local) to the spark-submit command.