掌握HDFS的Java API接口访问

java

     HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。

1、获取文件系统

 1 /**

2 * 获取文件系统

3 *

4 * @return FileSystem

5 */

6 public static FileSystem getFileSystem() {

7 //读取配置文件

8 Configuration conf = new Configuration();

9 // 文件系统

10 FileSystem fs = null;

11

12 String hdfsUri = HDFSUri;

13 if(StringUtils.isBlank(hdfsUri)){

14 // 返回默认文件系统 如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统

15 try {

16 fs = FileSystem.get(conf);

17 } catch (IOException e) {

18 logger.error("", e);

19 }

20 }else{

21 // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统

22 try {

23 URI uri = new URI(hdfsUri.trim());

24 fs = FileSystem.get(uri,conf);

25 } catch (URISyntaxException | IOException e) {

26 logger.error("", e);

27 }

28 }

29

30 return fs;

31 }

 2、创建文件目录

 1 /**

2 * 创建文件目录

3 *

4 * @param path

5 */

6 public static void mkdir(String path) {

7 try {

8 // 获取文件系统

9 FileSystem fs = getFileSystem();

10

11 String hdfsUri = HDFSUri;

12 if(StringUtils.isNotBlank(hdfsUri)){

13 path = hdfsUri + path;

14 }

15

16 // 创建目录

17 fs.mkdirs(new Path(path));

18

19 //释放资源

20 fs.close();

21 } catch (IllegalArgumentException | IOException e) {

22 logger.error("", e);

23 }

24 }

3、删除文件或者文件目录

 1 /**

2 * 删除文件或者文件目录

3 *

4 * @param path

5 */

6 public static void rmdir(String path) {

7 try {

8 // 返回FileSystem对象

9 FileSystem fs = getFileSystem();

10

11 String hdfsUri = HDFSUri;

12 if(StringUtils.isNotBlank(hdfsUri)){

13 path = hdfsUri + path;

14 }

15

16 // 删除文件或者文件目录 delete(Path f) 此方法已经弃用

17 fs.delete(new Path(path),true);

18

19 // 释放资源

20 fs.close();

21 } catch (IllegalArgumentException | IOException e) {

22 logger.error("", e);

23 }

24 }

3、根据filter获取目录下的文件

 1 /**

2 * 根据filter获取目录下的文件

3 *

4 * @param path

5 * @param pathFilter

6 * @return String[]

7 */

8 public static String[] ListFile(String path,PathFilter pathFilter) {

9 String[] files = new String[0];

10

11 try {

12 // 返回FileSystem对象

13 FileSystem fs = getFileSystem();

14

15 String hdfsUri = HDFSUri;

16 if(StringUtils.isNotBlank(hdfsUri)){

17 path = hdfsUri + path;

18 }

19

20 FileStatus[] status;

21 if(pathFilter != null){

22 // 根据filter列出目录内容

23 status = fs.listStatus(new Path(path),pathFilter);

24 }else{

25 // 列出目录内容

26 status = fs.listStatus(new Path(path));

27 }

28

29 // 获取目录下的所有文件路径

30 Path[] listedPaths = FileUtil.stat2Paths(status);

31 // 转换String[]

32 if (listedPaths != null && listedPaths.length > 0){

33 files = new String[listedPaths.length];

34 for (int i = 0; i < files.length; i++){

35 files[i] = listedPaths[i].toString();

36 }

37 }

38 // 释放资源

39 fs.close();

40 } catch (IllegalArgumentException | IOException e) {

41 logger.error("", e);

42 }

43

44 return files;

45 }

4、文件上传至 HDFS

 1 /**

2 * 文件上传至 HDFS

3 *

4 * @param delSrc

5 * @param overwrite

6 * @param srcFile

7 * @param destPath

8 */

9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {

10 // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt

11 Path srcPath = new Path(srcFile);

12

13 // 目的路径

14 String hdfsUri = HDFSUri;

15 if(StringUtils.isNotBlank(hdfsUri)){

16 destPath = hdfsUri + destPath;

17 }

18 Path dstPath = new Path(destPath);

19

20 // 实现文件上传

21 try {

22 // 获取FileSystem对象

23 FileSystem fs = getFileSystem();

24 fs.copyFromLocalFile(srcPath, dstPath);

25 fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);

26 //释放资源

27 fs.close();

28 } catch (IOException e) {

29 logger.error("", e);

30 }

31 }

5、从 HDFS 下载文件

 1 /**

2 * 从 HDFS 下载文件

3 *

4 * @param srcFile

5 * @param destPath

6 */

7 public static void getFile(String srcFile,String destPath) {

8 // 源文件路径

9 String hdfsUri = HDFSUri;

10 if(StringUtils.isNotBlank(hdfsUri)){

11 srcFile = hdfsUri + srcFile;

12 }

13 Path srcPath = new Path(srcFile);

14

15 // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/

16 Path dstPath = new Path(destPath);

17

18 try {

19 // 获取FileSystem对象

20 FileSystem fs = getFileSystem();

21 // 下载hdfs上的文件

22 fs.copyToLocalFile(srcPath, dstPath);

23 // 释放资源

24 fs.close();

25 } catch (IOException e) {

26 logger.error("", e);

27 }

28 }

