Bo's Oracle Station

【博客文章2023】利用Oracle Universal Connection Pool线程管理和RAC数据库Transaction Guard功能来保障并发执行事务

2023-3-24 16:55| 发布者: botang| 查看: 19| 评论: 0

摘要: 利用Oracle Universal Connection Pool线程管理和RAC数据库Transaction Guard功能来保障并发执行事务。
【博客文章2023】利用Oracle Universal Connection Pool线程管理和RAC数据库Transaction Guard功能来保障并发执行事务

Author: Bo Tang

1. 编写Oracle Universal Connection Pool线程管理程序,该程序可调用任意PL/SQL存储过程:

    在数据库中,准备一个hr用户的存储过程:

 
    编写Worker.java,该程序可以调用任意一个PL/SQL存储过程:

package Test;

import javax.sql.DataSource;
import java.sql.*;
import oracle.ucp.jdbc.ValidConnection;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.oracle.OracleJDBCConnectionPoolStatistics;
import java.util.Random;
import java.text.SimpleDateFormat;
import java.text.DateFormat;
import java.util.Calendar;

class Worker implements Runnable {
  PoolDataSource ds;
  Random random;
  Boolean cpuIntensive;
  Worker(PoolDataSource _ds) {
    ds = _ds;
    random = new Random();
  }
 
  void databaseWorkload(Connection c) throws SQLException {
//准备SQL语句
//只循环1次,这样写便于以后更改重复执行的次数
    for(int i=0;i<1;i++) 
    {
       PreparedStatement pstmt = c.prepareStatement("begin hr.proctest1; end;");
       try{
            pstmt.executeUpdate();
            if (Test.VERBOSE) { System.out.println("Adding row to test1"); }

        } catch(SQLException insertsqlex)
        {
         if (insertsqlex instanceof SQLIntegrityConstraintViolationException) {
            if (insertsqlex.getMessage().startsWith("ORA-00001: unique constraint (HR.PK_TEST1) violated")){
              System.out.println("in catch block for constraint violation\n");
            }
         }
         else throw insertsqlex;
       }
      pstmt.close();
    }
     // c.rollback();
     c.commit();
  }
  
  public void run() {
    long counter = 0;
     boolean retry = false;
     Connection c = null;
     DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:MM:SS");
     Calendar cal = Calendar.getInstance();
     
      long nanoTimeStart=0,timeSpentOnDb = 0;
      try {
        c = ds.getConnection();
        // Make sure auto commit if off:
        c.setAutoCommit(false);

          ResultSet rs;
          Statement stmt = c.createStatement();
          
          rs =

           stmt.executeQuery("select '... Connected to '||sys_context('userenv','instance_name') from dual");
  
          while (rs.next()) { 
              // Only display for VERBOSE operation 
              if (Test.VERBOSE) { System.out.println( dateFormat.format(cal.getTime()) + "  " + rs.getString(1)); }
          }
          rs.close();
          stmt.close();
          */
           if (retry) {
              System.out.println(" Application driven connection retry succeeded");
              retry = false;
           }

        nanoTimeStart = System.nanoTime();
        
    
        if (Test.VERBOSE) { System.out.println("Executing databaseWorkload()"); }
        databaseWorkload(c);
        
      } catch (SQLException ea) {
/* 有了AC,应用开发人员就不用编写代码来恢复事务*/   
        try {
         if (c == null ||!((ValidConnection)c).isValid()){
          ea.printStackTrace();
          System.out.println("Application error handling: attempting to get a new connection "+ea.getMessage()+".");
           c.close();
           String fcfInfo = ((OracleJDBCConnectionPoolStatistics) ds.getStatistics()).getFCFProcessingInfoProcessedOnly();
           System.out.println("FCF information: " + fcfInfo);
            retry = true;
         } else {
          System.out.println("unknown exception: " + ea);
          }
        }catch (SQLException ea1) {}
        //
        synchronized (Test.statsLock) {
          Test.nbOfExceptions++;
          if(Test.applicationCrashOnErrors && Test.nbOfExceptions > 20)
          {
            System.err.println("20 fatal exceptions.");
            System.err.println("");
            System.err.println("*** APPLICATION CRASHED ***");
            System.err.println("");
            System.exit(1);
          }
        }
        if(Test.VERBOSE) {
          ea.printStackTrace();
          
          System.err.println("."+ea.getMessage()+".");
        }
        
      } finally {
        timeSpentOnDb = (System.nanoTime()-nanoTimeStart)/1000000; // in ms
        try {
          if (c != null) {
            c.close();
            if (Test.VERBOSE) { System.out.println("Closed connection"); }
          }
        } catch (SQLException ea) {}
      }

   
      if(counter > 0) {
        synchronized (Test.statsLock) {
          Test.operationsCompleted++;
          
          Test.timeSpentOnDb += timeSpentOnDb;
        }
      }

      if (Test.threadThinkTime > 0) {
        // Introduce delay between requests for processing webpages
        long timeToSleep = Test.threadThinkTime +
          random.nextInt((Test.threadThinkTime<10)?10:Test.threadThinkTime/10);
        try {
          Thread.sleep(timeToSleep);
        } catch (Exception ea) {}
      }
      counter++;
    }
  }


