Friday, September 11, 2015

transform txt files on HDFS to DataFrames in Spark, and join multiple DataFrames


Without Hive, Spark can read multiple txt files from HDFS and transform them into DataFrames, which makes them convenient to analyze.
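
The layout of the contact_alerts file isn't shown in the post; the following is an assumed layout, inferred from the parsing code below: each line is a comma-separated record in which field 0 is the alert id, field 1 the contact id, field 3 the alert text, and field 7 the creation timestamp, roughly:

<id>,<contactId>,...,<alert>,...,...,...,<created>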

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>edu.berkeley</groupId>
    <artifactId>simple-project</artifactId>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency> <!-- Spark core dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency> <!-- Spark SQL dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
    </dependencies>

</project>

----------------------------------------------
Alert.java
import java.io.Serializable;

/** JavaBean for one alert record; Spark infers the DataFrame schema from its getters. */
public class Alert implements Serializable {
    private String id;
    private String alert;
    private String created;


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAlert() {
        return alert;
    }

    public void setAlert(String alert) {
        this.alert = alert;
    }

    public String getCreated() {
        return created;
    }

    public void setCreated(String created) {
        this.created = created;
    }
}
--------------------------------------------------
AlertMore.java
import java.io.Serializable;

/** JavaBean holding the (id, contactId) fields parsed from the same input lines. */
public class AlertMore implements Serializable {
    private String id;
    private String contactId;

    public String getContactId() {
        return contactId;
    }

    public void setContactId(String contactId) {
        this.contactId = contactId;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}
----------------------------------------------------
SimpleJava.java
/* SimpleJava.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.api.java.function.Function;


public class SimpleJava {
    public static void main(String[] args) {
        String logFile = "/user/XXX/sample/contact_alerts"; // Path of the input file on HDFS
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logData = sc.textFile(logFile).cache();

        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        // Parse selected fields (index 0 = id, 3 = alert, 7 = created) from each comma-separated line.
        JavaRDD<Alert> alerts = logData.map(new Function<String, Alert>() {
            public Alert call(String line) throws Exception {
                Alert alert = new Alert();
                alert.setId(null);
                alert.setAlert(null);
                alert.setCreated(null);

                String[] tokens = line.split(",");
                for (int i = 0; i < tokens.length; i++) {
                    if (i == 0) alert.setId(tokens[i]);
                    if (i == 3) alert.setAlert(tokens[i]);
                    if (i == 7) alert.setCreated(tokens[i]);
                }

                return alert;
            }
        });
        DataFrame alertDF = sqlContext.createDataFrame(alerts, Alert.class);
        alertDF.registerTempTable("alerts");

        // Parse the (id, contactId) projection (index 0 = id, 1 = contactId) from each line.
        JavaRDD<AlertMore> alertsMore = logData.map(new Function<String, AlertMore>() {
            public AlertMore call(String line) throws Exception {
                AlertMore alertMore = new AlertMore();
                alertMore.setId(null);
                alertMore.setContactId(null);

                String[] tokens = line.split(",");
                for (int i = 0; i < tokens.length; i++) {
                    if (i == 0) alertMore.setId(tokens[i]);
                    if (i == 1) alertMore.setContactId(tokens[i]);
                }

                return alertMore;
            }
        });
        DataFrame alertMoreDF = sqlContext.createDataFrame(alertsMore, AlertMore.class);
        alertMoreDF.registerTempTable("alerts_more");

System.out.println("-----------------------------------------------------------------------");
System.out.println("DataFrame - query from alerts");
DataFrame totalAlerts = sqlContext.sql("SELECT * FROM alerts").join(alertMoreDF, alertDF.col("id").equalTo(alertMoreDF.col("id")));
totalAlerts.show();
System.out.println(alertDF.filter(alertDF.col("id").gt(911111)).count());

/*  DataFrame from JsonDataFrame dfFromJson = sqlContext.jsonFile("/user/XXXXX/people.json");dfFromJson.show();dfFromJson.select("name").show();dfFromJson.select(dfFromJson.col("name"), dfFromJson.col("age").plus(1)).show();dfFromJson.filter(dfFromJson.col("age").gt(21)).show();dfFromJson.groupBy("age").count().show();*/
} }
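
As a variant that is not in the original program, the same join can be written entirely in Spark SQL against the two registered temp tables, using the column names Spark derives from the bean getters (id, alert, created, contactId). A minimal sketch:

        // Hypothetical alternative: join the two temp tables directly in SQL.
        DataFrame joined = sqlContext.sql(
                "SELECT a.id, a.alert, a.created, m.contactId "
                + "FROM alerts a JOIN alerts_more m ON a.id = m.id");
        joined.show();

This relies only on registerTempTable and sqlContext.sql, which the program above already uses.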


Run:
$ ./bin/spark-submit --class "SimpleJava" --master local[4] ~/work/dev/bigdata/SimpleJava/out/artifacts/SimpleJava_jar/SimpleJava.jar
If you hit java.lang.OutOfMemoryError: GC overhead limit exceeded, add -Dspark.executor.memory=6g (or raise the driver memory, e.g. with --driver-memory, since local[4] runs everything in the driver JVM).
