pyspark下foreachPartition()向hbase中写数据,数据没有完全写入hbase中

pyspark下foreachPartition()向hbase中写数据,数据没有完全写入hbase中

1.问题描述

在使用pyspark过程中,遇到了一个向hbase中写数据的问题,在foreachPartition()方法中使用happybase对每个partition中的数据进行写入hbase的时候会出现数据丢失的问题,在hbase中并未完全的写入所有的数据,只写入了一小部分。

2.具体的业务代码如下:

articleVector是文章的向量,similar是文章之间的相似度

article_vector表结构如下:

create temporary table article.article_vector

(

id string comment 'id',

major_id int comment 'major_id',

vector array<string> comment 'keyword vector'

);

计算相似的代码:

from pyspark.ml.feature import Word2Vec

from pyspark.ml.linalg import Vectors

from pyspark.ml.feature import Word2VecModel

from pyspark.ml.feature import BucketedRandomProjectionLSH

articleVector = spark.sql("select * from article_vector")

def toVector(row):

return row.id, Vectors.dense(row.vector)

train = articleVector.rdd.map(toVector).toDF(["id", "vector"])

brp = BucketedRandomProjectionLSH(inputCol='vector', outputCol='hashes', seed=12345, bucketLength=1.0)

model = brp.fit(train)

similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')

存储进hbase中

import happybase

def save_hbase(partitions):

pool = happybase.ConnectionPool(size=10, host='hbase-url')

with pool.connection() as conn:

article_similar = conn.table('article_similar')

for row in partitions:

article_similar.put(str(row.datasetA.id).encode(),

{'similar:{}'.format(row.datasetB.id).encode(): b'%0.4f' % (row.EuclideanDistance)})

conn.close()

similar.foreachPartition(save_hbase)

3.具体问题

article_vector中的数据量为120w条数据,取出来计算完相似度之后得到similar。但是到了save_hbase()这一步就出现问题了,程序跑的过程中并无报错,spark日志中也没有发现异常,但是最终的hbase中article_similar表中却只有6万条记录数。按理说hbase中存储的记录数应该和article_vector中的数据量一致,可以在hbase中根据每一个id查到这个id的对应的相似数据。实际上只存了6万条左右的id,只能查到六万个id对应的相似信息,为什么会这样,是happybase的存储过程中出了什么问题吗?


回答:

与happybase无关,LSH的桶长度设置过小,增大BucketedRandomProjectionLSH中的bucketLength,再增大approxSimilarityJoin中的欧氏距离的阈值。详细的可以查看pyspark.ml.feature中的BucketedRandomProjectionLSH类源码。

以上是 pyspark下foreachPartition()向hbase中写数据,数据没有完全写入hbase中 的全部内容, 来源链接: utcz.com/a/157825.html

回到顶部