2. 用线程模拟并发执行, 用AC来保证事务执行时的故障转移

    编写调用Worker.java的主程序Test.java:

package Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import javax.sql.DataSource;

import oracle.ucp.admin.UniversalConnectionPoolManagerImpl;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
import oracle.ucp.admin.UniversalConnectionPoolManager;
import oracle.ucp.UniversalConnectionPoolAdapter;
import oracle.ucp.UniversalConnectionPoolException;
import oracle.ucp.UniversalConnectionPoolStatistics;


public class Test extends Thread {

  static final int DELAY_BETWEEN_PRINTING_STATS = 5 * 1000;
  static boolean VERBOSE = true;
  private static String PROP_FILE="test.properties";
  static int connectionWaitTimeout = 3; // seconds
 
  static int nbOfThreads = 0;
  static int ucpPoolSize = 0;
  static int threadThinkTime = 0;
  // 12.2以上版本(或者打过p31112088)补丁,不需要验证连接有效性
  static boolean validateConnectionOnBorrow = false;

  static boolean applicationCrashOnErrors = true;
  static boolean fastConnectionFailover = false;

  static boolean cpuIntensive = false;

  static final Object statsLock = new Object();
  static int operationsCompleted = 0;
  
  static long timeSpentOnDb = 0;
  
  static int nbOfExceptions = 0;


  static public void main(String args[])
    throws SQLException {
    Connection conn = null;
    PreparedStatement pstmt = null;
    ResultSet rs = null;

    if(args.length > 0) {
      PROP_FILE = args[0];
    }
    try {
      Properties prop = new Properties();
      try {
        prop.load(new FileInputStream(PROP_FILE));
      } catch (IOException e) {e.printStackTrace();}
      
      nbOfThreads = Integer.parseInt(prop.getProperty("number_of_threads"));
      ucpPoolSize = Integer.parseInt(prop.getProperty("ucp_pool_size"));
      threadThinkTime = Integer.parseInt(prop.getProperty("thread_think_time","20"));
      VERBOSE = Boolean.parseBoolean(prop.getProperty("verbose","false"));
      applicationCrashOnErrors = Boolean.parseBoolean(prop.getProperty("application_crash_on_errors","true"));
      fastConnectionFailover = Boolean.parseBoolean(prop.getProperty("fastConnectionFailover","false"));
      validateConnectionOnBorrow = Boolean.parseBoolean(prop.getProperty("validateConnectionOnBorrow","false"));
      connectionWaitTimeout = Integer.parseInt(prop.getProperty("connectionWaitTimeout","3"));
      PoolDataSource pds = PoolDataSourceFactory.getPoolDataSource();
      pds.setConnectionFactoryClassName(prop.getProperty("datasource"));
      
      pds.setUser(prop.getProperty("username","HR"));
      pds.setPassword(prop.getProperty("password","oracle_4U"));
      pds.setURL(prop.getProperty("url"));
      pds.setConnectionPoolName(UCP_POOL_NAME);
      pds.setConnectionWaitTimeout(connectionWaitTimeout);
      pds.setFastConnectionFailoverEnabled(fastConnectionFailover);
      pds.setValidateConnectionOnBorrow(validateConnectionOnBorrow);
      pds.setInitialPoolSize(ucpPoolSize);
      pds.setMinPoolSize(ucpPoolSize);
      pds.setMaxPoolSize(ucpPoolSize);
      pds.setConnectionProperties(prop);

      System.out.println("######################################################");
      System.out.println("Connecting to " + prop.getProperty("url"));
      System.out.println(" # of Threads             : " + nbOfThreads);
      System.out.println(" UCP pool size            : " + ucpPoolSize);
      System.out.println("FCF Enabled:  " + pds.getFastConnectionFailoverEnabled());
      System.out.println("VCoB Enabled: " + pds.getValidateConnectionOnBorrow());
      System.out.println("ONS Configuration:  " + pds.getONSConfiguration());
      System.out.println("Enable Intensive Wload:  " + cpuIntensive);
      System.out.format("Thread think time        : %d ms\n",
        threadThinkTime);
      System.out.println("######################################################");
      System.out.println("");
      
      // 使用UniversalConnectPoolManagerImpl启动连接池:
      UniversalConnectionPoolManager poolManager = 
        UniversalConnectionPoolManagerImpl.getUniversalConnectionPoolManager();
      poolManager.createConnectionPool((UniversalConnectionPoolAdapter)pds);
      System.out.println("Starting the pool now... (please wait)");
      long start = System.currentTimeMillis();
      poolManager.startConnectionPool(UCP_POOL_NAME);
      long end = System.currentTimeMillis();
      System.out.println("Pool is started in "+(end-start)+"ms");
      Test u = new Test();
      u.runDemo(pds);
    } catch (SQLException sqlea) {
      do{
        sqlea.printStackTrace();
        sqlea = sqlea.getNextException();
      }
      while(sqlea != null);
    } 
    catch (Exception ea) {
      System.out.println("Error during execution: " + ea);
      ea.printStackTrace();
    } finally {
         if (rs != null) rs.close(); 
         if (pstmt != null) pstmt.close(); 
         if (conn != null) conn.close(); 
    }

  }