6、获取 HDFS 集群节点信息

 1 /**

2 * 获取 HDFS 集群节点信息

3 *

4 * @return DatanodeInfo[]

5 */

6 public static DatanodeInfo[] getHDFSNodes() {

7 // 获取所有节点

8 DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];

9

10 try {

11 // 返回FileSystem对象

12 FileSystem fs = getFileSystem();

13

14 // 获取分布式文件系统

15 DistributedFileSystem hdfs = (DistributedFileSystem)fs;

16

17 dataNodeStats = hdfs.getDataNodeStats();

18 } catch (IOException e) {

19 logger.error("", e);

20 }

21 return dataNodeStats;

22 }

7、查找某个文件在 HDFS集群的位置

 1 /**

2 * 查找某个文件在 HDFS集群的位置

3 *

4 * @param filePath

5 * @return BlockLocation[]

6 */

7 public static BlockLocation[] getFileBlockLocations(String filePath) {

8 // 文件路径

9 String hdfsUri = HDFSUri;

10 if(StringUtils.isNotBlank(hdfsUri)){

11 filePath = hdfsUri + filePath;

12 }

13 Path path = new Path(filePath);

14

15 // 文件块位置列表

16 BlockLocation[] blkLocations = new BlockLocation[0];

17 try {

18 // 返回FileSystem对象

19 FileSystem fs = getFileSystem();

20 // 获取文件目录

21 FileStatus filestatus = fs.getFileStatus(path);

22 //获取文件块位置列表

23 blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());

24 } catch (IOException e) {

25 logger.error("", e);

26 }

27 return blkLocations;

28 }

 8、文件重命名

 1 /**

2 * 文件重命名

3 *

4 * @param srcPath

5 * @param dstPath

6 */

7 public boolean rename(String srcPath, String dstPath){

8 boolean flag = false;

9 try {

10 // 返回FileSystem对象

11 FileSystem fs = getFileSystem();

12

13 String hdfsUri = HDFSUri;

14 if(StringUtils.isNotBlank(hdfsUri)){

15 srcPath = hdfsUri + srcPath;

16 dstPath = hdfsUri + dstPath;

17 }

18

19 flag = fs.rename(new Path(srcPath), new Path(dstPath));

20 } catch (IOException e) {

21 logger.error("{} rename to {} error.", srcPath, dstPath);

22 }

23

24 return flag;

25 }

9、判断目录是否存在

 1 /**

2 * 判断目录是否存在

3 *

4 * @param srcPath

5 * @param dstPath

6 */

7 public boolean existDir(String filePath, boolean create){

8 boolean flag = false;

9

10 if (StringUtils.isEmpty(filePath)){

11 return flag;

12 }

13

14 try{

15 Path path = new Path(filePath);

16 // FileSystem对象

17 FileSystem fs = getFileSystem();

18

19 if (create){

20 if (!fs.exists(path)){

21 fs.mkdirs(path);

22 }

23 }

24

25 if (fs.isDirectory(path)){

26 flag = true;

27 }

28 }catch (Exception e){

29 logger.error("", e);

30 }

31

32 return flag;

33 }

     10  查看HDFS文件的最后修改时间  

  1. public void testgetModifyTime() throws Exception {  
  2.         Configuration conf = new Configuration();  

  3.         FileSystem hdfs = FileSystem.get(conf);  
  4.         Path dst = new Path(hdfsPath);  

  5.         FileStatus files[] = hdfs.listStatus(dst);  
  6. for (FileStatus file : files) {  
  7.             System.out.println(file.getPath() + "\t"  

  8.                     + file.getModificationTime());  
  9.             System.out.println(file.getPath() + "\t"  

  10.                     + new Date(file.getModificationTime()));  

  11.         } 

   

  1.    // 查看HDFS文件是否存在  

  2.   
  3.     public void testExists() throws Exception {  

  4.   
  5.         Configuration conf = new Configuration();  

  6.           
  7.         FileSystem hdfs = FileSystem.get(conf);  
  8.         Path dst = new Path(hdfsPath + "file01.txt");  

  9.   
  10.         boolean ok = hdfs.exists(dst);  

  11.         System.out.println(ok ? "文件存在" : "文件不存在");  

  12.     }  

    

  1.     // 获取HDFS集群上所有节点名称  

  2.     public void testGetHostName() throws Exception {  

  3.   
  4.         Configuration conf = new Configuration();  

  5.           
  6.         DistributedFileSystem hdfs = (DistributedFileSystem) FileSystem  
  7.                 .get(conf);  
  8.         DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();  
  9.   
  10.         for (DatanodeInfo dataNode : dataNodeStats) {  

  11.             System.out.println(dataNode.getHostName() + "\t"  

  12.                     + dataNode.getName());  
  13.         }  
  14.     }

  

以上是 掌握HDFS的Java API接口访问 的全部内容, 来源链接: utcz.com/z/393382.html

回到顶部