How to use an RDB (relational database) with JDBC
PostgreSQL
// Option 1: pass the credentials and the SSL flag through a Properties object.
String url = "jdbc:postgresql://localhost/test";
Properties props = new Properties();
props.setProperty("user", "fred");
props.setProperty("password", "secret");
props.setProperty("ssl", "true");
Connection conn = DriverManager.getConnection(url, props);

// Option 2: encode the same settings as URL parameters instead.
// (Distinct variable names so both options can live in the same scope.)
String urlWithParams = "jdbc:postgresql://localhost/test?user=fred&password=secret&ssl=true";
// Oracle
// urlWithParams = String.format("jdbc:oracle:thin:@%s:%d:%s", dbip, dbport, dbname);
Connection connFromUrl = DriverManager.getConnection(urlWithParams);
DBConnection common class
package com.datatrans;
import java.sql.Connection;
import java.sql.DriverManager;
/**
 * DB config class.
 *
 * <p>Builds the JDBC URL and driver class name for one of the supported database
 * types ({@code oracle}, {@code postgresql}, {@code mysql}, {@code odbc}) and hands
 * out a single cached {@link Connection} on demand.
 */
public class DBConnection {
    private final String dbtype;     // ex: "oracle"
    private final String dbname;
    private final String dbip;
    private final int dbport;        // ex: 1521
    private final String dbuser;
    private final String dbpass;
    private final String dbstr;      // JDBC URL built in the constructor
    private final String driverName; // ex: oracle.jdbc.driver.OracleDriver
    private Connection conn = null;  // opened lazily by getConnection()

    /**
     * Prepares the JDBC URL and driver name; no connection is opened here.
     *
     * @param dbType one of "oracle", "postgresql", "mysql", "odbc" (exact match)
     * @param dbName database name (SID for oracle, data source name for odbc)
     * @param dbIP   host name or address; not used for odbc
     * @param dbPort port; a value {@code <= 0} selects the default port of the database type
     * @param user   login user
     * @param pass   login password
     * @throws Exception (an {@link IllegalArgumentException}) if {@code dbType} is null or not supported
     */
    public DBConnection(String dbType, String dbName, String dbIP, int dbPort, String user, String pass) throws Exception {
        //LOG.write(String.format("DB: [%s][%s][%s][%d][%s]", dbType, dbName, dbIP, dbPort, user));
        final String url;
        final String driver;
        int port = dbPort;
        // A null type is mapped to "" so it reaches the default branch instead of an NPE.
        switch (dbType == null ? "" : dbType) {
            case "oracle":
                if (port <= 0) port = 1521; // default
                url = String.format("jdbc:%s:thin:@%s:%d:%s", dbType, dbIP, port, dbName); // jdbc:oracle:thin:@localhost:1521:ORCL
                driver = "oracle.jdbc.driver.OracleDriver";
                break;
            case "postgresql":
                if (port <= 0) port = 5432; // default
                url = String.format("jdbc:%s://%s:%d/%s", dbType, dbIP, port, dbName); // jdbc:postgresql://localhost:5432/dbName
                driver = "org.postgresql.Driver";
                break;
            case "mysql":
                if (port <= 0) port = 3306; // default
                url = String.format("jdbc:%s://%s:%d/%s", dbType, dbIP, port, dbName); // jdbc:mysql://localhost:3306/dbName
                //driver = "org.git.mm.mysql.Driver";
                driver = "com.mysql.jdbc.Driver";
                break;
            case "odbc":
                url = String.format("jdbc:%s:%s", dbType, dbName); // jdbc:odbc:OdbcDbName
                driver = "sun.jdbc.odbc.JdbcOdbcDriver";
                break;
            default:
                throw new IllegalArgumentException("Unexpected DbType(ex: oracle, postgresql, mysql, odbc, ...)");
        }
        dbtype = dbType;
        dbname = dbName;
        dbip = dbIP;
        dbport = port;
        dbuser = user;
        dbpass = pass;
        dbstr = url;
        driverName = driver;
        //LOG.write(String.format("[%s][%s]", driverName, dbstr));
    }