  /**
   * 启动Worker线程,,每一个线程并发运行一次PL/SQL存储过程: 
   */
  private void runDemo(PoolDataSource pds)
    throws Exception {

    Thread[] t = new Thread[nbOfThreads];

    for (int i = 0; i < nbOfThreads; ++i) {
      t[i] = new Thread(new Worker(pds));
      t[i].start();
    }

    /*  显示线程统计信息 */
    Thread stat = new PrintStatThread();
    stat.start();
  
    for (int i = 0; i < nbOfThreads; ++i) {
      t[i].join();
    }
    needToPrintStats = false;
    stat.interrupt();
    //acStat.interrupt();

  }

  static boolean needToPrintStats = true;
  static String UCP_POOL_NAME="actest"; 
}

3. PrintStatThread类: 

    PrintStatThread类显示Oracle Universal Connection Pool当前统计信息

package Test;

import oracle.ucp.admin.UniversalConnectionPoolManagerImpl;
import oracle.jdbc.replay.ReplayStatistics;

/*
 每隔5秒显示UCP统计信息
  */
class PrintStatThread extends Thread {

    private static void display_replay_statistics(ReplayStatistics rs) {
       System.out.println("Client Statistics: ");
       System.out.println("FailedReplayCount="+rs.getFailedReplayCount());
       System.out.println("ReplayDisablingCount="+rs.getReplayDisablingCount());
       System.out.println("SuccessfulReplayCount="+rs.getSuccessfulReplayCount());
       System.out.println("TotalCalls="+rs.getTotalCalls());
       System.out.println("TotalCallsAffectedByOutages="+rs.getTotalCallsAffectedByOutages()); 
       System.out.println("TotalCallsAffectedByOutagesDuringReplay="+ rs.getTotalCallsAffectedByOutagesDuringReplay());
       System.out.println("TotalCallsTriggeringReplay="+rs.getTotalCallsTriggeringReplay());
       System.out.println("TotalCompletedRequests="+rs.getTotalCompletedRequests());
       System.out.println("TotalProtectedCalls="+rs.getTotalProtectedCalls());
       System.out.println("TotalReplayAttempts="+rs.getTotalReplayAttempts());
       System.out.println("TotalRequests="+rs.getTotalRequests());
       System.out.println("Protected Percentage="+((int)((double)rs.getTotalProtectedCalls()/(double)rs.getTotalCalls()*100)));
    }

