java 使用hadoop 连接mysql

发布网友 发布时间:2022-04-22 06:48

我来回答

3个回答

懂视网 时间:2022-04-08 11:23

    运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

(1)在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/

       b)在mr程序提交job前,添加语句:DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);

mysql数据库存储到hadoop hdfs

mysql表创建和数据初始化

-- Drop any previous copy of the demo table so the script can be re-run.
DROP TABLE IF EXISTS `wu_testhadoop`;
-- Demo table: auto-increment primary key `id` plus two nullable text columns.
CREATE TABLE `wu_testhadoop` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `title` varchar(255) DEFAULT NULL,
 `content` varchar(255) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of wu_testhadoop
-- ----------------------------
-- Two seed rows with explicit ids 1 and 2 (the table's AUTO_INCREMENT counter starts at 3).
INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');

定义hadoop数据访问

mysql表创建完毕后,我们需要定义hadoop访问mysql的规则;

hadoop提供了org.apache.hadoop.io.Writable接口来实现简单、高效的可序列化协议,该接口基于DataInput和DataOutput来实现相关的功能。

hadoop对数据库访问也提供了org.apache.hadoop.mapred.lib.db.DBWritable接口,其中write方法用于对PreparedStatement对象设定值,readFields方法用于对从数据库读取出来的对象进行列的值绑定;

以上两个接口的使用如下(内容是从源码得来)

writable

 public class MyWritable implements Writable {
 // Some data 
 private int counter;
 private long timestamp;
 
 public void write(DataOutput out) throws IOException {
  out.writeInt(counter);
  out.writeLong(timestamp);
 }
 
 public void readFields(DataInput in) throws IOException {
  counter = in.readInt();
  timestamp = in.readLong();
 }
 
 public static MyWritable read(DataInput in) throws IOException {
  MyWritable w = new MyWritable();
  w.readFields(in);
  return w;
 }
 }
 


DBWritable

public class MyWritable implements Writable, DBWritable {
 // Some data 
 private int counter;
 private long timestamp;
 
 //Writable#write() implementation
 public void write(DataOutput out) throws IOException {
 out.writeInt(counter);
 out.writeLong(timestamp);
 }
 
 //Writable#readFields() implementation
 public void readFields(DataInput in) throws IOException {
 counter = in.readInt();
 timestamp = in.readLong();
 }
 
 public void write(PreparedStatement statement) throws SQLException {
 statement.setInt(1, counter);
 statement.setLong(2, timestamp);
 }
 
 public void readFields(ResultSet resultSet) throws SQLException {
 counter = resultSet.getInt(1);
 timestamp = resultSet.getLong(2);
 } 
 }

数据库对应的实现

package com.wyg.hadoop.mysql.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

/**
 * One row of the {@code wu_testhadoop} table ({@code id}, {@code title},
 * {@code content}).
 *
 * <p>Implements {@link Writable} so the record can travel between map and
 * reduce tasks, and {@link DBWritable} so it can be read from a
 * {@link ResultSet} and written through a {@link PreparedStatement}.
 */
public class DBRecord implements Writable, DBWritable{
	private int id;
	private String title;
	private String content;

	/** @return the row id */
	public int getId() {
		return id;
	}

	/** @param id the row id */
	public void setId(int id) {
		this.id = id;
	}

	/** @return the title, possibly {@code null} */
	public String getTitle() {
		return title;
	}

	/** @param title the title, may be {@code null} */
	public void setTitle(String title) {
		this.title = title;
	}

	/** @return the content, possibly {@code null} */
	public String getContent() {
		return content;
	}

	/** @param content the content, may be {@code null} */
	public void setContent(String content) {
		this.content = content;
	}

	/**
	 * Populates this record from the current row, reading the columns by name.
	 *
	 * @param set result set positioned on the row to read
	 * @throws SQLException if a column cannot be read
	 */
	@Override
	public void readFields(ResultSet set) throws SQLException {
		this.id = set.getInt("id");
		this.title = set.getString("title");
		this.content = set.getString("content");
	}

	/**
	 * Binds this record to statement parameters 1 (id), 2 (title) and 3 (content).
	 *
	 * @param pst statement whose placeholders are filled in
	 * @throws SQLException if a parameter cannot be set
	 */
	@Override
	public void write(PreparedStatement pst) throws SQLException {
		pst.setInt(1, id);
		pst.setString(2, title);
		pst.setString(3, content);
	}

	/**
	 * Restores the record from the binary form produced by
	 * {@link #write(DataOutput)}: id, then title, then content.
	 *
	 * @param in source stream
	 * @throws IOException if reading from the stream fails
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.id = in.readInt();
		this.title = Text.readString(in);
		this.content = Text.readString(in);
	}

	/**
	 * Serializes the record as id, title, content.
	 *
	 * <p>A {@code null} title or content (both fields may hold a SQL NULL read
	 * from the database) is written as an empty string, because
	 * {@code Text.writeString} cannot encode a {@code null} String. The wire
	 * format for non-null values is unchanged; the trade-off is that a
	 * {@code null} comes back as {@code ""} after a round trip.
	 *
	 * @param out destination stream
	 * @throws IOException if writing to the stream fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(this.id);
		Text.writeString(out, nullToEmpty(this.title));
		Text.writeString(out, nullToEmpty(this.content));
	}

	/** Maps {@code null} to the empty string so the value can be serialized. */
	private static String nullToEmpty(String value) {
		return value == null ? "" : value;
	}

	/**
	 * @return the three fields joined by single spaces, in the order
	 *         id, title, content
	 */
	@Override
	public String toString() {
		 return this.id + " " + this.title + " " + this.content; 
	}
}


实现Map/Reduce

package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.wyg.hadoop.mysql.bean.DBRecord;

/**
 * Mapper for the database-to-HDFS job: converts each {@link DBRecord} into a
 * (record id, record text) pair.
 */
@SuppressWarnings("deprecation")
public class DBRecordMapper extends MapReduceBase implements Mapper<LongWritable, DBRecord, LongWritable, Text>{

	/**
	 * Emits one output pair per input record.
	 *
	 * @param key       input key supplied by the input format (not used)
	 * @param value     the database row
	 * @param collector receives the id as key and {@code value.toString()} as value
	 * @param reporter  progress reporter (not used)
	 * @throws IOException if the collector fails to accept the pair
	 */
	@Override
	public void map(LongWritable key, DBRecord value,
			OutputCollector<LongWritable, Text> collector, Reporter reporter)
			throws IOException {
		final LongWritable recordId = new LongWritable(value.getId());
		final Text recordText = new Text(value.toString());
		collector.collect(recordId, recordText);
	}

}

测试hadoop连接mysql并将数据存储到hdfs

package com.wyg.hadoop.mysql.db;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.DBRecordMapper;

/**
 * Driver for the job that reads the {@code wu_testhadoop} table through
 * {@link DBInputFormat} and stores the rows as text under an HDFS directory.
 */
public class DBAccess {
 /** HDFS directory that receives the job output. */
 private static final String OUTPUT_DIR = "hdfs://192.168.44.129:9000/user/root/dbout";
 /** JDBC driver class handed to the job configuration. */
 private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
 /** Placeholder connection settings; replace with real values before running. */
 private static final String JDBC_URL = "jdbc:mysql://你的ip:3306/数据库名";
 private static final String DB_USER = "用户名";
 private static final String DB_PASSWORD = "密码";
 /** Table to read and the columns selected from it. */
 private static final String TABLE = "wu_testhadoop";
 private static final String[] COLUMNS = {"id", "title", "content"};

 /**
  * Configures and runs the job; {@link JobClient#runJob} is the last call made.
  *
  * @param args command-line arguments (not used)
  * @throws IOException if the job cannot be submitted or fails
  */
 public static void main(String[] args) throws IOException {
  JobConf job = new JobConf(DBAccess.class);

  // Output types produced by the job: (LongWritable id, Text row).
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(Text.class);

  // Input comes from the database, output goes to HDFS.
  job.setInputFormat(DBInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));

  // Connection settings, then the table/columns to read (no condition, ordered by id).
  DBConfiguration.configureDB(job, JDBC_DRIVER, JDBC_URL, DB_USER, DB_PASSWORD);
  DBInputFormat.setInput(job, DBRecord.class, TABLE, null, "id", COLUMNS);

  job.setMapperClass(DBRecordMapper.class);
  job.setReducerClass(IdentityReducer.class);

  JobClient.runJob(job);
 }
}

