首页 > 技术文章 > [SequenceFile_2] SequenceFile 的基本操作

share23 2018-11-01 10:42 原文


0. 说明

  测试序列文件的读写操作 && 测试序列文件的排序操作 && 测试序列文件的合并操作 && 测试序列文件的压缩方式 && 测试将日志文件转换成序列文件

  作为 Hadoop 序列文件 中的 SequenceFile 的基本操作 部分的补充存在

 

 

 


 

1. 测试读写 && 压缩

package hadoop.sequencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;

import java.io.IOException;

/**
 * 测试序列文件
 */
public class TestSeqFile {

    /**
     * 测试序列文件写操作
     */
    @Test
    public void testWriteSeq() throws Exception {

        Configuration conf = new Configuration();

        // 设置文件系统为本地模式
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

//        Path path = new Path("E:/test/none.seq");
//        Path path = new Path("E:/test/record.seq");
        Path path = new Path("E:/test/block.seq");
        // 不压缩
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class,SequenceFile.CompressionType.NONE);
        // 记录压缩
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class,SequenceFile.CompressionType.RECORD);
        // 块压缩
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);


        for (int i = 1; i <= 1000; i++) {
            IntWritable key = new IntWritable(i);
            Text value = new Text("helloworld" + i);

            writer.append(key, value);

        }

        writer.close();
    }

    /**
     * 测试序列文件读操作
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();

        // 设置文件系统为本地模式
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path path = new Path("E:/test/block.seq");

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

        //初始化两个 Writable 对象
        IntWritable key = new IntWritable();
        Text value = new Text();

        while ((reader.next(key, value))) {
            long position = reader.getPosition();
            System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
        }
    }

}

 


 

2. 测试排序

package hadoop.sequencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;

import java.util.Random;

/**
 * 测试排序
 */
public class TestSeqFileSort {

    /**
     * 创建无序 key-value 文件
     */
    @Test
    public void testWriteRandom() throws Exception {

        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path p = new Path("E:/test/random.seq");

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, p, IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD);

        // 初始化 random
        Random r = new Random();

        for (int i = 1; i < 100000; i++) {
            // 在0-99999之中随机选取一个值
            int j = r.nextInt(100000);
            IntWritable key = new IntWritable(j);
            Text value = new Text("helloworld" + j);

            writer.append(key, value);

        }

        writer.close();

    }

    /**
     * 测试seqFile排序
     */
    @Test
    public void testSort() throws Exception {

        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path pin = new Path("E:/test/random.seq");
        Path pout = new Path("E:/test/sort.seq");

        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);

        sorter.sort(pin, pout);
    }

    /**
     * 测试序列文件读操作
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();

        // 设置文件系统为本地模式
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path path = new Path("E:/test/sort.seq");

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

        //初始化两个 Writable 对象
        IntWritable key = new IntWritable();
        Text value = new Text();

        while ((reader.next(key, value))) {
            long position = reader.getPosition();
            System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
        }
    }

}

 

 


 

3. 测试合并

package hadoop.sequencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;

/**
 * 测试文件合并,必须是同一种压缩类型
 */
public class TestSeqFileMerge {
    /**
     * 测试序列文件写操作
     * 创建两个文件,范围为1-100,100-200
     */
    @Test
    public void testWriteSeq() throws Exception {

        Configuration conf = new Configuration();

        // 设置文件系统为本地模式
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

//        Path path = new Path("E:/test/block1.seq");
        Path path = new Path("E:/test/block2.seq");

        // 块压缩
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);

//        for (int i = 1; i <= 100; i++) {
        for (int i = 101; i <= 200; i++) {
            IntWritable key = new IntWritable(i);
            Text value = new Text("helloworld" + i);

            writer.append(key, value);

        }

        writer.close();
    }

    /**
     * 测试文件合并,合并的同时排序
     */
    @Test
    public void testMerge() throws Exception {
        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path pin1 = new Path("E:/test/block1.seq");
        Path pin2 = new Path("E:/test/block2.seq");
        Path pout = new Path("E:/test/merge.seq");

        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);

        Path[] p = {pin1, pin2};

        sorter.merge(p, pout);
    }

    /**
     * 测试序列文件读操作
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();

        // 设置文件系统为本地模式
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path path = new Path("E:/test/merge.seq");

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

        //初始化两个 Writable 对象
        IntWritable key = new IntWritable();
        Text value = new Text();

        while ((reader.next(key, value))) {
            long position = reader.getPosition();
            System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
        }
    }

}

 

 

 


4. 测试将日志文件转换成序列文件

package hadoop.sequencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

/**
 * 测试将日志文件转换成序列文件
 * Windows 下查看压缩后的 SequenceFile :
 * hdfs dfs -text file:///E:/test/access.seq
 */
public class Log2Seq {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // 设置文件系统为本地模式
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path path = new Path("E:/test/access.seq");

        // 不压缩
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class,SequenceFile.CompressionType.NONE);
        // 记录压缩
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class,SequenceFile.CompressionType.RECORD);
        // 块压缩
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);

        BufferedReader br = new BufferedReader(new FileReader("E:/file/access.log1"));

        String line = null;
        while ((line = br.readLine()) != null) {
            NullWritable key = NullWritable.get();
            Text value = new Text(line);
            writer.append(key, value);
        }

        writer.close();
    }
}

 

 

 

 

 

 

 


 

推荐阅读