  public void run() {
    while (true) {
      try {
        Thread.sleep(Test.DELAY_BETWEEN_PRINTING_STATS);
        long operationsCompleted = 0;
        long timeSpentOnDb = 0;
        synchronized (Test.statsLock) {
          operationsCompleted = Test.operationsCompleted;
          timeSpentOnDb = Test.timeSpentOnDb;
          Test.operationsCompleted = 0;
          Test.timeSpentOnDb = 0;
        }
        long avgDBResponseTime = (operationsCompleted==0)?0:timeSpentOnDb/operationsCompleted;
        long ucpBorrowedConnectionCount = UniversalConnectionPoolManagerImpl
                .getUniversalConnectionPoolManager()
                .getConnectionPool(Test.UCP_POOL_NAME)
                .getStatistics()
                .getBorrowedConnectionsCount();
        long ucpPendingRequests = UniversalConnectionPoolManagerImpl
                .getUniversalConnectionPoolManager()
                .getConnectionPool(Test.UCP_POOL_NAME)
                .getStatistics().getPendingRequestsCount();
        long ucpWaitTime = UniversalConnectionPoolManagerImpl
            .getUniversalConnectionPoolManager()
            .getConnectionPool(Test.UCP_POOL_NAME)
            .getStatistics().getAverageConnectionWaitTime();
        long totalBorrowed = UniversalConnectionPoolManagerImpl
            .getUniversalConnectionPoolManager()
            .getConnectionPool(Test.UCP_POOL_NAME)
            .getStatistics().getCumulativeConnectionBorrowedCount();

        System.out.print(ucpBorrowedConnectionCount + " borrowed, "+ucpPendingRequests+" pending, "+ucpWaitTime+"ms getConnection wait, TotalBorrowed " + totalBorrowed);
        
        // Print the average response time:
        if(avgDBResponseTime > 0) {
          System.out.print(", avg response time from db " + avgDBResponseTime + "ms");
        }
        System.out.print("\n");
        
      } catch (Exception ea) {}
    }
  }
}
   
4. Ant编连:

    build.xml:


<project name="" default="all" basedir=".">
  <!-- Builds lib/Test.jar from the sources in src; the default target cleans first. -->

  <!-- set global properties for this build -->
  <property name="src" location="src"/>
  <property name="classes" location="classes"/>

  <target name="all" depends="clean,dist"/>

  <target name="init">
    <!-- Create the build directory structure used by compile -->
    <mkdir dir="${classes}"/>
  </target>

  <target name="compile" depends="init"
        description="compile the source " >
    <!-- Compile the java code from ${src} into ${classes} -->
    <!-- to remove debug information, set debug="off" -->
    <javac srcdir="${src}" debug="on" destdir="${classes}">
      <!-- One classpath listing every Oracle client library the sources need -->
      <classpath>
        <pathelement path="${classpath}"/>
        <pathelement location="/u01/app/oracle/product/12.2.0/dbhome_1/ucp/lib/ucp.jar"/>
        <pathelement location="/u01/app/oracle/product/12.2.0/dbhome_1/jdbc/lib/ojdbc8.jar"/>
        <pathelement location="/u01/app/oracle/product/12.2.0/dbhome_1/opmn/lib/ons.jar"/>
        <pathelement location="/u01/app/oracle/product/12.2.0/dbhome_1/jlib/oraclepki.jar"/>
        <pathelement location="/u01/app/oracle/product/12.2.0/dbhome_1/jlib/osdt_core.jar"/>
        <pathelement location="/u01/app/oracle/product/12.2.0/dbhome_1/jlib/osdt_cert.jar"/>
        <pathelement location="/u01/app/oracle/product/12.2.0/dbhome_1/jlib/orai18n.jar"/>
      </classpath>
    </javac>
  </target>

  <target name="dist" depends="compile"
        description="Jar the compiled classes" >
    <!-- Test.jar is a file, so it must be removed with file=, not dir= -->
    <delete file="./lib/Test.jar"/>
    <jar destfile="./lib/Test.jar" basedir="${classes}" excludes="*.xml,*.cdi" manifest="MANIFEST.MF"/>
  </target>

  <target name="clean"
        description="clean up" >
    <!-- Delete the ${classes} directory tree and the previously built jar -->
    <delete dir="${classes}"/>
    <delete file="./lib/Test.jar"/>
  </target>

</project>

  
    在build.xml那层目录,进行ant编连:

[oracle@station3 TestProcedure]$ ls

build.xml  classes  lib  MANIFEST.MF  run.sh  src

[oracle@station3 TestProcedure]$ ant

Buildfile: build.xml

 

clean:

   [delete] Deleting directory /home/oracle/JAVA/DBJar/TestProcedure/classes

 

init:

    [mkdir] Created dir: /home/oracle/JAVA/DBJar/TestProcedure/classes

 

compile:

    [javac] Compiling 1 source file to /home/oracle/JAVA/DBJar/TestProcedure/classes

    [javac] This version of java does not support the classic compiler; upgrading to modern

 

