java怎么连接hdfs文件系统,需要哪些包?

发布网友 发布时间:2022-04-22 06:48

我来回答

2个回答

热心网友 时间:2022-06-16 20:28

package com.wyc.hadoop.fs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FSOptr {

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        makeDir(conf);
        rename(conf);
        delete(conf);

    }

    // 创建文件目录
    private static void makeDir(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path("/user/hadoop/data/20140318");
        boolean result = fs.mkdirs(dir);// 创建文件夹
        System.out.println("make dir :" + result);

        // 创建文件,并写入内容
        Path dst = new Path("/user/hadoop/data/20140318/tmp");
        byte[] buff = "hello,hadoop!".getBytes();
        FSDataOutputStream outputStream = fs.create(dst);
        outputStream.write(buff, 0, buff.length);
        outputStream.close();
        FileStatus files[] = fs.listStatus(dst);
        for (FileStatus file : files) {
            System.out.println(file.getPath());
        }
        fs.close();
    }

    // 重命名文件
    private static void rename(Configuration conf) throws Exception {

        FileSystem fs = FileSystem.get(conf);
        Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
        Path newName = new Path("/user/hadoop/data/20140318/2.txt");
        fs.rename(oldName, newName);

        FileStatus files[] = fs.listStatus(new Path(
                "/user/hadoop/data/20140318"));
        for (FileStatus file : files) {
            System.out.println(file.getPath());
        }
        fs.close();
    }

    // 删除文件
    @SuppressWarnings("deprecation")
    private static void delete(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/user/hadoop/data/20140318");
        if (fs.isDirectory(path)) {
            FileStatus files[] = fs.listStatus(path);
            for (FileStatus file : files) {
                fs.delete(file.getPath());
            }
        } else {
            fs.delete(path);
        }

        // 或者
        fs.delete(path, true);

        fs.close();
    }

    /**
     * 下载,将hdfs文件下载到本地磁盘
     * 
     * @param localSrc1
     *            本地的文件地址,即文件的路径
     * @param hdfsSrc1
     *            存放在hdfs的文件地址
     */
    public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

        Configuration conf = new Configuration();
        FileSystem fs = null;
        try {
            fs = FileSystem.get(URI.create(hdfsSrc1), conf);
            Path hdfs_path = new Path(hdfsSrc1);
            Path local_path = new Path(localSrc1);

            fs.copyToLocalFile(hdfs_path, local_path);

            return true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 上传,将本地文件copy到hdfs系统中
     * 
     * @param localSrc
     *            本地的文件地址,即文件的路径
     * @param hdfsSrc
     *            存放在hdfs的文件地址
     */
    public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
        InputStream in;
        try {
            in = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration conf = new Configuration();// 得到配置对象
            FileSystem fs; // 文件系统
            try {
                fs = FileSystem.get(URI.create(hdfsSrc), conf);
                // 输出流,创建一个输出流
                OutputStream out = fs.create(new Path(hdfsSrc),
                        new Progressable() {
                            // 重写progress方法
                            public void progress() {
                                // System.out.println("上传完一个设定缓存区大小容量的文件!");
                            }
                        });
                // 连接两个流,形成通道,使输入流向输出流传输数据,
                IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
                return true;
            } catch (IOException e) {
                e.printStackTrace();
            }

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 移动
     * 
     * @param old_st原来存放的路径
     * @param new_st移动到的路径
     */
    public boolean moveFileName(String old_st, String new_st) {

        try {

            // 下载到服务器本地
            boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
            Configuration conf = new Configuration();
            FileSystem fs = null;

            // 删除源文件
            try {
                fs = FileSystem.get(URI.create(old_st), conf);
                Path hdfs_path = new Path(old_st);
                fs.delete(hdfs_path);
            } catch (IOException e) {
                e.printStackTrace();
            }

            // 从服务器本地传到新路径
            new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
            boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);

            if (down_flag && uplod_flag) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // copy本地文件到hdfs
    private static void CopyFromLocalFile(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/home/hadoop/word.txt");
        Path dst = new Path("/user/hadoop/data/");
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }

    // 获取给定目录下的所有子目录以及子文件
    private static void getAllChildFile(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/user/hadoop");
        getFile(path, fs);
    }

    private static void getFile(Path path, FileSystem fs)throws Exception {
        FileStatus[] fileStatus = fs.listStatus(path);
        for (int i = 0; i < fileStatus.length; i++) {
            if (fileStatus[i].isDir()) {
                Path p = new Path(fileStatus[i].getPath().toString());
                getFile(p, fs);
            } else {
                System.out.println(fileStatus[i].getPath().toString());
            }
        }
    }
    
    
    //判断文件是否存在
    private static boolean isExist(Configuration conf,String path)throws Exception{
        FileSystem fileSystem = FileSystem.get(conf);
        return fileSystem.exists(new Path(path));
    }
    
    //获取hdfs集群所有主机结点数据
    private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
        FileSystem fs = FileSystem.get(conf);
        DistributedFileSystem hdfs = (DistributedFileSystem)fs;
        DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
        String[] names = new String[dataNodeStats.length];
        System.out.println("list of all the nodes in HDFS cluster:"); //print info

        for(int i=0; i < dataNodeStats.length; i++){
            names[i] = dataNodeStats[i].getHostName();
            System.out.println(names[i]); //print info

        }
    }
    
    //get the locations of a file in HDFS
    private static void getFileLocation(Configuration conf)throws Exception{
        FileSystem fs = FileSystem.get(conf);
        Path f = new Path("/user/cluster/dfs.txt");
        FileStatus filestatus = fs.getFileStatus(f);
        BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
        int blkCount = blkLocations.length;
        for(int i=0; i < blkCount; i++){
            String[] hosts = blkLocations[i].getHosts();
            //Do sth with the block hosts

            System.out.println(hosts);
        }
    }
    
     //get HDFS file last modification time
    private static void getModificationTime(Configuration conf)throws Exception{
        FileSystem fs = FileSystem.get(conf);
        Path f = new Path("/user/cluster/dfs.txt");
        FileStatus filestatus = fs.getFileStatus(f);
        
        long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch

        Date d = new Date(modificationTime);
        System.out.println(d);
    }
    
}

热心网友 时间:2022-06-16 20:29

楼主,根据官网的API和文档、例子,入门应该不难的

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com