当前位置: 游戏平台 > 互联网科技 > 正文

Hadoop中DBInputFormat和DBOutputFormat使用

时间:2019-11-05 23:44来源:互联网科技
Hadoop中可以编写自己的类,用作hadoopjob的key或者value类型,自己编写的类要实现接口Writable。 一、背景 我编写了一个HttpContent类,主要用于保存爬取网页的源码,返回状态和编码格式信

Hadoop中可以编写自己的类,用作hadoop job的key或者value类型,自己编写的类要实现接口Writable。

一、背景

我编写了一个HttpContent类,主要用于保存爬取网页的源码,返回状态和编码格式信息,他在mapper中别实例化保存网页内容,然后传输到reducer中被使用,在编写中遇到了一些问题:

为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

(1)首先是没有编写默认的构造函数类,因为java中的反馈机制需要一个参数为空的默认构造函数,如果没有这个类就不能利用反馈机制实例化这个类。

推荐阅读:

(2)然后是类型在序列化的时候写入后读取值不正确,一定要统一类型中write(DataOutput out)和readFields(DataInput in)中写入和读取参数的方法,例如一个int类型如果你在write()中使用writeInt写出,在readFields()中就应该使用readInt()读入,否则读取的值是不正确的。多个值写出读入的时候,写出读入的顺序要保持一致的,否则读取也是不正确的。

Hadoop 中利用 MapReduce 读写 MySQL 数据 http://www.linuxidc.com/Linux/2013-07/88117.htm

(3)Writable中用于写出的DataOutput类型没有针对String类型的序列化方法,需要先将String类型转换成为Byte数组类型,然后在进行序列化。

二、技术细节

Ubuntu 13.04上搭建Hadoop环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

1、DBInputFormat(Mysql为例),先创建表:

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

CREATE TABLE studentinfo (

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htm

  id INTEGER NOT NULL PRIMARY KEY,

Ubuntu下Hadoop环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

  name VARCHAR(32) NOT NULL);2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。3、DBInputFormat用法如下:

单机版搭建Hadoop环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

public class DBInput {
  // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
  // CREATE TABLE studentinfo (
  // id INTEGER NOT NULL PRIMARY KEY,
  // name VARCHAR(32) NOT NULL);

搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建) http://www.linuxidc.com/Linux/2011-12/48894.htm

  public static class StudentinfoRecord implements Writable, DBWritable {
    int id;
    String name;
    public StudentinfoRecord() {

下面是HttpContent的类型的源码,重点是write(DataOutput out)和readFields(DataInput in)方法:

    }
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
    }
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
    }
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
    }
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
    }
    public String toString() {
        return new String(this.id + " " + this.name);
    }
  }
  public class DBInputMapper extends MapReduceBase implements
        Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
    public void map(LongWritable key, StudentinfoRecord value,
          OutputCollector<LongWritable, Text> collector, Reporter reporter)
          throws IOException {
        collector.collect(new LongWritable(value.id), new Text(value
            .toString()));
    }
  }
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(DBInput.class);
    DistributedCache.addFileToClassPath(new Path(
          "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
   
    conf.setMapperClass(DBInputMapper.class);
    conf.setReducerClass(IdentityReducer.class);

package bbs.http;

    conf.setMapOutputKeyClass(LongWritable.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
   
    conf.setInputFormat(DBInputFormat.class);
    FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
    String[] fields = { "id", "name" };
    DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
 null, "id", fields);

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

    JobClient.runJob(conf);
  }
}

import org.apache.hadoop.io.Writable;

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。

编辑:互联网科技 本文来源:Hadoop中DBInputFormat和DBOutputFormat使用

关键词:

  • 上一篇:没有了
  • 下一篇:没有了