Oracleのメッセージキューイング機能をJavaで使ってみる
みなさんは、「メッセージキューイング」というジャンルのソフトウェアをご存知でしょうか。IBMのMQシリーズなどがその代表格ですよね。システム間でデータを非同期にやり取りする際に、「キュー」という概念を導入し、送受信が確実に行われることを保証する、案件によっては非常に有用な機能です。
私にとっては非常に悔しいことに、メッセージキューイングを実現するためのソフトウェアって、どれも非常に高価なんですよね。もちろんコストに見合った機能を持っているからなのですが、それでも予算の絶対額が限られているプロジェクトではなかなか手が出せません。ですが! なんと有り難いことにOracleにはこのメッセージキューイング機能が、「アドバンストキュー」という名前で用意されています(Oracle8以降)。
それでは、Oracleのテーブルなどを利用してデータの送受信を行なう方法と、アドバンストキューイングを使う方法との違いは何なのでしょうか(通常のメッセージキューイングソフトを使える人はそちらを使うでしょうからあえて比較しません^^)。
メリットとしては、
- コーディングの量が減少する
- キューの基本的な機能を実装しているため、他のミドルウェアへの以降も行ないやすい
といったものがあるでしょうし、デメリットとしては、
- 実績が少ない。実際、使っている事例を聞くことはほとんど無い。
というのが最大のものでしょう。ともあれ、使ってみることはそれほど難しくないので、まずは試してみるというのも悪い選択ではないと思います。
ラベルにテキストを取得する方法Oracle側の準備(セットアップ・権限の定義)
まずはOracle側の準備となるわけですが、その前に必要なオプションが導入されている必要があります。そうでないと、
エラー行: 1: エラーが発生しました。 ORA-00439: 機能は使用できません: Database queuing ORA-06512: "SYS.DBMS_AQADM_SYS", 行 2012 ORA-06512: "SYS.DBMS_AQADM", 行 55 ORA-06512: 行 2
といった悲しいエラーが表示されてしまいます。
必要になるオプションは(バージョンにもよりますが)次の2つのはずです。追加インストールしておきましょう。
それから、メッセージキューイングを利用するユーザに対し、DBMS_AQとDBMS_AQADMのEXECUTE権限を与えておくことも忘れずに。
キューの定義
Oracleのメッセージキューイングでは、送受信するメッセージはオブジェクトとして定義します。まずはこのオブジェクトを定義してやらなければなりません。これには CREATE TYPE 文を利用します。
ここでは、「名前(NAME)」「住所(ADDRESS)」「電話番号(PHONENO)」の3つの属性をもつメッセージを送受信するものとし、それをobj_personという名前で定義してやることにしましょう。
CREATE OR REPLACE TYPE obj_person AS OBJECT( NAME VARCHAR(20), ADDRESS VARCHAR(40), PHONENO VARCHAR(15) ); /
続いて行なうことは、このメッセージを格納するための表を定義することです。但し、この表はユーザが直接アクセスすることはありません。キューへの投入・読み出しは後述するパッケージを利用することになります。
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'qt_person', queue_payload_type => 'obj_person' ); END; /
そして、このキューテーブルを利用してキューを定義します。
URLはPHPなしでは存在しているかどうかを確認する方法BEGIN DBMS_AQADM.CREATE_QUEUE( queue_name => 'q_person', queue_table => 'qt_person', comment => 'QUEUE FOR PERSONAL DATA' ); END; /
もう一つ! キューは開始することで利用可能になります。
BEGIN DBMS_AQADM.START_QUEUE( queue_name => 'q_person' ); END; /
お待たせしました。これでキューが利用可能になりました。
キューを利用するためのプロシージャの定義
既にキューは利用可能になっているので、DBMS_AQ.ENQUEUE/DEQUEUEを用いてメッセージを送受信することは可能です。ただ、メッセージはオブジェクトとになっていますし、読み書きのためには複雑なパラメータを設定してやることも必要です。ここは、プロシージャを定義してそこからキューの読み書きを行なうこととしましょう。
まずはメッセージをキューに投入するためのプロシージャです。3つの引数を文字列として取り、それを用いてインスタンスを生成、エンキューします。
CREATE OR REPLACE PROCEDURE ENQUEUE_PERSON ( NAME IN VARCHAR2, ADDRESS IN VARCHAR2, PHONENO IN VARCHAR2 ) IS enqueue_opt sys.dbms_aq.enqueue_options_t; message_opt sys.dbms_aq.message_properties_t; msg_handle RAW(16); msg_person obj_person; BEGIN msg_person := obj_person(NAME, ADDRESS, PHONENO); enqueue_opt.visibility := dbms_aq.on_commit; message_opt.priority := 15; message_opt.delay := dbms_aq.no_delay; message_opt.expiration := 1*24*60*60; message_opt.exception_queue := 'EXCEPT_ORD'; dbms_aq.enqueue( queue_name => 'q_person', enqueue_options => enqueue_opt, message_properties => message_opt, payload => msg_person, msgid => msg_handle ); END; /
次はメッセージをキューから取り出すためのプロシージャです。3つあるはずの戻り値は、PL/SQLのOUT引数を用いて返しています
利用可能なIPアドレスを検索する方法CREATE OR REPLACE PROCEDURE DEQUEUE_PERSON ( NAME OUT VARCHAR2, ADDRESS OUT VARCHAR2, PHONENO OUT VARCHAR2 ) IS dequeue_opt sys.dbms_aq.dequeue_options_t; message_opt sys.dbms_aq.message_properties_t; msg_handle RAW(16); msg_person obj_person; BEGIN dequeue_opt.visibility := dbms_aq.on_commit; dequeue_opt.dequeue_mode := dbms_aq.remove; dequeue_opt.wait := dbms_aq.forever; dbms_aq.dequeue( queue_name => 'q_person', dequeue_options => dequeue_opt, message_properties => message_opt, payload => msg_person, msgid => msg_handle ); NAME := msg_person.NAME; ADDRESS := msg_person.ADDRESS; PHONENO := msg_person.PHONENO; END; /
Javaからの利用
さて、あとはJavaからの利用のみです。とはいえ、すでにストアドプロシジャを作ってしまったので、後は普通のCallableStatementの呼び出しと全く同じになってしまいます。
まずはデータを投入する方。commit/rollbackによって、キューへの投入が業務トランザクションの一部として正しく制御できることに注目してください。
Connection con = ....; try { // ... // データをキューに投入する前処理 // ... CallableStatement cstmt = con.prepareCall("{call ENQUEUE_PERSON(?,?,?)}"); cstmt.setString(1, "Taro Oracle"); cstmt.setString(2, "Japan"); cstmt.setString(3, "123-4567"); cstmt.execute(); // ... // データをキューに投入する後処理 // ... // commitの時点で、データがキューに保存される con.commit(); } catch (Exception e) { // rollbackを行なうと、キューへの投入もキャンセルされる con.rollback(); } finally { con.close(); }
続いてはデータを取り出す方。こちらも、取り出しと後続処理が何かの理由で失敗した場合には、rollbackによって取り出したこと自体をキャンセルすることが出来ます。これにより、次回このメソッドを実行した時に、処理中だったメッセージを再度処理の対象にすることが出来ます。
Connection con = ....; try { // ... // データをキューから取り出す前処理 // ... CallableStatement cstmt = con.prepareCall("{call DEQUEUE_PERSON(?,?,?)}"); cstmt.registerOutParameter(1, Types.VARCHAR); cstmt.registerOutParameter(2, Types.VARCHAR); cstmt.registerOutParameter(3, Types.VARCHAR); cstmt.execute(); String wrkname = cstmt.getString(1); String wrkaddress = cstmt.getString(2); String wrkphone = cstmt.getString(3); // ... // データをキューから取り出す後処理 // ... // commitの時点で、データがキューから削除される con.commit(); } catch (Exception e) { // rollbackを行なうと、キューからの取り出しが「なかったこと」になる con.rollback(); } finally { con.
0 コメント:
コメントを投稿