    /**
     * DB Connection.
     *
     * <p>Opens the connection on first use and returns the same instance afterwards.
     * A cached connection that has been closed in the meantime is discarded and
     * reopened, so callers never get a closed connection back.
     *
     * @return the open connection, or {@code null} if connecting failed (the error is logged)
     */
    public Connection getConnection() {
        try {
            if (conn != null && conn.isClosed()) {
                conn = null; // the caller closed it; do not hand out a dead connection
            }
            if (conn == null) {
                Class.forName(driverName);
                conn = DriverManager.getConnection(dbstr, dbuser, dbpass);
            }
        } catch (Exception e) {
            LOG.error("DB connection failed: " + e.getMessage());
        }
        return conn;
    }
}
DataTrans from DB to DB - #1: Read and write step by step
package com.datatrans;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import java.io.*;
import java.sql.*;
import java.util.stream.Collectors;
/**
* DataTrans Main
*/
class DataTrans extends Thread {
/**
 * Entry point: reads the JSON configuration named by {@code -f<FileName>} and runs the transfer.
 *
 * <p>If {@code -f} is given more than once the last occurrence wins. The file is
 * decoded as EUC-KR before it is parsed as JSON.
 *
 * @param args command-line arguments; {@code -f<FileName>} is required
 */
public static void main(String[] args) {
    String cfgfilename = null; // Default
    for (String arg : args) {
        if (arg.startsWith("-f")) {
            cfgfilename = arg.substring(2);
            //LOG.write(String.format("[%d]%s", args.length, arg));
        }
    }
    // A bare "-f" yields an empty name; treat it the same as a missing option.
    if (cfgfilename == null || cfgfilename.isEmpty()) {
        LOG.error("-f<FileName> is needed.");
        return;
    }

    // Read Config
    JSONObject jobj;
    try {
        // readAllBytes reads the complete file and closes it even on failure
        // (a single InputStream.read() is not guaranteed to fill the buffer).
        byte[] data = java.nio.file.Files.readAllBytes(new File(cfgfilename).toPath());
        //String str = new String(data, "UTF-8");
        String str = new String(data, "EUC-KR");
        jobj = (JSONObject) new JSONParser().parse(str);
    } catch (Exception e) {
        LOG.error(String.format("Failed to read file:[%s] %s", cfgfilename, e.getMessage()));
        return;
    }

    DataTrans sender = new DataTrans();
    sender.start(jobj);
} /* End of main */
/**
 * Start Main: copies every row produced by the read query into the write database.
 *
 * <p>Opens the read and write connections described by the configuration, runs the
 * query from {@code cfg.rSqlFilename}, and for each row binds the columns, in order,
 * to the statement from {@code cfg.wSqlFilename} and executes it. All writes run in
 * one transaction on the write connection: it is committed after the last row and
 * rolled back if anything fails. Both connections are always closed before returning.
 *
 * @param jcfg parsed JSON configuration, handed to {@code DataTransCfg}
 */
public void start(JSONObject jcfg) {
    // Read config
    DataTransCfg cfg = new DataTransCfg(jcfg);
    Connection rconn = null;
    Connection wconn = null;
    boolean connected = false;

    try {
        if (cfg.logging) LOG.write("Read DB Connecting...");
        if (cfg.logging) LOG.write(String.format("DB: [%s][%s][%s][%d][%s]", cfg.rDbType, cfg.rDbName, cfg.rDbIp, cfg.rDbPort, cfg.rDbUser));
        rconn = new DBConnection(cfg.rDbType, cfg.rDbName, cfg.rDbIp, cfg.rDbPort, cfg.rDbUser, cfg.rDbPass).getConnection();
        if (rconn != null && cfg.logging) LOG.write("Read DB Connected");

        if (cfg.logging) LOG.write("Write DB Connecting...");
        if (cfg.logging) LOG.write(String.format("DB: [%s][%s][%s][%d][%s]", cfg.wDbType, cfg.wDbName, cfg.wDbIp, cfg.wDbPort, cfg.wDbUser));
        wconn = new DBConnection(cfg.wDbType, cfg.wDbName, cfg.wDbIp, cfg.wDbPort, cfg.wDbUser, cfg.wDbPass).getConnection();
        if (wconn != null) {
            // Only touch the connection after the null check; all writes share one transaction.
            wconn.setAutoCommit(false);
            if (cfg.logging) LOG.write("Write DB Connected");
        }
        connected = true;
    } catch (Exception e) {
        LOG.error("DB Connection failed: " + e.getMessage());
    }
    if (!connected || rconn == null || wconn == null) {
        LOG.error(rconn == null ? "Read DB connection failed" : "Write DB connection failed");
        // Release whichever side did open so it is not leaked.
        closeQuietly(rconn, "read connection");
        closeQuietly(wconn, "write connection");
        return;
    }

    // -----------------------------
    // Open Read DB (Select) and copy row by row
    // -----------------------------
    try {
        String rquery = readSqlFile(cfg.rSqlFilename);
        String wquery = readSqlFile(cfg.wSqlFilename);
        if (rquery == null || wquery == null) {
            LOG.error("Failed to read SQL file: " + (rquery == null ? cfg.rSqlFilename : cfg.wSqlFilename));
            return;
        }
        try (PreparedStatement rpstmt = rconn.prepareStatement(rquery);
             ResultSet rs = rpstmt.executeQuery(); // select
             PreparedStatement wpstmt = wconn.prepareStatement(wquery)) {
            // Metadata does not change between rows, so fetch it once.
            ResultSetMetaData rsmd = rs.getMetaData();
            int colCnt = rsmd.getColumnCount();
            int exeCnt = 0;
            while (rs.next()) {
                for (int idx = 1; idx <= colCnt; idx++) {
                    copyColumn(rs, rsmd, wpstmt, idx, cfg.logging);
                }
                exeCnt += wpstmt.executeUpdate(); // insert, update, delete
            }
            LOG.write(String.format("Execution Count: %d", exeCnt));
            wconn.commit(); // Commit all statements
            if (cfg.logging) LOG.write(String.format("Commit completed"));
        }
    } catch (SQLException sqle) {
        LOG.error("SQL Exception : " + sqle.toString());
        rollbackQuietly(wconn);
    } catch (Exception e) {
        LOG.error(String.format("Unknown Exception %s", e.toString()));
        e.printStackTrace();
        rollbackQuietly(wconn);
    } finally {
        // Disconnect Database
        closeQuietly(rconn, "read connection");
        closeQuietly(wconn, "write connection");
    }
}

/**
 * Binds column {@code idx} of the current source row to parameter {@code idx} of the write statement.
 *
 * <p>The setter is chosen from the JDBC type reported by the source metadata. SQL NULL
 * is written as NULL (not as 0/false) for primitive-valued types. Types without a
 * dedicated branch fall back to {@code setObject}.
 */
private void copyColumn(ResultSet rs, ResultSetMetaData rsmd, PreparedStatement wpstmt, int idx, boolean logging) throws SQLException {
    int colType = rsmd.getColumnType(idx);
    if (logging) LOG.write(String.format("[%d][%s][%s][%d][%s]", idx, rsmd.getColumnName(idx), rsmd.getColumnTypeName(idx), rsmd.getScale(idx), rsmd.getColumnClassName(idx)));
    switch (colType) {
        case Types.VARCHAR:
        case Types.CHAR:
        case Types.LONGVARCHAR: {
            String v = rs.getString(idx);
            logBind(logging, "setString", v);
            wpstmt.setString(idx, v);
            break;
        }
        case Types.INTEGER: {
            int v = rs.getInt(idx);
            if (bindNullIfWasNull(rs, wpstmt, idx, colType, logging)) break;
            logBind(logging, "setInt", v);
            wpstmt.setInt(idx, v);
            break;
        }
        case Types.BIT: {
            boolean v = rs.getBoolean(idx);
            if (bindNullIfWasNull(rs, wpstmt, idx, colType, logging)) break;
            logBind(logging, "setBoolean", v);
            wpstmt.setBoolean(idx, v);
            break;
        }
        case Types.NUMERIC:
        case Types.DECIMAL: {
            // Always BigDecimal: routing scaled values through float loses precision.
            java.math.BigDecimal v = rs.getBigDecimal(idx);
            logBind(logging, "setBigDecimal", v);
            wpstmt.setBigDecimal(idx, v);
            break;
        }
        case Types.TINYINT: {
            byte v = rs.getByte(idx);
            if (bindNullIfWasNull(rs, wpstmt, idx, colType, logging)) break;
            logBind(logging, "setByte", v);
            wpstmt.setByte(idx, v);
            break;
        }
        case Types.SMALLINT: {
            short v = rs.getShort(idx);
            if (bindNullIfWasNull(rs, wpstmt, idx, colType, logging)) break;
            logBind(logging, "setShort", v);
            wpstmt.setShort(idx, v);
            break;
        }
        case Types.BIGINT: {
            long v = rs.getLong(idx);
            if (bindNullIfWasNull(rs, wpstmt, idx, colType, logging)) break;
            logBind(logging, "setLong", v);
            wpstmt.setLong(idx, v);
            break;
        }
        case Types.REAL: {
            float v = rs.getFloat(idx);
            if (bindNullIfWasNull(rs, wpstmt, idx, colType, logging)) break;
            logBind(logging, "setFloat", String.format("%f", v));
            wpstmt.setFloat(idx, v);
            break;
        }
        case Types.FLOAT:  // JDBC FLOAT is double precision, so it must not be narrowed to float
        case Types.DOUBLE: {
            double v = rs.getDouble(idx);
            if (bindNullIfWasNull(rs, wpstmt, idx, colType, logging)) break;
            logBind(logging, "setDouble", String.format("%f", v));
            wpstmt.setDouble(idx, v);
            break;
        }
        case Types.VARBINARY:
        case Types.BINARY: {
            byte[] v = rs.getBytes(idx);
            logBind(logging, "setBytes", v == null ? null : v.length + " bytes");
            wpstmt.setBytes(idx, v);
            break;
        }
        case Types.DATE: {
            java.sql.Date v = rs.getDate(idx);
            logBind(logging, "setDate", v);
            wpstmt.setDate(idx, v);
            break;
        }
        case Types.TIME: {
            Time v = rs.getTime(idx);
            logBind(logging, "setTime", v);
            wpstmt.setTime(idx, v);
            break;
        }
        case Types.TIMESTAMP: {
            Timestamp v = rs.getTimestamp(idx);
            logBind(logging, "setTimestamp", v);
            wpstmt.setTimestamp(idx, v);
            break;
        }
        case Types.CLOB:
            logBind(logging, "setClob", "");
            wpstmt.setClob(idx, rs.getClob(idx));
            break;
        case Types.BLOB:
            logBind(logging, "setBlob", "");
            wpstmt.setBlob(idx, rs.getBlob(idx));
            break;
        case Types.ARRAY: {
            Array v = rs.getArray(idx);
            logBind(logging, "setARRAY", v);
            wpstmt.setArray(idx, v);
            break;
        }
        case Types.REF: {
            Ref v = rs.getRef(idx);
            logBind(logging, "setRef", v);
            wpstmt.setRef(idx, v);
            break;
        }
        default: {
            if (logging) LOG.write(String.format("Unknown Column Type: %d", colType));
            // Best effort: leaving the parameter unbound would make executeUpdate fail.
            Object v = rs.getObject(idx);
            if (v == null) {
                wpstmt.setNull(idx, colType);
            } else {
                wpstmt.setObject(idx, v);
            }
            break;
        }
    } // End of switch
}

/** Binds SQL NULL when the value just read from {@code rs} was NULL; returns whether it did. */
private boolean bindNullIfWasNull(ResultSet rs, PreparedStatement wpstmt, int idx, int colType, boolean logging) throws SQLException {
    if (!rs.wasNull()) {
        return false;
    }
    logBind(logging, "setNull", null);
    wpstmt.setNull(idx, colType);
    return true;
}

/** Writes one "--->setter:(value)" trace line when logging is enabled; null-safe. */
private void logBind(boolean logging, String setter, Object value) {
    if (logging) LOG.write(String.format("--->%s:(%s)", setter, value));
}

/** Rolls back the open transaction, logging (not throwing) if the rollback itself fails. */
private void rollbackQuietly(Connection conn) {
    if (conn == null) {
        return;
    }
    try {
        conn.rollback();
    } catch (Exception e) {
        LOG.error("Rollback failed: " + e.toString());
    }
}

/** Closes a resource, logging (not throwing) on failure so the remaining resources still get closed. */
private void closeQuietly(AutoCloseable resource, String label) {
    if (resource == null) {
        return;
    }
    try {
        resource.close();
    } catch (Exception e) {
        LOG.error("Failed to close " + label + ": " + e.toString());
    }
}
/**
 * Read file: returns the whole content of a SQL file as one string.
 *
 * <p>Lines are joined with a newline so that tokens on adjacent lines stay separated
 * (joining with no separator would turn "SELECT a" + "FROM t" into "SELECT aFROM t").
 *
 * @param filename path of the SQL file
 * @return the file content, or {@code null} if the file is missing or cannot be read
 *         (the failure is logged)
 */
public String readSqlFile(String filename) {
    if (filename == null) {
        LOG.error("SQL file name is missing");
        return null;
    }
    // try-with-resources closes the reader even when reading fails part-way.
    try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
        return br.lines().collect(Collectors.joining("\n"));
    } catch (FileNotFoundException e) {
        LOG.error("SQL file not found: " + filename);
    } catch (IOException | UncheckedIOException e) {
        // lines() reports read errors as UncheckedIOException
        LOG.error(e.toString());
    }
    return null;
}
}
DataTrans from DB to DB - #2: Read all and write all
/**
 * Holder for one column of one buffered row: its name, JDBC type code and value
 * (both as text and as the object returned by the driver).
 */
