时序数据库InfluxIOx源码学习四(Run命令的执行)

database

欢迎关注公众号:

上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858

这章记录一下Run命令的执行过程。

 //根据用户在命令行配置的num_threads参数

//来选择创建一个多线程的模型,还是current_thread的模型

//后面有时间深入研究tokio的时候再来分析有什么异同

let tokio_runtime = get_runtime(config.num_threads)?;

//block_on会让线程一直等待方法里的future执行完成

//这是让闭包中的方法占有了io driver 和 timer context

tokio_runtime.block_on(async move {

let host = config.host;

match config.command {

// 省略其它command ...

Command::Run(config) => {

//具体去子类型里执行,然后await一个结果

if let Err(e) = commands::run::command(logging_level, *config).await {

eprintln!("Server command failed: {}", e);

std::process::exit(ReturnCode::Failure as _)

}

}

}

});

influxdb_ioxd::main方法中,忽略一些不太需要重点关注的,分别是初始化log的管理、PanicsTracingCancellationToken等。

    //初始化对象存储

let object_store = ObjectStore::try_from(&config)?;

//可以看到,目前已经支持了

//1.内存(在container环境运行时候使用)

//2.Google

//3.S3

//4.Azure

//5.File 本地文件,方便开发者调试运行在云上时候的文件变化

fn try_from(config: &Config) -> Result<Self, Self::Error> {

match config.object_store {

Some(ObjStoreOpt::Memory) | None => {

//创建一个btreemap用来缓存或者搜索

Ok(Self::new_in_memory(object_store::memory::InMemory::new()))

}

Some(ObjStoreOpt::Google) => {

// 省略

}

Some(ObjStoreOpt::S3) => {

// 省略

}

Some(ObjStoreOpt::Azure) => {

// 省略

}

Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {

Some(db_dir) => {

//去递归创建这个配置路径中的文件夹

//context也是使用的snafu来处理错误的

fs::create_dir_all(db_dir)

.context(CreatingDatabaseDirectory { path: db_dir })?;

//都创建完成,并且没出错误,把路径保存起来

Ok(Self::new_file(object_store::disk::File::new(&db_dir)))

}

// 如果database_directory这个参数没有配置的时候

//使用snafu这个crate来返回一个错误

None => MissingObjectStoreConfig {

object_store: ObjStoreOpt::File,

missing: "data-dir",

}

.fail(),

},

}

}

关于错误处理的代码:

 #[snafu(display("Unable to create database directory {:?}: {}", path, source))]

CreatingDatabaseDirectory {

path: PathBuf,

source: std::io::Error,

},

#[snafu(display(

"Specified {} for the object store, required configuration missing for {}",

object_store,

missing

))]

MissingObjectStoreConfig {

object_store: ObjStoreOpt,

missing: String,

},

我们来测试一下错误的场景,来看看是否符合代码的预期。

// 不传入路径

cargo run run --object-store file

Finished dev [unoptimized + debuginfo] target(s) in 0.42s

Running `./influxdb_iox run --object-store file`

Apr 15 13:38:34.352 INFO influxdb_iox::influxdb_ioxd: Using File for object storage

Server command failed: Run: Specified File for the object store, required configuration missing for data-dir

//传入一个创建不了的路径

cargo run run --object-store file --data-dir /root/1/1

Finished dev [unoptimized + debuginfo] target(s) in 0.47s

Running `./influxdb_iox run --object-store file --data-dir /root/1/1`

Apr 15 13:45:26.664 INFO influxdb_iox::influxdb_ioxd: Using File for object storage

Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30)

可以看到是符合预期的,bingo

//创建一个空的结构体

let connection_manager = ConnectionManager {};

//创建AppServer结构体用来保存基本的信息

//server_config里就是保存的对象存储的信息及线程配置

//如果num_worker_threads没有填写,默认就使用cpu数量

let app_server = Arc::new(AppServer::new(connection_manager, server_config));

//不设置这个writer_id能启动,但是不能做任何操作

if let Some(id) = config.writer_id {

//compare and set 一个非0的数值,错误就打印一个指定的panic

app_server.set_id(id).expect("writer id already set");

//校验所有的配置

if let Err(e) = app_server.load_database_configs().await {

error!(

"unable to load database configurations from object storage: {}",

e

)

}

} else {

warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");

}

接下来进入load_database_configs方法看看,

let list_result = self

.store

//把write_id和配置的文件路径组合一下,作为一个目录

//遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹

//用Vec存下所有的文件信息,包括路径、修改时间、大小等

