首页 > 技术文章 > java实现华为云对象存储OBS的文件上传

zhengwj-joker 2020-04-29 12:32 原文

最近碰到公司的磁盘需要扩容,新购进的存储为华为云OBS,需要将公司服务器文件迁移至华为云OBS,且不影响业务,本地程序不需要做太大改动。

实现思路:  用监听模式对指定文件目录执行监听,检测新增文件上传至OBS,并返回下载地址。

实现过程如下:

1.pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zwj</groupId>
<artifactId>obs-upload</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<description>OBS文件上传</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<tomcat.version>7.0.63</tomcat.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jdbc</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.3</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.8</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.1</version>
</dependency>

<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.54</version>
</dependency>

<dependency>
<groupId>com.huaweicloud</groupId>
<artifactId>esdk-obs-java</artifactId>
<version>3.19.7</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.2.0</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>

<plugin>
<!-- 打包插件assembly -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<!-- 版本号 -->
<version>2.2.1</version>
<!-- 打包描述文件,用于描述把哪些资源进行打包 -->
<configuration>
<descriptors>
<descriptor>src/main/resources/assembly.xml</descriptor>
</descriptors>
</configuration>
<!-- assembly插件执行配置 -->
<executions>
<execution>
<!-- 执行ID -->
<id>make-assembly</id>
<!-- 绑定到package生命周期阶段上,执行maven package打包是会启用本打包配置 -->
<phase>package</phase>
<goals>
<!-- 由于maven整项目打包时,有可能会build编译多次,single表示执行一次 -->
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/html</directory>
<includes>
<include>**/**</include>
</includes>
</resource>
<resource>
<directory>src/main/java</directory>
<excludes>
<exclude>&gt;**/*.java</exclude>
</excludes>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/**</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
2.OBS工具类:
package com.obsupload.configur;

import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.model.*;

import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* @Author: zhengwj
* @Description:
* @Date: 2020/4/20 12:47
* @Version: 1.0
*/
public class OBSHandler {

private String accessKeyId;// 华为云的 Access Key Id
private String accessKeySecret;// 华为云的 Access Key Secret
private String endpoint; // 华为云连接的地址节点

private String obsBucketName; // 创建的桶的名称
private String url; // 访问OBS文件的url

private static ObsClient obsClient; // 进行操作的华为云的客户端组件


/**
* 创建华为云OBS的本地控制器
* @param accessKeyId
* @param accessKeySecret
* @param endpoint
*/
public OBSHandler(String accessKeyId, String accessKeySecret, String endpoint) {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.endpoint = endpoint;
}

public OBSHandler(String accessKeyId, String accessKeySecret, String endpoint, String obsBucketName) {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.endpoint = endpoint;
this.obsBucketName = obsBucketName;
}

/**
* 设置OBS访问的CDN路径
* @param url
*/
public void setUrlForCDN(String url) {
this.url = url;
}

/**
* 设置OBS操作的同桶名称
* @param obsBucketName
*/
public void setObsBucketName(String obsBucketName) {
this.obsBucketName = obsBucketName;
}

/**
* 获取华为云提供的操作客户端实体类
* @return
*/
public ObsClient getObsClient() {
if(obsClient == null) {
obsClient = new ObsClient(accessKeyId, accessKeySecret, endpoint);
}
return obsClient;
}

/**
* 下载ObsObject
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param filePath 需要下载的文件路径。 例:"site/a.txt"
* @return 下载文件的字节数组
* @throws IOException
*/
public byte[] getFileByteArray(String bucketName, String filePath) throws IOException {
ObsObject obsObject = getObsClient().getObject(bucketName, filePath);
InputStream input = obsObject.getObjectContent();
byte[] b = new byte[1024];
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int len;
while ((len = input.read(b)) != -1){
bos.write(b, 0, len);
}
bos.close();
input.close();
return bos.toByteArray();
}


/**
* 获取指定路径下的ObsObject数量
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param filePath 需要检索的文件夹路径 例:"site/"
* @return 检索搜文件下的ObsObject的数量
*/
public Integer getFolderObjectsSize(String bucketName, String filePath) {
ListObjectsRequest request = new ListObjectsRequest(bucketName);
if(filePath != null && (!filePath.trim().equals(""))){
request.setPrefix(filePath);
}
ObjectListing result = getObsClient().listObjects(request);
return new Integer(result.getObjects().size());
}

/**
* 获取指定路径下的ObsObject
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param filePath 需要检索的文件夹路径
* @return 路径下的所有的ObsObject,包括子文件夹下的ObsObject
*/
public List<ObsObject> getFolderObjects(String bucketName, String filePath) {
List<ObsObject> list = new ArrayList<ObsObject>();
ListObjectsRequest request = new ListObjectsRequest(bucketName);
if(filePath != null && (!filePath.trim().equals(""))){
request.setPrefix(filePath);
}
request.setMaxKeys(100);
ObjectListing result;
do{
result = getObsClient().listObjects(request);
for(ObsObject obsObject : result.getObjects()){
list.add(obsObject);
}
request.setMarker(result.getNextMarker());
}while(result.isTruncated());
return list;
}

/**
* 删除对象
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param fileName 需要删除的对象全名 例:"site/20190817/localFile.sh"
* @return
*/
public DeleteObjectResult deleteObject(String bucketName, String fileName) {
return getObsClient().deleteObject(bucketName, fileName);
}

/**
* 创建文件夹
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param fileName 新建文件夹的路径,总根路径开始,请务必以"/"结尾。例:"2019/0817/"
* @return
*/
public PutObjectResult mkdirFolder(String bucketName, String fileName) {
return getObsClient().putObject(bucketName, fileName, new ByteArrayInputStream(new byte[0]));
}

/**
* 通过流上传字符串为文件
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param fileName 上传的路径和文件名 例:"site/2010/example.txt"
* @param content 上传的String字符
* @param encode 进行转换byte时使用的编码格式 例:"UTF-8"
* @return
* @throws ObsException
* @throws UnsupportedEncodingException
*/
public PutObjectResult putStringFile(String bucketName, String fileName, String content, String encode) throws ObsException, UnsupportedEncodingException {
return getObsClient().putObject(bucketName, fileName, new ByteArrayInputStream(content.getBytes(encode)));
}


/**
* 上传文件本地文件
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param fileName 上传的路径和文件名 例:"site/2010/example.txt"
* @param localFile 需要上传的文件
* @return
*/
public PutObjectResult putLocalFile(String bucketName, String fileName, File localFile) {
return getObsClient().putObject(bucketName, fileName, localFile);
}


/**
* 上传文件流
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param fileName 上传的路径和文件名 例:"site/2010/example.txt"
* @param inputStream 上传文件的输入流
* @return
*/
public PutObjectResult putFileByStream(String bucketName, String fileName, InputStream inputStream) {
return getObsClient().putObject(bucketName, fileName, inputStream);
}


/**
* 通过流上传文件并设置指定文件属性
* @param bucketName 操作的桶的名称 例:"wangmarket1232311"
* @param fileName 上传的路径和文件名 例:"site/2010/example.txt"
* @param inputStream 上传文件的输入流
* @param metaData 上传文件的属性
* @return
*/
public PutObjectResult putFilebyInstreamAndMeta(String bucketName, String fileName, InputStream inputStream, ObjectMetadata metaData) {
return getObsClient().putObject(bucketName, fileName, inputStream, metaData);
}


/**
* OBS内对象复制
* @param sourceBucketName 源文件的桶名称 例:"wangmarket1232311"
* @param sourcePath 源文件的路径和文件名 例:"site/2010/example.txt"
* @param destBucketName 目标文件的桶名称 例:"swangmarket34578345"
* @param destPath 目标文件的路径和文件名 例:"site/2010/example_bak.txt"
*/
public void copyObject(String sourceBucketName, String sourcePath,String destBucketName, String destPath) {
getObsClient().copyObject(sourceBucketName, sourcePath, destBucketName, destPath);
}


/**
* 获得原生OBSBucket的访问前缀
* @return 桶原生的访问前缀,即不经过CDN加速的访问路径
*/
public String getOriginalUrlForOBS() {
return "//" + obsBucketName + "." + endpoint.substring(8, endpoint.length()) + "/";
}


/**
* 通过bucket的名字和连接点信息获取bucket访问的url
* @param bucketName 桶的名称 例:"wangmarket21345665"
* @param endpoint 连接点的名称 例:"obs.cn-north-1"
* @return 根据信息获得桶的访问路径 例:"//wangmarket21345665.obs.cn-north-1.myhuaweicloud.com/"
*/
public String getUrlByBucketName(String bucketName, String endpoint) {
String url = null;
if (url == null || url.length() == 0) {
url = "//" + bucketName + "." + endpoint + ".myhuaweicloud.com" + "/";
}
return url;
}


/**
* 创建华为云ObsBucket,默认设置为标准存储,桶访问权限为公共读私有写,同策略为所有用户可读桶内对象和桶内对象版本信息
* @param obsBucketName 创建桶的名称
* @return 新创建的桶的名字
*/
public String createOBSBucket(String obsBucketName) {
// 将桶的名字进行保存
this.obsBucketName = obsBucketName;
ObsBucket obsBucket = new ObsBucket();
obsBucket.setBucketName(obsBucketName);
// 设置桶访问权限为公共读,默认是私有读写
obsBucket.setAcl(AccessControlList.REST_CANNED_PUBLIC_READ);
// 设置桶的存储类型为标准存储
obsBucket.setBucketStorageClass(StorageClassEnum.STANDARD);
// 创建桶
getObsClient().createBucket(obsBucket);
//设置桶策略
String json = "{"
+ "\"Statement\":["
+ "{"
+ "\"Sid\":\"为授权用户创建OBS使用的桶策略\","
+ "\"Principal\":{\"ID\" : \"*\"},"
+ "\"Effect\":\"Allow\","
+ "\"Action\":[\"GetObject\",\"GetObjectVersion\"],"
+ "\"Resource\": [\"" + obsBucketName + "/*\"]"
+ "}"
+ "]}";
getObsClient().setBucketPolicy(obsBucketName, json);
return obsBucketName;
}


/**
* 获取当前的桶列表
* @return 当前桶的列表信息
*/
public List<S3Bucket> getBuckets() {
return getObsClient().listBuckets();
}


/**
* 关闭当前的使用的OBSClient
*/
public void closeOBSClient() {
if(getObsClient() != null){
try {
getObsClient().close();
} catch (IOException e){
e.printStackTrace();
}
}
}

/**
* 返回当前的创建桶的名称 例:"wangmarket1232311"
* @return 如果有桶,那么返回桶的名称,如 "wangmarket1232311" ,如果没有,则返回 null
*/
public String getObsBucketName() {
return this.obsBucketName;
}


/**
* 返回当前的桶的访问路径 例:“ http://cdn.leimingyun.com/”
* @return 若已经手动设置CDN路径返回为CND路径,反之则为OBS原始的访问路径
*/
public String getUrl() {
// 用户没有配置CDN,获的桶的原生访问路径
if(url == null) {
url = getOriginalUrlForOBS();
}
return url;
}


/**
* 为对象设置公共读
* @param objectKey
*/
public HeaderResponse setObjectAclPubilcRead(String objectKey){

return obsClient.setObjectAcl(obsBucketName, objectKey, AccessControlList.REST_CANNED_PUBLIC_READ);

}

/**
* 获得下载路径
* @param objectKey
* @return
*/
public String signatureUrl(String objectKey){
long expireSeconds = 3600L;
Map<String, String> headers = new HashMap<String, String>();

String contentType = "text/plain";
headers.put("Content-Type", contentType);

TemporarySignatureRequest request = new TemporarySignatureRequest(HttpMethodEnum.PUT, expireSeconds);
request.setBucketName(obsBucketName);
request.setObjectKey(objectKey);
request.setHeaders(headers);

TemporarySignatureResponse response = obsClient.createTemporarySignature(request);

return response.getSignedUrl();
}

}