class ColDataSet {
    /** JDBC type code of the column (a {@code java.sql.Types} constant). */
    int colType;

    /** Column name. */
    String colName;

    /** Column value in its string form. */
    String colValue;

    /** Column value as a driver-provided object. */
    Object colObject;
}
/**
 * Process Main: reads the complete source result set into memory, then writes it out.
 *
 * <p>Phase 1 runs the query from {@code cfg.rdSqlFilename} and buffers every column of
 * every row as a {@code ColDataSet}. Phase 2 binds each buffered row to the statement
 * from {@code cfg.wrSqlFilename} (sql example: insert into customers(id, pass) values(?, ?))
 * and executes it once per row. If phase 1 fails, nothing is written.
 *
 * @param jcfg parsed JSON configuration, handed to {@code DataTransCfg}
 * @throws RuntimeException if closing a database resource fails
 */
public void start(JSONObject jcfg) {
    ArrayList<ArrayList<ColDataSet>> arrayList = new ArrayList<ArrayList<ColDataSet>>();
    // Read Config
    DataTransCfg cfg = new DataTransCfg(jcfg);

    // -----------------------------
    // Open Read DB (Select)
    // -----------------------------
    Connection conn = null;
    PreparedStatement pstmt = null;
    ResultSet rs = null;
    boolean readCompleted = false;
    try {
        DBConnection oradb = new DBConnection(cfg.rdDbType, cfg.rdDbName, cfg.rdDbIp, cfg.rdDbPort);
        LOG.write("Read DB Connecting...");
        String query = readSqlFile(cfg.rdSqlFilename);
        conn = oradb.getConnection();
        if (conn == null) {
            LOG.write("Read DB connection failed");
        } else {
            LOG.write("Read DB Connected");
            pstmt = conn.prepareStatement(query);
            rs = pstmt.executeQuery(); // select
            int colCnt = rs.getMetaData().getColumnCount(); // same for every row
            while (rs.next()) {
                ArrayList<ColDataSet> colArrList = new ArrayList<ColDataSet>();
                for (int idx = 1; idx <= colCnt; idx++) {
                    ColDataSet ds = new ColDataSet();
                    ds.colName = rs.getMetaData().getColumnName(idx);
                    ds.colValue = rs.getString(idx);
                    ds.colType = rs.getMetaData().getColumnType(idx);
                    ds.colObject = rs.getObject(idx);
                    colArrList.add(ds);
                }
                arrayList.add(colArrList);
            }
            readCompleted = true;
        }
    } catch (SQLException sqle) {
        LOG.write("SQL Exception : " + sqle.toString());
    } catch (Exception e) {
        LOG.write("Unknown error : " + e.toString());
        e.printStackTrace();
    } finally {
        // Disconnect Database
        closeDbResources(rs, pstmt, conn);
    }
    if (!readCompleted) {
        // Writing a partial or empty snapshot would silently lose data.
        LOG.write("Read phase failed; nothing written");
        return;
    }

    // -----------------------------
    // Open Write DB
    // -----------------------------
    conn = null;
    pstmt = null;
    try {
        DBConnection oradb = new DBConnection(cfg.wrDbType, cfg.wrDbName, cfg.wrDbIp, cfg.wrDbPort);
        LOG.write("Write DB Connecting...");
        String query = readSqlFile(cfg.wrSqlFilename);
        conn = oradb.getConnection();
        if (conn == null) {
            LOG.write("Write DB connection failed");
            return;
        }
        LOG.write("Write DB Connected");
        pstmt = conn.prepareStatement(query);
        int rtn = 0;
        for (ArrayList<ColDataSet> alist : arrayList) {
            for (int jdx = 0; jdx < alist.size(); jdx++) {
                bindBufferedColumn(pstmt, jdx + 1, alist.get(jdx));
            }
            // Execute once per buffered row; executing after the loop would write only the last row.
            rtn += pstmt.executeUpdate(); // insert, update, delete
        }
        LOG.write(String.format("Execution Count: %d", rtn));
    } catch (SQLException sqle) {
        LOG.write("SQL Exception : " + sqle.toString());
    } catch (Exception e) {
        LOG.write("Unknown error : " + e.toString());
        e.printStackTrace();
    } finally {
        // Disconnect Database
        closeDbResources(null, pstmt, conn);
    }
}

