Amachinelearningsystemonspark [操作系统入门]
简介
https://github.com/fanqingsong/machine_learning_system_on_spark
a simple machine learning system demo, for ML study. Based on machine_learning_system repo, add new process for ml model service with celery and spark.
技术栈
category name comment frontend
reactjs
frontend framework
frontend
redux
state management
frontend
react-C3JS
D3 based graph tool
frontend
react-bootstrap
style component library
frontend
data-ui
react data visualization tool
backend
django
backend framework
backend
django-rest-knox
authentication library
backend
djangorestframework
restful framework
backend
spark.ml
machine learning tool
架构
Generally, train process is time consumming, and predict process is quick. So set train flow as async mode, and predict flow as sync mode.
train flow
- user start model train from browser
- django recieve the "start train" message
- django schedule spark.ml celery process to train model and return to user immediately.
- browser query train status to django
- django query train status from spark.ml celery process.
- django feedback the train over status to browser
predict flow
- user input prediction features on browser, then click submit
- browser send prediction features to django
- django call prediction api with prediction features
- django feedback the prediction result to browser
+---------+ +-------------+ +------------+
| | start train| | | |
| +------------> | start | |
| | | +---train----> |
| | query train | | |
| +--status----> | | |
| | | +----query --> |
| <---train ---+ | train | |
| | over | | status | |
| browser| | django | | spark.ml |
| | | | | on celery |
| | predict | | | |
| +------------> | predict | |
| | | +----------->+ |
| <--predict---+ | | |
| | result | | | |
| | | | | |
| | | | | |
+---------+ +-------------+ +------------+
Django和Celery集成
https://www.cnblogs.com/wdliu/p/9530219.html
https://www.pythonf.cn/read/7143
Celery
https://docs.celeryproject.org/en/stable/getting-started/index.html
Task queues are used as a mechanism to distribute work across threads or machines.
A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.
https://www.celerycn.io/
任务队列一般用于线程或计算机之间分配工作的一种机制。
任务队列的输入是一个称为任务的工作单元,有专门的工作进行不断的监视任务队列,进行执行新的任务工作。
Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。
Celery 可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。
Celery 是用 Python 编写的,但协议可以用任何语言实现。除了 Python 语言实现之外,还有Node.js的node-celery和php的celery-php。
可以通过暴露 HTTP 的方式进行,任务交互以及其它语言的集成开发。
https://docs.celeryproject.org/en/stable/reference/index.html
celery
— Distributed processingThis module is the main entry-point for the Celery API. It includes commonly needed things for calling tasks, and creating Celery applications.
Celery
Celery application instance
group
group tasks together
chain
chain tasks together
chord
chords enable callbacks for groups
signature()
create a new task signature
Signature
object describing a task invocation
current_app
proxy to the current application instance
current_task
proxy to the currently executing task
Spark.ml
https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#
- ML Pipeline APIs
- Transformer
- Estimator
- Model
- Pipeline
- PipelineModel
- pyspark.ml.clustering module
- BisectingKMeans
- BisectingKMeansModel
- BisectingKMeansSummaryE
- KMeans
- KMeansModel
- GaussianMixture
- GaussianMixtureModel
- GaussianMixtureSummaryE
- LDA
- LDAModel
- LocalLDAModel
- DistributedLDAModel
http://dblab.xmu.edu.cn/blog/1779-2/
KMeans 是一个迭代求解的聚类算法,其属于 划分(Partitioning) 型的聚类方法,即首先创建K个划分,然后迭代地将样本从一个划分转移到另一个划分来改善最终聚类的质量。
ML包下的KMeans方法位于org.apache.spark.ml.clustering包下,其过程大致如下:
1.根据给定的k值,选取k个样本点作为初始划分中心;
2.计算所有样本点到每一个划分中心的距离,并将所有样本点划分到距离最近的划分中心;
3.计算每个划分中样本点的平均值,将其作为新的中心;
循环进行2~3步直至达到最大迭代次数,或划分中心的变化小于某一预定义阈值
显然,初始划分中心的选取在很大程度上决定了最终聚类的质量,和MLlib包一样,ML包内置的KMeans类也提供了名为 KMeans||
的初始划分中心选择方法,它是著名的 KMeans++
方法的并行化版本,其思想是令初始聚类中心尽可能的互相远离,具体实现细节可以参见斯坦福大学的B Bahmani在PVLDB上的论文Scalable
K-Means++,这里不再赘述。
spark sql
https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#
- pyspark.sql.SparkSession
Main entry point for DataFrame and SQL functionality.
- pyspark.sql.DataFrame
A distributed collection of data grouped into named columns.
- pyspark.sql.Column
A column expression in a DataFrame.
- pyspark.sql.Row
A row of data in a DataFrame.
- pyspark.sql.GroupedData
Aggregation methods, returned by DataFrame.groupBy().
- pyspark.sql.DataFrameNaFunctions
Methods for handling missing data (null values).
- pyspark.sql.DataFrameStatFunctions
Methods for statistics functionality.
- pyspark.sql.functions
List of built-in functions available for DataFrame.
- pyspark.sql.types
List of data types available.
- pyspark.sql.Window
For working with window functions.
https://www.cnblogs.com/Finley/p/6390528.html
DataFrame提供了一些常用操作的实现, 可以使用这些接口查看或修改DataFrame:
df.collect()
: 以Row列表的方式显示df中的所有数据df.show()
: 以可视化表格的方式打印df中的所有数据df.count()
: 显示df中数据的行数df.describe()
返回一个新的DataFrame对象包含对df中数值列的统计数据df.cache()
: 以MEMORY_ONLY_SER
方式进行持久化df.persist(level)
: 以指定的方式进行持久化df.unpersist()
: 删除缓存DataFrame的一些属性可以用于查看它的结构信息:
df.columns
: 返回各列名称的列表
df.schema
: 以StructType对象的形式返回df的表结构
df.dtypes
: 以列表的形式返回每列的名称和类型。
[(‘name‘, ‘string‘), (‘id‘, ‘int‘)]
df.rdd
将DataFrame对象转换为rddDataFrame支持使用Map和Reduce操作:
df.map(func)
: 等同于df.rdd.map(func)
df.reduce(func)
: 等同于df.rdd.reduce(func)
A machine learning system on spark
以上是 Amachinelearningsystemonspark [操作系统入门] 的全部内容, 来源链接: utcz.com/z/519313.html