This repository was archived by the owner on Apr 23, 2024. It is now read-only.

Commit 24ff555

Author: Bruno Silva (committed)
Some classes of stock prices prediction
1 parent 2a594e6, commit 24ff555

File tree

9 files changed: +413 -48 lines

.gitignore (+3, -1)

@@ -9,4 +9,6 @@
 /project
 /derby.log
 /metastore_db
-/spark-warehouse
+/spark-warehouse
+/stocks.parquet
+/emas.parquet

hadoop/bin/winutils.exe (45.9 KB)

Binary file not shown.

src/main/scala/com/dev/bruno/ml/SparkApp.scala (-47)

This file was deleted.
src/main/scala/com/dev/bruno/ml/lr/ClosePriceTask.scala (new file, +67)

package com.dev.bruno.ml.lr

import java.io.File

object ClosePriceTask {

  import org.apache.spark.SparkConf
  import org.apache.spark.ml.feature.VectorAssembler
  import org.apache.spark.ml.regression.LinearRegression
  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    val source = new File("./stocks.parquet")

    if (!source.exists || !source.isDirectory) {
      System.out.println("Please execute the CsvMergeTask class first.")
      return
    }

    // Dependency to run in standalone mode on Windows
    val hadoopFolder = new File("./hadoop").getAbsolutePath
    System.setProperty("hadoop.home.dir", hadoopFolder)

    // Basic configuration
    val conf = new SparkConf()
      .setAppName("ClosePriceTask")
      .setMaster("local[*]")

    // Initialization of the Spark session (also serves as the SQL context)
    val sqlContext = SparkSession.builder.config(conf).getOrCreate

    // Columns to be used as input for the Linear Regression algorithm
    val features = Array("Open", "High", "Low", "NameIndex")

    // All features must be assembled into a single vector column
    // before they can be fed to the Linear Regression algorithm
    val assembler = new VectorAssembler()
      .setInputCols(features)
      .setOutputCol("features")

    val dataset = sqlContext.read.parquet("stocks.parquet")

    val featuredDataset = assembler.transform(dataset).sort("Date")

    // Split the dataset by date into training and test sets
    val trainingDataset = featuredDataset.filter("Date <= '2016-12-31'")
    val testDataset = featuredDataset.filter("Date > '2016-12-31'")

    // Linear Regression algorithm
    // TODO Try to understand why we need to use setLabelCol
    // (setLabelCol tells the estimator which column holds the target value to learn)
    val linearRegression = new LinearRegression()
      .setLabelCol("Close")
      .setFeaturesCol("features")
      .setPredictionCol("ClosePredicted")

    // The trained model used for prediction
    val model = linearRegression.fit(trainingDataset)

    // A new column, ClosePredicted, is added to testDataset
    val predictedDataset = model.transform(testDataset)

    // Select only the relevant columns to compare and show
    predictedDataset.select("Date", "Name", "Close", "ClosePredicted").show()

    sqlContext.close()
  }
}
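The task above only prints predicted and actual close prices side by side. A minimal sketch of how the fit could be quantified with Spark ML's RegressionEvaluator, assuming the column names used in ClosePriceTask and that predictedDataset is still in scope (this snippet is not part of the commit):

import org.apache.spark.ml.evaluation.RegressionEvaluator

// Hypothetical evaluation step: measures how far ClosePredicted deviates from Close.
val evaluator = new RegressionEvaluator()
  .setLabelCol("Close")
  .setPredictionCol("ClosePredicted")
  .setMetricName("rmse") // root mean squared error; "mae" and "r2" are also supported

val rmse = evaluator.evaluate(predictedDataset)
println(s"RMSE on the test set: $rmse")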
src/main/scala/com/dev/bruno/ml/lr/NextOpenPriceTask.scala (new file, +78)

package com.dev.bruno.ml.lr

import java.io.File

object NextOpenPriceTask {

  import org.apache.spark.SparkConf
  import org.apache.spark.ml.feature.VectorAssembler
  import org.apache.spark.ml.regression.LinearRegression
  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    val source = new File("./stocks.parquet")

    if (!source.exists || !source.isDirectory) {
      System.out.println("Please execute the CsvMergeTask class first.")
      return
    }

    // Dependency to run in standalone mode on Windows
    val hadoopFolder = new File("./hadoop").getAbsolutePath
    System.setProperty("hadoop.home.dir", hadoopFolder)

    // Basic configuration
    val conf = new SparkConf()
      .setAppName("NextOpenPriceTask")
      .setMaster("local[*]")

    // Initialization of the Spark session (also serves as the SQL context)
    val sqlContext = SparkSession.builder.config(conf).getOrCreate

    val dataset = sqlContext.read.parquet("stocks.parquet")

    // Compute the NextOpenPrice for the whole dataset: each row's Open is
    // shifted back one day, so a row at date D is joined with the Open of D + 1
    dataset.createOrReplaceTempView("temp_stocks")

    val nextOpenDatasetSql = "select date_add(Date, -1) as Date, " +
      "NameIndex, Open as NextOpenPrice from temp_stocks"

    val nextOpenDataset = sqlContext.sql(nextOpenDatasetSql)
    nextOpenDataset.createOrReplaceTempView("temp_next_openprice")

    val sql = "select s.*, o.NextOpenPrice from temp_stocks s, temp_next_openprice o" +
      " where to_date(s.Date) = o.Date and s.NameIndex = o.NameIndex"
    val updatedDataset = sqlContext.sql(sql)

    // Columns to be used as input for the Linear Regression algorithm
    val features = Array("Open", "Close", "High", "Low", "NameIndex")

    // All features must be assembled into a single vector column
    // before they can be fed to the Linear Regression algorithm
    val assembler = new VectorAssembler()
      .setInputCols(features)
      .setOutputCol("features")

    // Linear Regression algorithm
    // TODO Try to understand why we need to use setLabelCol
    // (setLabelCol tells the estimator which column holds the target value to learn)
    val linearRegression = new LinearRegression
    linearRegression.setLabelCol("NextOpenPrice")
    linearRegression.setFeaturesCol("features")
    linearRegression.setPredictionCol("NextOpenPricePredicted")

    val featuredDataset = assembler.transform(updatedDataset).sort("Date")

    // Split the dataset by date into training and test sets
    val trainingDataset = featuredDataset.filter("Date <= '2016-12-31'")
    val testDataset = featuredDataset.filter("Date > '2016-12-31'")

    // The trained model used for prediction
    val model = linearRegression.fit(trainingDataset)

    // A new column, NextOpenPricePredicted, is added to testDataset
    val predictedDataset = model.transform(testDataset)

    // Select only the relevant columns to compare and show
    predictedDataset.select("Date", "Name", "NextOpenPrice", "NextOpenPricePredicted").show()

    sqlContext.close()
  }
}
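The date_add(Date, -1) self-join above pairs each row with the Open of the calendar day that follows it. A minimal alternative sketch, not part of the commit, using a window function to pick the next trading row per stock; it assumes the same stocks.parquet columns (Date, NameIndex, Open) and a SparkSession bound to sqlContext:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

// Hypothetical alternative: lead() takes each stock's next-row Open
// instead of the strict next-calendar-day match used in NextOpenPriceTask.
val byStock = Window.partitionBy("NameIndex").orderBy("Date")

val withNextOpen = sqlContext.read.parquet("stocks.parquet")
  .withColumn("NextOpenPrice", lead("Open", 1).over(byStock))
  .na.drop(Seq("NextOpenPrice")) // the last row of each stock has no following day

Note that lead() returns the next available trading row rather than strictly the next calendar day, so rows before weekends and holidays are kept here but dropped by the original join.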
src/main/scala/com/dev/bruno/ml/model/Ema.scala (new file, +47)

package com.dev.bruno.ml.model

import java.sql.Timestamp

// Serializable holder for the exponential moving average values (ema6, ema10)
// of a stock, identified by NameIndex, on a given date.
@SerialVersionUID(100L)
class Ema extends Serializable {

  private var _nameIndex: Double = .0

  private var _ema6: Double = .0

  private var _ema10: Double = .0

  private var _date: Timestamp = _

  def this(nameIndex: Double, ema6: Double, ema10: Double, date: Timestamp) {
    this()
    this._nameIndex = nameIndex
    this._date = date
    this._ema6 = ema6
    this._ema10 = ema10
  }

  def nameIndex: Double = _nameIndex

  def nameIndex(nameIndex: Double): Unit = {
    this._nameIndex = nameIndex
  }

  def date: Timestamp = _date

  def date(date: Timestamp): Unit = {
    this._date = date
  }

  def ema6: Double = _ema6

  def ema6(ema6: Double): Unit = {
    this._ema6 = ema6
  }

  def ema10: Double = _ema10

  def ema10(ema10: Double): Unit = {
    this._ema10 = ema10
  }
}
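The commit only adds this value holder; nothing here computes the averages. For reference, the usual exponential moving average recurrence is EMA_t = alpha * price_t + (1 - alpha) * EMA_{t-1} with alpha = 2 / (N + 1). A minimal sketch, assuming ema6 and ema10 are intended to hold 6- and 10-period EMAs of a price series (this helper is hypothetical, not part of the commit):

// Hypothetical helper: folds a chronologically ordered price series
// into its N-period exponential moving average values.
def ema(prices: Seq[Double], n: Int): Seq[Double] = {
  require(prices.nonEmpty, "price series must not be empty")
  val alpha = 2.0 / (n + 1)
  prices.tail.scanLeft(prices.head) { (prev, price) =>
    alpha * price + (1 - alpha) * prev
  }
}

// e.g. ema(closes, 6) and ema(closes, 10) would populate Ema.ema6 / Ema.ema10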
src/main/scala/com/dev/bruno/ml/model/Stock.scala (new file, +71)

package com.dev.bruno.ml.model

import java.sql.Timestamp

// Serializable holder for a single stock quote: name, indexed name,
// date and the open/close/low/high/volume values.
@SerialVersionUID(100L)
class Stock extends Serializable {

  private var _name: String = _

  private var _nameIndex: Double = .0

  private var _date: Timestamp = _

  private var _open: Double = .0

  private var _close: Double = .0

  private var _low: Double = .0

  private var _high: Double = .0

  private var _volume: Double = .0

  def name: String = _name

  def name(name: String): Unit = {
    this._name = name
  }

  def nameIndex: Double = _nameIndex

  def nameIndex(nameIndex: Double): Unit = {
    this._nameIndex = nameIndex
  }

  def date: Timestamp = _date

  def date(date: Timestamp): Unit = {
    this._date = date
  }

  def open: Double = _open

  def open(open: Double): Unit = {
    this._open = open
  }

  def close: Double = _close

  def close(close: Double): Unit = {
    this._close = close
  }

  def low: Double = _low

  def low(low: Double): Unit = {
    this._low = low
  }

  def high: Double = _high

  def high(high: Double): Unit = {
    this._high = high
  }

  def volume: Double = _volume

  def volume(volume: Double): Unit = {
    this._volume = volume
  }
}
src/main/scala/com/dev/bruno/ml/util/CsvMergeTask.scala (new file, +51)

package com.dev.bruno.ml.util

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql._

object CsvMergeTask {

  def main(args: Array[String]): Unit = {

    if (args.length == 0) {
      println("Please pass the location of the CSV files as an argument.")
      return
    }

    // Dependency to run in standalone mode on Windows
    val hadoopFolder = new File("./hadoop").getAbsolutePath
    System.setProperty("hadoop.home.dir", hadoopFolder)

    // Basic configuration
    val conf = new SparkConf()
      .setAppName("CsvMergeTask")
      .setMaster("local[*]")

    // Initialization of the Spark session (also serves as the SQL context)
    val sqlContext = SparkSession.builder.config(conf).getOrCreate

    val sparkContext = sqlContext.sparkContext

    val reader = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // the CSV files have a header row; use it for column names
      .option("inferSchema", "true") // discover the column types

    // Load the CSV files directory, dropping incomplete and duplicate rows
    val filter: String = "Open is not null and High is not null and Low is not null " +
      " and Volume is not null and Date is not null and Name is not null"
    val dataset = reader.load(args(0)).filter(filter).distinct()

    // Create an index so that Name can be used as a feature in Linear Regression
    val indexer = new StringIndexer()
      .setInputCol("Name")
      .setOutputCol("NameIndex")

    val indexedDataset = indexer.fit(dataset).transform(dataset)

    // Save the merged dataset as Parquet
    indexedDataset.write.mode(SaveMode.Overwrite).parquet("stocks.parquet")
  }
}
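A quick way to sanity-check the merged output, not part of the commit, assuming CsvMergeTask has already written stocks.parquet to the working directory and the same sqlContext session is available:

// Hypothetical check: reload the Parquet output and inspect it.
val stocks = sqlContext.read.parquet("stocks.parquet")

stocks.printSchema() // expect Date, Open, High, Low, Close, Volume, Name, NameIndex
stocks.groupBy("Name", "NameIndex").count().show() // each Name should map to exactly one NameIndex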