3.OBS业务实现类:
package com.obsupload.service;

import com.obsupload.configur.OBSHandler;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.HiddenFileFilter;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.TimeUnit;

/**
* @Author: zhengwj
* @Description:
* @Date: 2020/4/20 12:45
* @Version: 1.0
*/
@Component
public class HuaweiyunOBS {

private final Logger log = LoggerFactory.getLogger(getClass());

private OBSHandler obsHandler;

@Value("${huawei.obs.accessKeyId}")
private String accessKeyId; //华为云的 Access Key Id
@Value("${huawei.obs.accessKeySecret}")
private String accessKeySecret; //华为云的 Access Key Secret
@Value("${huawei.obs.obsEndpoint}")
private String obsEndpoint; //格式如 obs.cn-north-1.myhuaweicloud.com
@Value("${huawei.obs.bucketName}")
private String bucketName; //obs桶名

@Value("${huawei.obs.parentPath}")
private String parentPath; //监控路径

/**
* 以下配置用于 补数据
*/
@Value("${local.specifiedPaths:}")
private String specifiedPaths; //指定上传路径
@Value("${local.vice.isopen:false}")
private boolean isOpenVice; //是否开启副应用

/**
* 获取连接
* @return
*/
public OBSHandler getObsHander() {
if(obsHandler == null) {
obsHandler = new OBSHandler(accessKeyId,accessKeySecret,obsEndpoint);
// 如果设置过CDN的路径测设置为CDN路径,没有设置则为桶原生的访问路径
//obsHandler.setUrlForCDN(Global.get("ATTACHMENT_FILE_URL"));
// 在数据库中读取进行操作的桶的明恒
obsHandler.setObsBucketName(bucketName);
// 对桶名称进行当前类内缓存
bucketName = obsHandler.getObsBucketName();
}
return obsHandler;
}


/**
* 增量录音文件上传OBS
* @param file
*/
@Async
public void excute(File file){
if(file.isFile() && file.getName().endsWith(".mp3")){
int index = file.getAbsolutePath().indexOf("monitor");
String fileName =file.getAbsolutePath().substring(index).replaceAll("\\\\", "/");
try{
getObsHander().putLocalFile(bucketName, fileName, file);
getObsHander().setObjectAclPubilcRead(fileName);
String url = getObsHander().signatureUrl(fileName);
log.info(url);
}catch (Exception e){
log.error("上传{}失败:{}",fileName,e);
}finally {
// getObsHander().closeOBSClient();
}
}
}

/**
* 文件监控
* 增量上传
*/
public void monitoring(){
if(isOpenVice){
log.info("开启上传指定路径下文件");
uploadSpecified();
}
log.info("开启监控.....");
// 轮询间隔 5 秒
long interval = TimeUnit.SECONDS.toMillis(1);
// 创建过滤器
IOFileFilter directories = FileFilterUtils.and(
FileFilterUtils.directoryFileFilter(),
HiddenFileFilter.VISIBLE);
IOFileFilter files = FileFilterUtils.and(
FileFilterUtils.fileFileFilter(),
FileFilterUtils.suffixFileFilter(".mp3"));
IOFileFilter filter = FileFilterUtils.or(directories, files);
// 使用过滤器
FileAlterationObserver observer = new FileAlterationObserver(new File(parentPath), filter);
//不使用过滤器
//FileAlterationObserver observer = new FileAlterationObserver(new File(rootDir));
observer.addListener(new FileListener());
//创建文件变化监听器
FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer);
// 开始监控
try {
monitor.start();
}catch (Exception e){
log.error("执行出错:{}",e);
}

}


