pyspark下foreachPartition()向hbase中写数据,数据没有完全写入hbase中
1.问题描述
在使用pyspark过程中,遇到了一个向hbase中写数据的问题,在foreachPartition()方法中使用happybase对每个partition中的数据进行写入hbase的时候会出现数据丢失的问题,在hbase中并未完全的写入所有的数据,只写入了一小部分。
2.具体的业务代码如下:
articleVector是文章的向量,similar是文章之间的相似度
article_vector表结构如下:
create temporary table article.article_vector(
id string comment 'id',
major_id int comment 'major_id',
vector array<string> comment 'keyword vector'
);
计算相似的代码:
from pyspark.ml.feature import Word2Vecfrom pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Word2VecModel
from pyspark.ml.feature import BucketedRandomProjectionLSH
articleVector = spark.sql("select * from article_vector")
def toVector(row):
return row.id, Vectors.dense(row.vector)
train = articleVector.rdd.map(toVector).toDF(["id", "vector"])
brp = BucketedRandomProjectionLSH(inputCol='vector', outputCol='hashes', seed=12345, bucketLength=1.0)
model = brp.fit(train)
similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')
存储进hbase中
import happybasedef save_hbase(partitions):
pool = happybase.ConnectionPool(size=10, host='hbase-url')
with pool.connection() as conn:
article_similar = conn.table('article_similar')
for row in partitions:
article_similar.put(str(row.datasetA.id).encode(),
{'similar:{}'.format(row.datasetB.id).encode(): b'%0.4f' % (row.EuclideanDistance)})
conn.close()
similar.foreachPartition(save_hbase)
3.具体问题
article_vector中的数据量为120w条数据,取出来计算完相似度之后得到similar。但是到了save_hbase()这一步就出现问题了,程序跑的过程中并无报错,spark日志中也没有发现异常,但是最终的hbase中article_similar表中却只有6万条记录数。按理说hbase中存储的记录数应该和article_vector中的数据量一致,可以在hbase中根据每一个id查到这个id的对应的相似数据。实际上只存了6万条左右的id,只能查到六万个id对应的相似信息,为什么会这样,是happybase的存储过程中出了什么问题吗?
回答:
与happybase无关,LSH的桶长度设置过小,增大BucketedRandomProjectionLSH中的bucketLength,再增大approxSimilarityJoin中的欧氏距离的阈值。详细的可以查看pyspark.ml.feature中的BucketedRandomProjectionLSH类源码。
以上是 pyspark下foreachPartition()向hbase中写数据,数据没有完全写入hbase中 的全部内容, 来源链接: utcz.com/a/157825.html