执行程序,结果如下:

15/08/11 16:46:18 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/08/11 16:46:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/08/11 16:46:18 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/08/11 16:46:19 INFO mapred.JobClient: Running job: job_local_0001
15/08/11 16:46:19 INFO mapred.MapTask: numReduceTasks: 1
15/08/11 16:46:19 INFO mapred.MapTask: io.sort.mb = 100
15/08/11 16:46:19 INFO mapred.MapTask: data buffer = 79691776/99614720
15/08/11 16:46:19 INFO mapred.MapTask: record buffer = 262144/327680
15/08/11 16:46:19 INFO mapred.MapTask: Starting flush of map output
15/08/11 16:46:19 INFO mapred.MapTask: Finished spill 0
15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
15/08/11 16:46:19 INFO mapred.Merger: Merging 1 sorted segments
15/08/11 16:46:19 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 48 bytes
15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
15/08/11 16:46:19 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/08/11 16:46:19 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.44.129:9000/user/root/dbout
15/08/11 16:46:19 INFO mapred.LocalJobRunner: reduce > reduce
15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/08/11 16:46:20 INFO mapred.JobClient: map 100% reduce 100%
15/08/11 16:46:20 INFO mapred.JobClient: Job complete: job_local_0001
15/08/11 16:46:20 INFO mapred.JobClient: Counters: 14
15/08/11 16:46:20 INFO mapred.JobClient: FileSystemCounters
15/08/11 16:46:20 INFO mapred.JobClient: FILE_BYTES_READ=34606
15/08/11 16:46:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=69844
15/08/11 16:46:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=30
15/08/11 16:46:20 INFO mapred.JobClient: Map-Reduce Framework
15/08/11 16:46:20 INFO mapred.JobClient: Reduce input groups=2
15/08/11 16:46:20 INFO mapred.JobClient: Combine output records=0
15/08/11 16:46:20 INFO mapred.JobClient: Map input records=2
15/08/11 16:46:20 INFO mapred.JobClient: Reduce shuffle bytes=0
15/08/11 16:46:20 INFO mapred.JobClient: Reduce output records=2
15/08/11 16:46:20 INFO mapred.JobClient: Spilled Records=4
15/08/11 16:46:20 INFO mapred.JobClient: Map output bytes=42
15/08/11 16:46:20 INFO mapred.JobClient: Map input bytes=2
15/08/11 16:46:20 INFO mapred.JobClient: Combine input records=0
15/08/11 16:46:20 INFO mapred.JobClient: Map output records=2
15/08/11 16:46:20 INFO mapred.JobClient: Reduce input records=2