/**
 * Binds one buffered column to parameter {@code pos} of the write statement,
 * choosing the setter from the column's JDBC type code.
 */
private void bindBufferedColumn(PreparedStatement pstmt, int pos, ColDataSet col) throws SQLException {
    if (col.colValue == null) {
        // The string form is null when the source column was SQL NULL; parsing it would throw.
        LOG.write("setNull");
        pstmt.setNull(pos, col.colType);
        return;
    }
    switch (col.colType) {
        case Types.VARCHAR:
        case Types.CHAR:
        case Types.LONGVARCHAR:
            LOG.write("setString");
            pstmt.setString(pos, col.colValue);
            break;
        case Types.INTEGER:
            LOG.write("setInt");
            pstmt.setInt(pos, Integer.parseInt(col.colValue));
            break;
        case Types.BIT:
            LOG.write("setBoolean");
            // Prefer the driver's Boolean; the string form may be "1"/"0" rather than "true"/"false".
            pstmt.setBoolean(pos, col.colObject instanceof Boolean
                    ? ((Boolean) col.colObject).booleanValue()
                    : (Boolean.parseBoolean(col.colValue) || "1".equals(col.colValue)));
            break;
        case Types.NUMERIC:
        case Types.DECIMAL:
            LOG.write("setBigDecimal");
            // Parse the decimal text directly; going through Long rejects fractional values.
            pstmt.setBigDecimal(pos, new BigDecimal(col.colValue.trim()));
            break;
        case Types.TINYINT:
            LOG.write("setByte");
            pstmt.setByte(pos, Byte.parseByte(col.colValue));
            break;
        case Types.SMALLINT:
            LOG.write("setShort");
            pstmt.setShort(pos, Short.parseShort(col.colValue));
            break;
        case Types.BIGINT:
            LOG.write("setLong");
            pstmt.setLong(pos, Long.parseLong(col.colValue));
            break;
        case Types.REAL:
            LOG.write("setFloat");
            pstmt.setFloat(pos, Float.parseFloat(col.colValue));
            break;
        case Types.FLOAT:  // JDBC FLOAT is double precision
        case Types.DOUBLE:
            LOG.write("setDouble");
            pstmt.setDouble(pos, Double.parseDouble(col.colValue));
            break;
        case Types.VARBINARY:
        case Types.BINARY:
            LOG.write("setBytes");
            // Use the raw bytes when the driver supplied them; a string round-trip can corrupt binary data.
            pstmt.setBytes(pos, col.colObject instanceof byte[]
                    ? (byte[]) col.colObject
                    : col.colValue.getBytes());
            break;
        case Types.DATE:
            LOG.write("setDate");
            pstmt.setDate(pos, new java.sql.Date(bufferedEpochMillis(col)));
            break;
        case Types.TIME:
            LOG.write("setTime");
            pstmt.setTime(pos, new java.sql.Time(bufferedEpochMillis(col)));
            break;
        case Types.TIMESTAMP:
            LOG.write("setTimestamp");
            pstmt.setTimestamp(pos, new java.sql.Timestamp(bufferedEpochMillis(col)));
            break;
        case Types.CLOB:
            LOG.write("setClob");
            // NOTE(review): not bound. The buffered object was read on a connection that is closed by now,
            // so it is presumably unusable here - confirm and decide how CLOBs should be transferred.
            break;
        case Types.BLOB:
            LOG.write("setBlob");
            // NOTE(review): not bound, same concern as CLOB.
            break;
        case Types.ARRAY:
            LOG.write("setARRAY");
            // NOTE(review): not bound.
            break;
        case Types.REF:
            LOG.write("setRef");
            // NOTE(review): not bound.
            break;
        case Types.STRUCT:
            LOG.write("setStruct");
            // NOTE(review): not bound.
            break;
        default:
            // Best effort for types without a dedicated branch; an unbound parameter would fail anyway.
            LOG.write("setObject");
            pstmt.setObject(pos, col.colObject);
            break;
    } // End of switch
}

/**
 * Epoch milliseconds of a buffered date/time column: taken from the driver object when it
 * is a date type, otherwise parsed from the string form as a number.
 */
private long bufferedEpochMillis(ColDataSet col) {
    if (col.colObject instanceof java.util.Date) {
        return ((java.util.Date) col.colObject).getTime();
    }
    return Long.parseLong(col.colValue);
}

/**
 * Closes result set, statement and connection. Every close is attempted even if an
 * earlier one fails; the first failure is then rethrown with its cause preserved.
 */
private void closeDbResources(ResultSet rs, Statement st, Connection conn) {
    Exception first = null;
    try {
        if (rs != null) rs.close();
    } catch (Exception e) {
        first = e;
    }
    try {
        if (st != null) st.close();
    } catch (Exception e) {
        if (first == null) first = e;
    }
    try {
        if (conn != null) conn.close();
    } catch (Exception e) {
        if (first == null) first = e;
    }
    if (first != null) {
        throw new RuntimeException(first.getMessage(), first);
    }
}
Leave a comment