.list_with_delimiter(&self.root_path()?)

.await

.context(StoreError)?;

//拿到配置的server的write_id

let server_id = self.require_id()?;

let handles: Vec<_> = list_result

//配置的文件夹下的所有文件夹

.common_prefixes

.into_iter()

//全部进行map转换

.map(|mut path| {

let store = Arc::clone(&self.store);

let config = Arc::clone(&self.config);

let exec = Arc::clone(&self.exec);

//先找database的相关信息文件,名字叫rules.pb

path.set_file_name(DB_RULES_FILE_NAME);

//感觉是需要io来读取文件内容,所以开一个异步

tokio::task::spawn(async move {

let mut res = get_store_bytes(&path, &store).await;

//省略错误处理。。

let res = res.unwrap().freeze();

//解析文件内容,根据文件名可以看出是个pb文件。

match DatabaseRules::decode(res) {

Err(e) => {

//省略错误。。

}

//根据解析出来的文件内容,在内存中恢复回来db的相关信息

Ok(rules) => match config.create_db(rules) {

Err(e) => error!("error adding database to config: {}", e),

//提交一个后台任务,用来不断的检测chunks的状态

//比如达到了某个大小,然后写入到存储等

Ok(handle) => handle.commit(server_id, store, exec),

},

}

})

})

.collect();

//等待所有任务完成

futures::future::join_all(handles).await;

这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。

接下来就是启动连接相关的了。

    //从启动命令行中读取grpc的地址

let grpc_bind_addr = config.grpc_bind_address;

//绑定这个地址

let socket = tokio::net::TcpListener::bind(grpc_bind_addr)

.await

.context(StartListeningGrpc { grpc_bind_addr })?;

//真正的协议启动

let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

//同样的启动http相关的服务,使用的hyper库

let bind_addr = config.http_bind_address;

let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?;

let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

//省略后面的停止流程。。。

然后看grpc的启动的服务

    //启动起来健康检查的服务

let stream = TcpListenerStream::new(socket);

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();

//标识相对应的服务已经是可以提供服务的状态了

let services = [

generated_types::STORAGE_SERVICE,

generated_types::IOX_TESTING_SERVICE,

generated_types::ARROW_SERVICE,

];

for service in &services {

health_reporter

.set_service_status(service, tonic_health::ServingStatus::Serving)

.await;

}

//增加一堆使用grpc的服务,并启动起来

tonic::transport::Server::builder()

.add_service(health_service)

.add_service(testing::make_server())

.add_service(storage::make_server(Arc::clone(&server)))

.add_service(flight::make_server(Arc::clone(&server)))

.add_service(write::make_server(Arc::clone(&server)))

.add_service(management::make_server(Arc::clone(&server)))

.add_service(operations::make_server(server))

.serve_with_incoming_shutdown(stream, shutdown.cancelled())

.await

然后是http相关的启动

pub async fn serve<M>(

addr: AddrIncoming,

server: Arc<AppServer<M>>,

shutdown: CancellationToken,

) -> Result<(), hyper::Error>

where

M: ConnectionManager + Send + Sync + Debug + "static,

{

//初始化路由相关的信息

let router = router(server);

let service = RouterService::new(router).unwrap();

//启动服务

hyper::Server::builder(addr)

.serve(service)

.with_graceful_shutdown(shutdown.cancelled())

.await

}

顺便看一下都提供了哪些地址可以被访问的:

 Router::builder()

.data(server)

//写了一个拦截,打印请求参数和返回结果

.middleware(Middleware::pre(|req| async move {

debug!(request = ?req, "Processing request");

Ok(req)

}))

.middleware(Middleware::post(|res| async move {

debug!(response = ?res, "Successfully processed request");

Ok(res)

})) // this endpoint is for API backward compatibility with InfluxDB 2.x

.post("/api/v2/write", write::<M>)

.get("/health", health)

.get("/metrics", handle_metrics)

.get("/iox/api/v1/databases/:name/query", query::<M>)

.get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)

.get("/api/v1/partitions", list_partitions::<M>)

.post("/api/v1/snapshot", snapshot_partition::<M>)

//错误的时候调用的处理拦截

.err_handler_with_info(error_handler)

.build()

.unwrap()

做一个/health的测试:

curl localhost:8080/health

OK%

可以看到成功返回了值。

到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如PanicsLog等等吧,欢迎持续关注。

祝玩儿的开心

以上是 时序数据库InfluxIOx源码学习四(Run命令的执行) 的全部内容, 来源链接: utcz.com/z/535505.html

回到顶部