同时可以看到hdfs文件系统多了一个dbout的目录,里边的文件保存了数据库对应的数据,内容保存如下

1	1 123 122312
2	2 123 123456


hdfs数据导入到mysql

    hdfs文件存储到mysql,也需要上边的DBRecord类作为辅助,因为数据库的操作都是通过DBInputFormat和DBOutputFormat来进行的;

    首先需要定义map和reduce的实现(map用以对hdfs的文档进行解析,reduce解析map的输出并输出)

package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import com.wyg.hadoop.mysql.bean.DBRecord;

/**
 * Mapper and reducer for the job that loads text records from HDFS into the
 * database. Each input line is expected to look like
 * {@code <key>TAB<id> <title> <content>}.
 */
public class WriteDB {
 /**
  * Map step: parses one text line into a {@link DBRecord} and emits it keyed
  * by the record id (as text).
  */
 public static class Map extends MapReduceBase implements
   Mapper<Object, Text, Text, DBRecord> {

  /** Reused output value; kept per mapper instance rather than as shared static state. */
  private final DBRecord record = new DBRecord();

  /** Reused output key holding the record id as text. */
  private final Text word = new Text();

  /**
   * Parses {@code value} and emits {@code (id, record)}.
   *
   * <p>The part after the first tab is split on single spaces into at most
   * three fields, so the content field keeps any spaces it contains and may
   * be empty. A title containing spaces cannot be told apart from the
   * content and will still be split at its first space.
   *
   * @param key      input key supplied by the input format (not used)
   * @param value    one line of the input file
   * @param output   receives the id (as text) and the parsed record
   * @param reporter progress reporter (not used)
   * @throws IOException if the line has no tab, fewer than three fields,
   *                     or a non-numeric id, or if the collector fails
   */
  @Override
  public void map(Object key, Text value,
    OutputCollector<Text, DBRecord> output, Reporter reporter)
    throws IOException {
   String line = value.toString();

   int tab = line.indexOf('\t');
   if (tab < 0) {
    throw new IOException("Malformed record (no tab separator): " + line);
   }

   // Limit 3 keeps spaces inside the content and preserves an empty content field.
   String[] infos = line.substring(tab + 1).split(" ", 3);
   if (infos.length < 3) {
    throw new IOException("Malformed record (expected id, title and content): " + line);
   }

   int id;
   try {
    id = Integer.parseInt(infos[0]);
   } catch (NumberFormatException e) {
    throw new IOException("Malformed record (non-numeric id): " + line, e);
   }

   record.setId(id);
   record.setTitle(infos[1]);
   record.setContent(infos[2]);
   word.set(infos[0]);
   output.collect(word, record);
  }

 }