dist:

      [jar] Building jar: /home/oracle/JAVA/DBJar/TestProcedure/lib/Test.jar

 

all:

 

BUILD SUCCESSFUL

Total time: 3 seconds


5. 参数文件test.properties:


# Use replay datasource
# The test must use a UCP connection pool and import
# oracle.ucp.admin.UniversalConnectionPoolManagerImpl to connect to the database.
datasource=oracle.jdbc.replay.OracleDataSourceImpl

# Set verbose mode (the key is case-sensitive: Test.java reads "verbose")
verbose=FALSE

# database JDBC URL
url=jdbc:oracle:thin:@(DESCRIPTION=(CONNECT_TIMEOUT=90)(RETRY_COUNT=50)(RETRY_DELAY=3)(TRANSPORT_CONNECT_TIMEOUT=3)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcp)(HOST=scan.cluster3.example.com)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=serv3.example.com)))

# database username and password:
username=hr
password=oracle_4U

# Enable FAN
fastConnectionFailover=TRUE

# Connection tests on borrow: TRUE validates each connection when it is
# borrowed from the pool; set to FALSE to disable connection tests.
validateConnectionOnBorrow=TRUE

# number of connections in the UCP's pool:
ucp_pool_size=20

# Connection Wait Timeout for busy pool (seconds)
connectionWaitTimeout=5

# number of active threads (this simulates concurrent load):
number_of_threads=5

# think time is how much time (in ms) the threads will sleep before looping:
thread_think_time=50


   连接池大小设置为20个连接,由5个线程并发执行,每个线程各执行一遍PL/SQL存储过程。

6. 运行

   写一个shell脚本,方便执行编连好的jar程序

#!/bin/bash
# Runs the Test.Test main class from lib/Test.jar with the settings in test.properties.

JAVA_HOME=${ORACLE_HOME}/jdk

# Oracle client libraries needed at run time. The classpath is assembled piece
# by piece so that no single line has to be wrapped in the middle of a path.
ORACLE_LIB_HOME=/u01/app/oracle/product/12.2.0/dbhome_1
CP=./lib/Test.jar
CP=${CP}:${ORACLE_LIB_HOME}/ucp/lib/ucp.jar
CP=${CP}:${ORACLE_LIB_HOME}/jdbc/lib/ojdbc8.jar
CP=${CP}:${ORACLE_LIB_HOME}/opmn/lib/ons.jar
CP=${CP}:${ORACLE_LIB_HOME}/jlib/oraclepki.jar
CP=${CP}:${ORACLE_LIB_HOME}/jlib/osdt_cert.jar
CP=${CP}:${ORACLE_LIB_HOME}/jlib/osdt_core.jar

"${JAVA_HOME}/bin/java" -Doracle.ucp.PlannedDrainingPeriod=30 \
    -classpath "${CP}" \
    Test.Test test.properties
   
    运行,并查询HR的连接池的terminal显示为unknown:

select inst_id , terminal  from gv$session where username='HR';


   INST_ID TERMINAL


---------- ------------------------------
  

     2 unknown

     2 unknown

     2 unknown

     2 unknown

     1 unknown

     2 unknown

     2 unknown

     2 unknown

     2 unknown

     2 unknown

     1 unknown

     2 unknown

     2 unknown


   运行时,可以看到5个并发线程一直运行到执行完毕。连接池申请的20个Oracle UCP连接分布在两个实例上并行执行。由于serv3服务配置成AC(-e TRANSACTION --commit_outcome true),所以此时即使运行serv3的任意一个实例出现问题,也不会影响事务的正确执行。

[oracle@station3 TestProcedure]$ ls

build.xml  classes  lib  MANIFEST.MF  run.sh  src

[oracle@station3 TestProcedure]$ ./run.sh

5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


5 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


4 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


4 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


4 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


4 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


4 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


4 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


3 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


2 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


2 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


0 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


0 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


0 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


0 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


0 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


0 borrowed, 0 pending, 70ms getConnection wait, TotalBorrowed 5


    PL/SQL存储过程运行一次插入50万行,所以结果为5*50万行=250万行

SQL> select  count(*) from test1;

        count(*)
----------------
       2500000
   


路过

雷人

握手

鲜花

鸡蛋

QQ|手机版|Bo's Oracle Station   

GMT+8, 2023-3-26 09:35 , Processed in 0.034294 second(s), 21 queries .

返回顶部