I: Purpose
1. Purpose
If this can be made to work, Spark can be used in place of Sqoop: the program in this post does exactly that, exporting a Hive table into MySQL.
II: Hive operations
1. Prepare the data
Start Hive first.
Otherwise the job fails, because of the configuration items that were set when Hive and Spark were integrated.
Later I came across this article, which explains the problem I was running into quite well: https://blog.csdn.net/freedomboy319/article/details/44828337
2. Create the department and employee tables
-- Create the employee table
create table emp(
    empno int,
    ename string,
    job string,
    mgr int,
    hiredate string,
    sal double,
    comm double,
    deptno int
)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/emp.txt' into table emp;

-- Create the department table
create table dept(
    deptno int,
    dname string,
    loc string
)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/dept.txt' into table dept;
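As a quick sanity check after the two loads, the tables can be joined. Below is a minimal Scala sketch that uses the same HiveContext setup as the program in section III; the CheckHiveTables object name is my own, and it assumes the tables live in the hadoop09 database that the program below reads from:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object CheckHiveTables {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("check-hive-tables")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)
    // Join the two freshly loaded tables; hadoop09 is the database assumed in section III.
    sqlContext.sql(
      """select e.empno, e.ename, e.sal, d.dname
        |from hadoop09.emp e
        |join hadoop09.dept d on e.deptno = d.deptno""".stripMargin).show(5)
  }
}

If the join prints rows with matching department names, both loads succeeded.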
3. Result
III: The program
1. Outline
2. Prerequisites
hive-site.xml is required on the classpath (typically under the project's resources directory), so that the HiveContext can find the Hive metastore.
3. Required dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>6.0.4</version>
</dependency>
4. The following error is reported
Exception in thread "main" java.sql.SQLNonTransientConnectionException: CLIENT_PLUGIN_AUTH is required
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:550)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:537)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:527)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:512)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:480)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:498)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:494)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:72)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1634)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:637)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:351)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:224)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:61)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:52)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
	at com.scala.it.HiveToMysql$.main(HiveToMysql.scala:28)
	at com.scala.it.HiveToMysql.main(HiveToMysql.scala)
Caused by: com.mysql.cj.core.exceptions.UnableToConnectException: CLIENT_PLUGIN_AUTH is required
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
	at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73)
	at com.mysql.cj.mysqla.io.MysqlaProtocol.rejectConnection(MysqlaProtocol.java:319)
	at com.mysql.cj.mysqla.authentication.MysqlaAuthenticationProvider.connect(MysqlaAuthenticationProvider.java:207)
	at com.mysql.cj.mysqla.io.MysqlaProtocol.connect(MysqlaProtocol.java:1361)
	at com.mysql.cj.mysqla.MysqlaSession.connect(MysqlaSession.java:132)
	at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1754)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1624)
	... 8 more
Cause:
The mysql-connector-java version does not match the MySQL server: the 6.x driver insists on the CLIENT_PLUGIN_AUTH capability, which older MySQL servers do not offer. Switch the driver to version 5.1.17.
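With that change, the MySQL dependency from section 3 becomes the following (only the version tag differs):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>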
5. The program
package com.scala.it

import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hive-to-mysql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)
    val (url, username, password) = ("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09", "root", "123456")
    val props = new Properties()
    props.put("user", username)
    props.put("password", password)

    // ==================================
    // Step 1: sync the Hive dept table into MySQL
    sqlContext
      .read
      .table("hadoop09.dept") // database.tablename
      .write
      .mode(SaveMode.Overwrite) // overwrite the target table if it exists
      .jdbc(url, "mysql_dept", props)
  }
}
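To confirm the export, the MySQL table can be read back with the same connection settings, and the emp table can be synced the same way. A sketch of lines that could be appended inside the same main method; note that the mysql_emp target table name is my own placeholder, not something from the original post:

// Read mysql_dept back from MySQL to confirm the overwrite landed.
sqlContext.read.jdbc(url, "mysql_dept", props).show()

// Step 2 (hypothetical): sync the emp table the same way; mysql_emp is a placeholder name.
sqlContext
  .read
  .table("hadoop09.emp")
  .write
  .mode(SaveMode.Overwrite)
  .jdbc(url, "mysql_emp", props)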
6. Result