 /**
  * Reduce step: forwards one {@link DBRecord} per id as the output key, paired
  * with an empty {@link Text} value.
  */
 public static class Reduce extends MapReduceBase implements
   Reducer<Text, DBRecord, DBRecord, Text> {

  /**
   * Emits the first record seen for {@code key}; any further records with
   * the same id are ignored.
   * NOTE(review): presumably intentional because id is the table's primary
   * key; confirm that dropping duplicates is the desired behavior.
   *
   * @param key       the record id as text
   * @param values    records sharing that id
   * @param collector receives the record as key and an empty text as value
   * @param reporter  progress reporter (not used)
   * @throws IOException if the collector fails to accept the pair
   */
  @Override
  public void reduce(Text key, Iterator<DBRecord> values,
    OutputCollector<DBRecord, Text> collector, Reporter reporter)
    throws IOException {
   DBRecord record = values.next();
   collector.collect(record, new Text());
  }
 }
}

测试hdfs导入数据到数据库

package com.wyg.hadoop.mysql.db;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.WriteDB;

/**
 * Driver for the job that reads text records from HDFS and inserts them into
 * the {@code wu_testhadoop} table through {@link DBOutputFormat}.
 */
public class DBInsert {
	/**
	 * Configures and runs the job; {@link JobClient#runJob} is the last call made.
	 *
	 * @param args command-line arguments (not used)
	 * @throws Exception if the job cannot be submitted or fails
	 */
	public static void main(String[] args) throws Exception {

		JobConf conf = new JobConf(WriteDB.class);
		// Input is plain text from HDFS; output goes to the database.
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(DBOutputFormat.class);

		// The map output types (Text, DBRecord) differ from the job output
		// types, so they must be declared explicitly; the job does not pass
		// without these two lines, although examples found online omit them.
		conf.setMapOutputKeyClass(Text.class);
		conf.setMapOutputValueClass(DBRecord.class);
		// Job output types as emitted by WriteDB.Reduce: the DBRecord is the
		// key and an empty Text is the value.
		conf.setOutputKeyClass(DBRecord.class);
		conf.setOutputValueClass(Text.class);

		// Mapper and reducer classes.
		conf.setMapperClass(WriteDB.Map.class);
		conf.setReducerClass(WriteDB.Reduce.class);

		// Input directory on HDFS.
		FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.44.129:9000/user/root/dbout"));

		// Database connection (placeholder values; replace before running),
		// then the target table and the columns to insert.
		DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://数据库ip:3306/数据库名称","用户名","密码");
		String[] fields = {"id","title","content" };
		DBOutputFormat.setOutput(conf, "wu_testhadoop", fields);

		JobClient.runJob(conf);
	}

}

测试结果如下

