Implementing Secondary Sort with the Spark Java API
package com.spark.sort;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Composite key for secondary sort: ordered by the String field first,
 * then by the long field on ties. It must be Serializable so Spark can
 * ship it to executors, and it implements scala.math.Ordered so that
 * sortByKey can order it.
 */
public class SecondSortKey implements Serializable, Ordered<SecondSortKey> {

    private static final long serialVersionUID = -2749925310062789494L;

    private String first;
    private long second;

    public SecondSortKey(String first, long second) {
        this.first = first;
        this.second = second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public long getSecond() {
        return second;
    }

    public void setSecond(long second) {
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + (int) (second ^ (second >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        SecondSortKey other = (SecondSortKey) obj;
        if (first == null) {
            if (other.first != null)
                return false;
        } else if (!first.equals(other.first)) {
            return false;
        }
        return second == other.second;
    }

    @Override
    public boolean $greater(SecondSortKey that) {
        if (this.first.compareTo(that.getFirst()) > 0) {
            return true;
        } else if (this.first.equals(that.getFirst()) && this.second > that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $greater$eq(SecondSortKey that) {
        if (this.$greater(that)) {
            return true;
        } else if (this.first.equals(that.getFirst()) && this.second == that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $less(SecondSortKey that) {
        if (this.first.compareTo(that.getFirst()) < 0) {
            return true;
        } else if (this.first.equals(that.getFirst()) && this.second < that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $less$eq(SecondSortKey that) {
        if (this.$less(that)) {
            return true;
        } else if (this.first.equals(that.getFirst()) && this.second == that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public int compare(SecondSortKey that) {
        // Compare by the first field; fall back to the second field on ties.
        if (this.first.compareTo(that.getFirst()) != 0) {
            return this.first.compareTo(that.getFirst());
        }
        // Long.compare avoids the overflow risk of casting (second - that.second) to int.
        return Long.compare(this.second, that.getSecond());
    }

    @Override
    public int compareTo(SecondSortKey that) {
        return compare(that);
    }
}
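Before wiring the key into a job, it can help to verify the ordering contract locally. The sketch below is a hypothetical sanity check (the class name SecondSortKeyCheck and the sample values are illustrative, not part of the original post):

package com.spark.sort;

// Hypothetical local check of SecondSortKey ordering; not part of the original example.
public class SecondSortKeyCheck {
    public static void main(String[] args) {
        SecondSortKey a2  = new SecondSortKey("a", 2);
        SecondSortKey a12 = new SecondSortKey("a", 12);
        SecondSortKey b26 = new SecondSortKey("b", 26);

        // Same first field: the order falls back to the second field.
        System.out.println(a2.compareTo(a12) < 0);                 // true
        // Different first field: lexicographic order on the first field wins.
        System.out.println(a12.$less(b26));                        // true
        // equals is consistent with the ordering for identical field values.
        System.out.println(a2.equals(new SecondSortKey("a", 2)));  // true
    }
}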
package com.spark.sort;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SecondSort {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("secondsort").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> textFileRDD = jsc.textFile("D:\\test\\input\\sort");

        // Map each tab-separated line to (SecondSortKey, original line).
        JavaPairRDD<SecondSortKey, String> pairRDD = textFileRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
            @Override
            public Tuple2<SecondSortKey, String> call(String t) throws Exception {
                String[] split = t.split("\t");
                String first = split[0];
                long second = Long.parseLong(split[1]);
                SecondSortKey ssk = new SecondSortKey(first, second);
                return new Tuple2<SecondSortKey, String>(ssk, t);
            }
        });

        // Sort by the composite key.
        JavaPairRDD<SecondSortKey, String> sortByKeyRDD = pairRDD.sortByKey();

        // Drop the custom key and keep only the original line.
        JavaRDD<String> mapRDD = sortByKeyRDD.map(new Function<Tuple2<SecondSortKey, String>, String>() {
            @Override
            public String call(Tuple2<SecondSortKey, String> v1) throws Exception {
                return v1._2;
            }
        });

        mapRDD.saveAsTextFile("D:\\test\\output\\sort");

        jsc.close();
    }
}
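If a descending order is needed instead, JavaPairRDD.sortByKey also takes an ascending flag. The fragment below is a sketch of that variant under the same assumptions as the job above (same input and variable names):

// Descending variant: sortByKey(boolean ascending) with ascending = false
// reverses the order defined by SecondSortKey.compare / compareTo.
JavaPairRDD<SecondSortKey, String> descRDD = pairRDD.sortByKey(false);

// The two-argument overload also controls the number of output partitions,
// e.g. a single partition for one globally ordered output file:
// JavaPairRDD<SecondSortKey, String> descSingle = pairRDD.sortByKey(false, 1);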
Source data:
a 12
a 2
b 26
c 85
ab 32
ab 23
ac 12
b 85
a 36
b 69
c 25
After sorting:
a 2
a 12
a 36
ab 23
ab 32
ac 12
b 26
b 69
b 85
c 25
c 85