/**
* 上传指定文件夹下的文件
*
*/
public void uploadSpecified(){
if(StringUtils.isEmpty(specifiedPaths)){
return;
}
String[] paths = specifiedPaths.split(",");
for(String specifiedPath : paths){
File specifiedFile = new File(specifiedPath);
if(specifiedFile.isDirectory()){
File[] files = specifiedFile.listFiles();
for (File file : files){
excute(file);
}
}
}
}



/**
* 文件夹监听器
*/
class FileListener extends FileAlterationListenerAdaptor {

private final Logger log = LoggerFactory.getLogger(getClass());

/**
* 文件创建执行
*/
public void onFileCreate(File file) {
log.info("[新建]:" + file.getAbsolutePath());
excute(file);
}

/**
* 文件删除
*/
public void onFileDelete(File file) {
// log.info("[删除]:" + file.getAbsolutePath());
}

/**
* 目录创建
*/
public void onDirectoryCreate(File directory) {
log.info("[新建]:" + directory.getAbsolutePath());
}

public void onStart(FileAlterationObserver observer) {
// TODO Auto-generated method stub
super.onStart(observer);
}
public void onStop(FileAlterationObserver observer) {
// TODO Auto-generated method stub
super.onStop(observer);
}

}

}
4.application.properties配置
#华为云的 Access Key Id
huawei.obs.accessKeyId=xxxxx
#华为云的 Access Key Secret
huawei.obs.accessKeySecret=xxxxxx
#华为云连接的地址节点
huawei.obs.obsEndpoint=xxxx
#桶的名称
huawei.obs.bucketName=xxxx

huawei.obs.parentPath=/files/monitor/
#huawei.obs.parentPath=D://files/monitor/

#上传指定路径下的文件 用于监控主程序挂掉之后补数据
#local.vice.isopen=false
#local.specifiedPaths=/files/monitor/20200428/,/files/monitor/20200427/


大致实现如上所示,完成代码请查看github:https://github.com/wojozer/obs-upload
欢迎留言交流分享!

推荐阅读