15/08/11 18:10:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/08/11 18:10:15 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/08/11 18:10:15 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/11 18:10:15 INFO mapred.JobClient: Running job: job_local_0001
15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/11 18:10:15 INFO mapred.MapTask: numReduceTasks: 1
15/08/11 18:10:15 INFO mapred.MapTask: io.sort.mb = 100
15/08/11 18:10:15 INFO mapred.MapTask: data buffer = 79691776/99614720
15/08/11 18:10:15 INFO mapred.MapTask: record buffer = 262144/327680
15/08/11 18:10:15 INFO mapred.MapTask: Starting flush of map output
15/08/11 18:10:16 INFO mapred.MapTask: Finished spill 0
15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/08/11 18:10:16 INFO mapred.LocalJobRunner: hdfs://192.168.44.129:9000/user/root/dbout/part-00000:0+30
15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/08/11 18:10:16 INFO mapred.LocalJobRunner: 
15/08/11 18:10:16 INFO mapred.Merger: Merging 1 sorted segments
15/08/11 18:10:16 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 40 bytes
15/08/11 18:10:16 INFO mapred.LocalJobRunner: 
15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/08/11 18:10:16 INFO mapred.LocalJobRunner: reduce > reduce
15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/08/11 18:10:16 INFO mapred.JobClient: map 100% reduce 100%
15/08/11 18:10:16 INFO mapred.JobClient: Job complete: job_local_0001
15/08/11 18:10:16 INFO mapred.JobClient: Counters: 14
15/08/11 18:10:16 INFO mapred.JobClient: FileSystemCounters
15/08/11 18:10:16 INFO mapred.JobClient: FILE_BYTES_READ=34932
15/08/11 18:10:16 INFO mapred.JobClient: HDFS_BYTES_READ=60
15/08/11 18:10:16 INFO mapred.JobClient: FILE_BYTES_WRITTEN=70694
15/08/11 18:10:16 INFO mapred.JobClient: Map-Reduce Framework
15/08/11 18:10:16 INFO mapred.JobClient: Reduce input groups=2
15/08/11 18:10:16 INFO mapred.JobClient: Combine output records=0
15/08/11 18:10:16 INFO mapred.JobClient: Map input records=2
15/08/11 18:10:16 INFO mapred.JobClient: Reduce shuffle bytes=0
15/08/11 18:10:16 INFO mapred.JobClient: Reduce output records=2
15/08/11 18:10:16 INFO mapred.JobClient: Spilled Records=4
15/08/11 18:10:16 INFO mapred.JobClient: Map output bytes=34
15/08/11 18:10:16 INFO mapred.JobClient: Map input bytes=30
15/08/11 18:10:16 INFO mapred.JobClient: Combine input records=0
15/08/11 18:10:16 INFO mapred.JobClient: Map output records=2
15/08/11 18:10:16 INFO mapred.JobClient: Reduce input records=2

测试之前我对原有表进行了清空处理,可以看到执行后数据库里边添加了两条内容;

下次再执行的时候会报错,属于正常情况:原因在于我们导入数据的时候对id(主键)进行了赋值,重复导入会产生主键冲突;如果忽略id,是可以一直添加的;

源码下载地址

源码已上传,下载地址为download.csdn.net/detail/wuyinggui10000/74585



一步一步跟我学习hadoop(7)----hadoop连接mysql数据库执行数据读写数据库操作

标签:hadoop   mysql   map-reduce   导入导出mysql   

热心网友 时间:2022-04-08 08:31

金陵书生顾洪家境贫困,以卖字画为生,与母相依为命。

热心网友 时间:2022-04-08 09:49

JDBC访问一般分为如下流程:
1、加载JDBC驱动程序:
在连接数据库之前,首先要加载想要连接的数据库的驱动到JVM(Java虚拟机),
这通过java.lang.Class类的静态方法forName(String className)实现。
例如:
try{
// Load the MySQL JDBC driver class
Class.forName("com.mysql.jdbc.Driver") ;
}catch(ClassNotFoundException e){
// The driver class could not be found: print a message and the stack trace
System.out.println("找不到驱动程序类 ,加载驱动失败!");
e.printStackTrace() ;
}
成功加载后,会将Driver类的实例注册到DriverManager类中。
2、提供JDBC连接的URL
连接URL定义了连接数据库时的协议、子协议、数据源标识。
书写形式:协议:子协议:数据源标识
协议:在JDBC中总是以jdbc开始
子协议:是桥连接的驱动程序或是数据库管理系统名称。
数据源标识:标记找到数据库来源的地址与连接端口。

声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com