Thread: [Orclib-users] subscriptions/callback for Oracle AQ
Open source C and C++ library for accessing Oracle Databases
Brought to you by:
vince_del_paris
From: Petr V. <pe...@ya...> - 2012-12-03 19:12:47
|
-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA1 hi Vincent and gang, I'd like to introduce you an idea about an addon to DCN or CQN notifications/callback ocilib framework - notifications and callback API for Advanced Queueing (AQ). You can see it in this temporary git repository: https://github.com/pvanek/ocilib this is based on the latest ocilib 3.10.0 with these changes: - - an initial support for AQ callback: https://github.com/pvanek/ocilib/commit/0b44a5247829ef52c61694cfc2187396f88ef62f Note: this is not a final code, I'd like to know your opinion first. There is no doc etc. * backward compatibility stays as is - there are no changes in existing public API * new function OCI_SubscriptionRegisterAQ, similar to OCI_SubscriptionRegister but it uses AQ's OCI_SUBSCR_NAMESPACE_AQ subscription context (instead of OCI_SUBSCR_NAMESPACE_DBCHANGE) * new functions OCI_EventGetAQQueueName and OCI_EventGetAQConsumerName to get AQ related attributes from OCI_Event OCI_Event structure has been expanded to hold AQ attributes. Here I'm not sure if it's a good idea to expanc existing OCI_Event or create new AQ-only similar struct and related functions. I'm open to suggestions here. a small example: #include "ocilib.h" #ifdef _WINDOWS #define sleep(x) Sleep(x*1000) #else #include <unistd.h> #endif #define wait_for_events() sleep(15) void event_handler(OCI_Event *event); void error_handler(OCI_Error *err); int main(void) { OCI_Connection *con; OCI_Subscription *sub; OCI_Statement *st; printf("=> Initializing OCILIB in event mode...\n\n"); if (!OCI_Initialize(error_handler, NULL, OCI_ENV_EVENTS)) return EXIT_FAILURE; con = OCI_ConnectionCreate("stimpy", "omquser", "omquser", OCI_SESSION_DEFAULT); sub = OCI_SubscriptionRegisterAQ(con, "omquser.testaq", event_handler, 5468, 0); wait_for_events(); OCI_ConnectionFree(con); OCI_SubscriptionUnregister(sub); OCI_Cleanup(); return EXIT_SUCCESS; } void error_handler(OCI_Error *err) { int err_type = OCI_ErrorGetType(err); const char *err_msg = OCI_ErrorGetString(err); printf("** %s - %s\n", err_type == OCI_ERR_WARNING ? "Warning" : "Error", err_msg); } void event_handler(OCI_Event *event) { unsigned int type = OCI_EventGetType(event); unsigned int op = OCI_EventGetOperation(event); OCI_Subscription *sub = OCI_EventGetSubscription(event); printf("** Notification : %s\n\n", OCI_SubscriptionGetName(sub)); printf("... Queue Name : %s\n", OCI_EventGetAQQueueName(event)); printf(". Consumer Name : %s\n", OCI_EventGetAQConsumerName(event)); printf("\n"); } - - pkg-config feature added (ocilib.pc): https://github.com/pvanek/ocilib/commit/6ba08629bb3be6229b19bae577af241fade2202c This feature is ready to be merged. It allows easy use of ocilib in cmake, autotools, and similar build tools cmake example: find_package(PkgConfig) pkg_check_modules(OCILIB REQUIRED ocilib>=3.10) message(STATUS "includes: ${OCILIB_INCLUDE_DIRS}") message(STATUS " libs: ${OCILIB_LIBRARIES}") message(STATUS "lib dirs: ${OCILIB_LIBRARY_DIRS}") looking forward your comments, petr -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.12 (Darwin) Comment: GPGTools - http://gpgtools.org Comment: Using GnuPG with undefined - http://www.enigmail.net/ iQEcBAEBAgAGBQJQvPU0AAoJEC8yRjM4uE2t7mUIAIKZw2C6kbylNE3ndmTSpl6t xEjVpKxnrVtc65G3bXhk19b2HhiseMXjNqMLW80+1Q5IvOHF4rpJew8m5vS29Lq/ HOOhKfSL1xa9N/MQ9oorMwi+eZDsK1MEJlh8e9okdSnrEoYkX9BRX7P0u0XyiPk7 3HiatyEkPKS1zh2PKqfQyeXKxweQloceDQbe8Q4RtseeG9yB3dylq2qjtsB/17MG rqUNm8Dl8Ws1dxTyjfamjtSUxzRPVYIW0QjcH+bQlvlBxnBOgStQqBHT/34v5w1x /LoPhXCUa7zlfXvRVP6F9JSKWQ+clm6my7rrL+2gLL4Z3VeMWqfg1IR+lkvQJaM= =I1ma -----END PGP SIGNATURE----- |
From: vincent r. <vin...@ya...> - 2012-12-04 12:46:21
|
Hi Petr, I already have in my plans to add support to CQN.. OCILIB currently support CDN introduced in 10g. CQN is an extension of CDN and has been introduced in 11g. when I added support for CDN, I saw that subscription could be done against AQ. when I looked at it, i thought that implementing notifications just for being notified of enqueuing was not a priority. I may have misunderstood its pertinence. If I have well understood, you want to add event notification on AQ for being informed that a user has received a message into queue ? Kind of monitoring ? Or is there more information you want to get ? More information that the one you've exposed in your code are available Regards, vincent On Mon, Dec 3, 2012 at 7:53 PM, Petr Vaněk <pe...@ya...> wrote: > -----BEGIN PGP SIGNED MESSAGE----- > Hash: SHA1 > > hi Vincent and gang, > > I'd like to introduce you an idea about an addon to DCN or CQN > notifications/callback ocilib framework - notifications and callback > API for Advanced Queueing (AQ). > > You can see it in this temporary git repository: > > https://github.com/pvanek/ocilib > > this is based on the latest ocilib 3.10.0 with these changes: > > > - - an initial support for AQ callback: > > https://github.com/pvanek/ocilib/commit/0b44a5247829ef52c61694cfc2187396f88ef62f > > Note: this is not a final code, I'd like to know your opinion first. > There is no doc etc. > > * backward compatibility stays as is - there are no changes in > existing public API > * new function OCI_SubscriptionRegisterAQ, similar to > OCI_SubscriptionRegister but it uses AQ's OCI_SUBSCR_NAMESPACE_AQ > subscription context (instead of OCI_SUBSCR_NAMESPACE_DBCHANGE) > * new functions OCI_EventGetAQQueueName and > OCI_EventGetAQConsumerName to get AQ related attributes from OCI_Event > OCI_Event structure has been expanded to hold AQ attributes. > > Here I'm not sure if it's a good idea to expanc existing OCI_Event or > create new AQ-only similar struct and related functions. I'm open to > suggestions here. > > a small example: > > #include "ocilib.h" > > #ifdef _WINDOWS > #define sleep(x) Sleep(x*1000) > #else > #include <unistd.h> > #endif > > #define wait_for_events() sleep(15) > > void event_handler(OCI_Event *event); > void error_handler(OCI_Error *err); > > int main(void) > { > OCI_Connection *con; > OCI_Subscription *sub; > OCI_Statement *st; > > printf("=> Initializing OCILIB in event mode...\n\n"); > > if (!OCI_Initialize(error_handler, NULL, OCI_ENV_EVENTS)) > return EXIT_FAILURE; > > con = OCI_ConnectionCreate("stimpy", "omquser", "omquser", > OCI_SESSION_DEFAULT); > > sub = OCI_SubscriptionRegisterAQ(con, "omquser.testaq", > event_handler, 5468, 0); > > wait_for_events(); > > OCI_ConnectionFree(con); > OCI_SubscriptionUnregister(sub); > OCI_Cleanup(); > return EXIT_SUCCESS; > } > > void error_handler(OCI_Error *err) > { > int err_type = OCI_ErrorGetType(err); > const char *err_msg = OCI_ErrorGetString(err); > > printf("** %s - %s\n", err_type == OCI_ERR_WARNING ? "Warning" : > "Error", err_msg); > } > > void event_handler(OCI_Event *event) > { > unsigned int type = OCI_EventGetType(event); > unsigned int op = OCI_EventGetOperation(event); > OCI_Subscription *sub = OCI_EventGetSubscription(event); > > printf("** Notification : %s\n\n", OCI_SubscriptionGetName(sub)); > printf("... Queue Name : %s\n", > OCI_EventGetAQQueueName(event)); > printf(". Consumer Name : %s\n", > OCI_EventGetAQConsumerName(event)); > > printf("\n"); > } > > > > > > - - pkg-config feature added (ocilib.pc): > > https://github.com/pvanek/ocilib/commit/6ba08629bb3be6229b19bae577af241fade2202c > This feature is ready to be merged. It allows easy use of ocilib in > cmake, autotools, and similar build tools > > cmake example: > find_package(PkgConfig) > pkg_check_modules(OCILIB REQUIRED ocilib>=3.10) > message(STATUS "includes: ${OCILIB_INCLUDE_DIRS}") > message(STATUS " libs: ${OCILIB_LIBRARIES}") > message(STATUS "lib dirs: ${OCILIB_LIBRARY_DIRS}") > > > > looking forward your comments, > petr > -----BEGIN PGP SIGNATURE----- > Version: GnuPG v1.4.12 (Darwin) > Comment: GPGTools - http://gpgtools.org > Comment: Using GnuPG with undefined - http://www.enigmail.net/ > > iQEcBAEBAgAGBQJQvPU0AAoJEC8yRjM4uE2t7mUIAIKZw2C6kbylNE3ndmTSpl6t > xEjVpKxnrVtc65G3bXhk19b2HhiseMXjNqMLW80+1Q5IvOHF4rpJew8m5vS29Lq/ > HOOhKfSL1xa9N/MQ9oorMwi+eZDsK1MEJlh8e9okdSnrEoYkX9BRX7P0u0XyiPk7 > 3HiatyEkPKS1zh2PKqfQyeXKxweQloceDQbe8Q4RtseeG9yB3dylq2qjtsB/17MG > rqUNm8Dl8Ws1dxTyjfamjtSUxzRPVYIW0QjcH+bQlvlBxnBOgStQqBHT/34v5w1x > /LoPhXCUa7zlfXvRVP6F9JSKWQ+clm6my7rrL+2gLL4Z3VeMWqfg1IR+lkvQJaM= > =I1ma > -----END PGP SIGNATURE----- > > > ------------------------------------------------------------------------------ > Keep yourself connected to Go Parallel: > BUILD Helping you discover the best ways to construct your parallel > projects. > http://goparallel.sourceforge.net > _______________________________________________ > Orclib-users mailing list > Orc...@li... > https://lists.sourceforge.net/lists/listinfo/orclib-users > -- Vincent Rogier |
From: Petr V. <pe...@ya...> - 2012-12-04 13:00:38
|
-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA1 Vincent, the basic usecase is to have resource friendly way how to get AQ messages from the server. The main purbose is to inform client that data are ready to be dequeued. Lets take an example: there is a PL/SQL app doing various business logic resulting in unpredictable timing of posting messages to queues (some kind of notifications to an external app). Currently the external app have to perform polling against the DB/queue to get these messages. In the case you have more queues you need to poll in more loops etc. When there will be a subscription to AQ available we will be able to just wait for an event = to work fully asynchronously (external app is already doing some stuff etc.). So basically - yes, it's something like "monitoring" but it could allow stuff like this (pseudocode): installAQSubscription(...); ... do something ... ... and when DB writes a message to the queue, notify arrives... my_aq_handler(...) { if (queue_name == 'user.foo') handleMessagesFromFoo(); ... } real dequeueing will be done by already existing AQ ocilib implementation. thanks, petr On 12/4/12 1:45 PM, vincent rogier wrote: > Hi Petr, > > I already have in my plans to add support to CQN.. > > OCILIB currently support CDN introduced in 10g. > CQN is an extension of CDN and has been introduced in 11g. > > when I added support for CDN, I saw that subscription could be done against AQ. > when I looked at it, i thought that implementing notifications just for being notified of enqueuing was not a priority. > I may have misunderstood its pertinence. > > If I have well understood, you want to add event notification on AQ for being informed that a user has received a message into queue ? Kind of monitoring ? > Or is there more information you want to get ? More information that the one you've exposed in your code are available > > Regards, > > vincent > > > > > On Mon, Dec 3, 2012 at 7:53 PM, Petr Vaněk <pe...@ya... <mailto:pe...@ya...>> wrote: > > hi Vincent and gang, > > I'd like to introduce you an idea about an addon to DCN or CQN > notifications/callback ocilib framework - notifications and callback > API for Advanced Queueing (AQ). > > You can see it in this temporary git repository: > > https://github.com/pvanek/ocilib > > this is based on the latest ocilib 3.10.0 with these changes: > > > - an initial support for AQ callback: > https://github.com/pvanek/ocilib/commit/0b44a5247829ef52c61694cfc2187396f88ef62f > > Note: this is not a final code, I'd like to know your opinion first. > There is no doc etc. > > * backward compatibility stays as is - there are no changes in > existing public API > * new function OCI_SubscriptionRegisterAQ, similar to > OCI_SubscriptionRegister but it uses AQ's OCI_SUBSCR_NAMESPACE_AQ > subscription context (instead of OCI_SUBSCR_NAMESPACE_DBCHANGE) > * new functions OCI_EventGetAQQueueName and > OCI_EventGetAQConsumerName to get AQ related attributes from OCI_Event > OCI_Event structure has been expanded to hold AQ attributes. > > Here I'm not sure if it's a good idea to expanc existing OCI_Event or > create new AQ-only similar struct and related functions. I'm open to > suggestions here. > > a small example: > > #include "ocilib.h" > > #ifdef _WINDOWS > #define sleep(x) Sleep(x*1000) > #else > #include <unistd.h> > #endif > > #define wait_for_events() sleep(15) > > void event_handler(OCI_Event *event); > void error_handler(OCI_Error *err); > > int main(void) > { > OCI_Connection *con; > OCI_Subscription *sub; > OCI_Statement *st; > > printf("=> Initializing OCILIB in event mode...\n\n"); > > if (!OCI_Initialize(error_handler, NULL, OCI_ENV_EVENTS)) > return EXIT_FAILURE; > > con = OCI_ConnectionCreate("stimpy", "omquser", "omquser", > OCI_SESSION_DEFAULT); > > sub = OCI_SubscriptionRegisterAQ(con, "omquser.testaq", > event_handler, 5468, 0); > > wait_for_events(); > > OCI_ConnectionFree(con); > OCI_SubscriptionUnregister(sub); > OCI_Cleanup(); > return EXIT_SUCCESS; > } > > void error_handler(OCI_Error *err) > { > int err_type = OCI_ErrorGetType(err); > const char *err_msg = OCI_ErrorGetString(err); > > printf("** %s - %s\n", err_type == OCI_ERR_WARNING ? "Warning" : > "Error", err_msg); > } > > void event_handler(OCI_Event *event) > { > unsigned int type = OCI_EventGetType(event); > unsigned int op = OCI_EventGetOperation(event); > OCI_Subscription *sub = OCI_EventGetSubscription(event); > > printf("** Notification : %s\n\n", OCI_SubscriptionGetName(sub)); > printf("... Queue Name : %s\n", > OCI_EventGetAQQueueName(event)); > printf(". Consumer Name : %s\n", > OCI_EventGetAQConsumerName(event)); > > printf("\n"); > } > > > > > > - pkg-config feature added (ocilib.pc): > https://github.com/pvanek/ocilib/commit/6ba08629bb3be6229b19bae577af241fade2202c > This feature is ready to be merged. It allows easy use of ocilib in > cmake, autotools, and similar build tools > > cmake example: > find_package(PkgConfig) > pkg_check_modules(OCILIB REQUIRED ocilib>=3.10) > message(STATUS "includes: ${OCILIB_INCLUDE_DIRS}") > message(STATUS " libs: ${OCILIB_LIBRARIES}") > message(STATUS "lib dirs: ${OCILIB_LIBRARY_DIRS}") > > > > looking forward your comments, > petr > > ------------------------------------------------------------------------------ > Keep yourself connected to Go Parallel: > BUILD Helping you discover the best ways to construct your parallel projects. > http://goparallel.sourceforge.net > _______________________________________________ > Orclib-users mailing list > Orc...@li... <mailto:Orc...@li...> > https://lists.sourceforge.net/lists/listinfo/orclib-users > > > > > -- > Vincent Rogier -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.12 (Darwin) Comment: GPGTools - http://gpgtools.org Comment: Using GnuPG with undefined - http://www.enigmail.net/ iQEcBAEBAgAGBQJQvfPlAAoJEC8yRjM4uE2tdYgH/RMXV8tA/muGigVVN38Fhxhu ltj/CvPmYfUDAK+MAbRStmpumz4uAvYPeawCJEWhHHUCBsthgeRwdXUUs0H+OMg7 aVcvU/KkCIoYBD1navDEYY1+uVldIciUxf36FfHnk7fwQtNXlZi3VRr6UPRkCjGn XK/KiR0udfe2hrB+pVSh0ImXFD08kj2JyQSV6P+ZF69Kd/2wFCJdAOAgWDpWADMX SuYrRKpidpmjqBcLNT0PbyMDcoXihzho/MpAMfr24nhLCBPJAT5nJP2TA6OfMVqC nAKmmExDDFk2bGVcFrF+R18Bac2l6L636nR2OMy4F8A+ZE6InPK8HRbRHKccEVM= =n04O -----END PGP SIGNATURE----- |
From: vincent r. <vin...@ya...> - 2012-12-04 14:34:41
|
Hi Petr, Got the logic :) Thus, I would suggest another implementation : * Add flag OCI_CNT_ENQUEUE to OCI_SubscriptionRegister() documented as exclusive (cannot be combined with other ones) * Add value OCI_ENT_ENQUEUE to OCI_EventGetType() * Add Method OCI_EventGetDequeue() that returns an OCI_Dequeue object * Add method OCI_SubscriptionAddDequeue(OCI_Subscription *sub, OCI_Dequeue *dequeue) that will add an existing dequeues objects to an internal list of dequeue objects * When the internal callback is fired, it looks up the matching OCI_Dequeue object (queue name and current consumer or consumer list) and if found fires the event with the Dequeue property set Thus you can have directly the Dequeue object from the event :) What do you think ? On Tue, Dec 4, 2012 at 2:00 PM, Petr Vaněk <pe...@ya...> wrote: > real dequeueing will -- Vincent Rogier |
From: Petr V. <pe...@ya...> - 2012-12-04 19:57:36
|
-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA1 it sounds really great (except some minor things). OCI_CNT_ENQUEUE is quite cryptic to me. I use OCI_CNT_AQMESSAGES in my code as it sounds more descriptive to me (if course it's just a minor stuff). The proposed OCI_EventGetDequeue() - I assume it should use only previously registered OCI_Dequeue by OCI_SubscriptionAddDequeue, right? I fully agree that contain OCI_Dequeue in OCI_event is much better than diplicated "queue name" and "consumer" etc. attributes. So what about implementation, please? Should I do it or do you want to do it yourself? (remember that you know OCI much better than me probably ;)). In the case I should do it: can I access the latest code? Or should I use the latest release? thanks, petr On 12/4/12 3:34 PM, vincent rogier wrote: > Hi Petr, > > > Got the logic :) > > Thus, I would suggest another implementation : > > > * Add flag OCI_CNT_ENQUEUE to OCI_SubscriptionRegister() documented as exclusive (cannot be combined with other ones) > * Add value OCI_ENT_ENQUEUE to OCI_EventGetType() > * Add Method OCI_EventGetDequeue() that returns an OCI_Dequeue object > * Add method OCI_SubscriptionAddDequeue(OCI_Subscription *sub, OCI_Dequeue *dequeue) that will add an existing dequeues objects to an internal list of dequeue objects > * When the internal callback is fired, it looks up the matching OCI_Dequeue object (queue name and current consumer or consumer list) and if found fires the event with the Dequeue property set > > Thus you can have directly the Dequeue object from the event :) > > > What do you think ? > > > On Tue, Dec 4, 2012 at 2:00 PM, Petr Vaněk <pe...@ya... <mailto:pe...@ya...>> wrote: > > real dequeueing will > > > > > -- > Vincent Rogier -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.12 (Darwin) Comment: GPGTools - http://gpgtools.org Comment: Using GnuPG with undefined - http://www.enigmail.net/ iQEcBAEBAgAGBQJQvlWjAAoJEC8yRjM4uE2tAF0H/AxHvym+vSpo037JGF4Qqwqk eESHU0nKGp/z/oAmubYO/yUfnEI6FtsRuCUhMHJY0Xu2E4Da5DpZw6Yci3ldqh+C 68PsI47QzRATzTwxY3o0bIlRaXPZAIMl5qvR3g/EvOkDOmrDxs655h+CAi3ugNI6 4PUXvNproFWUXm/pCBNFMO0SViVgOJTZz7qB7Sr5323C1mtQ/vWP6AZvYs298DTd +rcZSrTwdR/S+KcH3tF5A3FQsQv8qU1xEswzNq0xz9gTZbQshoq5rBqS1zPk/F7a LB2nDvYeqW25H9cOKSHvASctjDvK+rsAgd0dl+FaEdbuU/D4xAt4IVkcZkcDNP8= =3s+g -----END PGP SIGNATURE----- |
From: vincent r. <vin...@ya...> - 2012-12-05 08:07:37
|
Hi, About coding, this is a secondary easy task that required only just few lines of code. I'm more focused on how to integrate to OCILIB. Is the following mock-up all right for you ? #include "ocilib.h" #ifdef _WINDOWS #define sleep(x) Sleep(x*1000) #endif #define PORT 5468 #define TIMEOUT 0 void event_handler(OCI_Event *event); void error_handler(OCI_Error *err); int main(void) { OCI_Connection *con; OCI_Subscription *sub; OCI_Dequeue *deq1; OCI_Dequeue *deq2; OCI_Initialize(NULL, NULL, OCI_ENV_DEFAULT | OCI_ENV_EVENTS | OCI_ENV_THREADED); con = OCI_ConnectionCreate("db", "usr", "pwd", OCI_SESSION_DEFAULT); sub = OCI_SubscriptionRegister(con, "SUB-MSG", OCI_CNT_MESSAGE, event_handler, PORT, TIMEOUT); inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); deq1 = OCI_DequeueCreate(inf, "MY_QUEUE"); deq2 = OCI_DequeueCreate(inf, "MY_QUEUE"); OCI_DequeueSetConsumer(deq1, "CONSUMER1"); OCI_DequeueSetConsumer(deq2, "CONSUMER2"); OCI_SubscriptionAddDequeue(sub, deq1); OCI_SubscriptionAddDequeue(sub, deq2); sleep(MAX_INT); OCI_DequeueFree(deq1); OCI_DequeueFree(deq2); OCI_SubscriptionUnregister(sub); OCI_ConnectionFree(con); OCI_Cleanup(); return EXIT_SUCCESS; } void error_handler(OCI_Error *err) { printf("*** error : %s\n", OCI_ErrorGetString(err)); } void event_handler(OCI_Event *event) { OCI_Dequeue *deq = OCI_EventGetDequeue(event); OCI_Msg *msg = OCI_DequeueGet(deq); /* process payload */ } On Tue, Dec 4, 2012 at 8:57 PM, Petr Vaněk <pe...@ya...> wrote: > OCI_SubscriptionAddDequeue -- Vincent Rogier |
From: Petr V. <pe...@ya...> - 2012-12-05 08:11:00
|
-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA1 hi Vincent, yes, it makes perfect sense to me. I'm looking forward to it! p. On 12/5/12 9:07 AM, vincent rogier wrote: > Hi, > > > About coding, this is a secondary easy task that required only just few lines of code. > I'm more focused on how to integrate to OCILIB. > Is the following mock-up all right for you ? > > > #include "ocilib.h" > > #ifdef _WINDOWS > #define sleep(x) Sleep(x*1000) > #endif > > #define PORT 5468 > #define TIMEOUT 0 > > void event_handler(OCI_Event *event); > void error_handler(OCI_Error *err); > > int main(void) > { > OCI_Connection *con; > OCI_Subscription *sub; > OCI_Dequeue *deq1; > OCI_Dequeue *deq2; > > OCI_Initialize(NULL, NULL, OCI_ENV_DEFAULT | OCI_ENV_EVENTS | OCI_ENV_THREADED); > > con = OCI_ConnectionCreate("db", "usr", "pwd", OCI_SESSION_DEFAULT); > sub = OCI_SubscriptionRegister(con, "SUB-MSG", OCI_CNT_MESSAGE, event_handler, PORT, TIMEOUT); > inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); > deq1 = OCI_DequeueCreate(inf, "MY_QUEUE"); > deq2 = OCI_DequeueCreate(inf, "MY_QUEUE"); > > OCI_DequeueSetConsumer(deq1, "CONSUMER1"); > OCI_DequeueSetConsumer(deq2, "CONSUMER2"); > > OCI_SubscriptionAddDequeue(sub, deq1); > OCI_SubscriptionAddDequeue(sub, deq2); > > sleep(MAX_INT); > > OCI_DequeueFree(deq1); > OCI_DequeueFree(deq2); > OCI_SubscriptionUnregister(sub); > OCI_ConnectionFree(con); > > OCI_Cleanup(); > > return EXIT_SUCCESS; > } > > void error_handler(OCI_Error *err) > { > printf("*** error : %s\n", OCI_ErrorGetString(err)); > } > > void event_handler(OCI_Event *event) > { > OCI_Dequeue *deq = OCI_EventGetDequeue(event); > OCI_Msg *msg = OCI_DequeueGet(deq); > > /* process payload */ > } > > > > On Tue, Dec 4, 2012 at 8:57 PM, Petr Vaněk <pe...@ya... <mailto:pe...@ya...>> wrote: > > OCI_SubscriptionAddDequeue > > > > > -- > Vincent Rogier -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.12 (Darwin) Comment: GPGTools - http://gpgtools.org Comment: Using GnuPG with undefined - http://www.enigmail.net/ iQEcBAEBAgAGBQJQvwGAAAoJEC8yRjM4uE2t7MwH/iYJyM13uHjr7pEYewQJ+cLV 5iYQTyB6RrD5TCVDXyid9KB55AVHHtef4yr24HDkMTxAit0D+ayP3T51EqV7ojuf ncqMR895TJMleK01+NSy9W0jzevj81/K8Tia3QO6QdXJd7EPNHiBY0m2EYJaTPbI AY1H47IKTWadMs2VCEuvV5zMzTf/RP7765FpKB/91/+Tm14A6A4SUY1QRc9mLRJ2 +vCy6UU8MXhEngKmFUKB+ZBte7YB8GdxrNeStM2vIHTYuRqYEgvzZLmU3oIwubw6 kOSOy1evKLuXNqe9DxbIwfMdE2T1DMvEtryLGUrHLG3UR35r5LwkhIW7Fp00VD8= =vGEd -----END PGP SIGNATURE----- |
From: vincent r. <vin...@gm...> - 2012-12-05 17:39:31
|
Hi, In fact, I prefer to not use an OCI_Event as all other fields (database, datetime, etc....) won't be available..... That would be messy ! Thus, what i proposed does not make sense. As the goal is to implement async dequeuing i prefer proposing another design. What do you think about the following : * OCI_DequeueEnableAsync(port, timeout) * OCI_DequeueDisableAsync(void) * OCI_DequeueGetStartAsync(oci_dequeue *, async callback) * OCI_DequeueGetStopAsync(oci_dequeue *) Where async callback prototype woulb be like: void callback(oci_dequeue*, oci_msg*) Thus no public subscription (it would be internal). Just call to enable/disable program wide aq notification. Then you provide a callback that gets msg object. I find it smooth. It may need to support more than one central notification... Vincent -----BEGIN PGP SIGNED MESSAGE----- Hash: SHA1 hi Vincent, yes, it makes perfect sense to me. I'm looking forward to it! p. On 12/5/12 9:07 AM, vincent rogier wrote: > Hi, > > > About coding, this is a secondary easy task that required only just few lines of code. > I'm more focused on how to integrate to OCILIB. > Is the following mock-up all right for you ? > > > #include "ocilib.h" > > #ifdef _WINDOWS > #define sleep(x) Sleep(x*1000) > #endif > > #define PORT 5468 > #define TIMEOUT 0 > > void event_handler(OCI_Event *event); > void error_handler(OCI_Error *err); > > int main(void) > { > OCI_Connection *con; > OCI_Subscription *sub; > OCI_Dequeue *deq1; > OCI_Dequeue *deq2; > > OCI_Initialize(NULL, NULL, OCI_ENV_DEFAULT | OCI_ENV_EVENTS | OCI_ENV_THREADED); > > con = OCI_ConnectionCreate("db", "usr", "pwd", OCI_SESSION_DEFAULT); > sub = OCI_SubscriptionRegister(con, "SUB-MSG", OCI_CNT_MESSAGE, event_handler, PORT, TIMEOUT); > inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); > deq1 = OCI_DequeueCreate(inf, "MY_QUEUE"); > deq2 = OCI_DequeueCreate(inf, "MY_QUEUE"); > > OCI_DequeueSetConsumer(deq1, "CONSUMER1"); > OCI_DequeueSetConsumer(deq2, "CONSUMER2"); > > OCI_SubscriptionAddDequeue(sub, deq1); > OCI_SubscriptionAddDequeue(sub, deq2); > > sleep(MAX_INT); > > OCI_DequeueFree(deq1); > OCI_DequeueFree(deq2); > OCI_SubscriptionUnregister(sub); > OCI_ConnectionFree(con); > > OCI_Cleanup(); > > return EXIT_SUCCESS; > } > > void error_handler(OCI_Error *err) > { > printf("*** error : %s\n", OCI_ErrorGetString(err)); > } > > void event_handler(OCI_Event *event) > { > OCI_Dequeue *deq = OCI_EventGetDequeue(event); > OCI_Msg *msg = OCI_DequeueGet(deq); > > /* process payload */ > } > > > > On Tue, Dec 4, 2012 at 8:57 PM, Petr Vaněk <pe...@ya... <mailto:pe...@ya...> <pe...@ya...>> wrote: > > OCI_SubscriptionAddDequeue > > > > > -- > Vincent Rogier -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.12 (Darwin) Comment: GPGTools - http://gpgtools.org Comment: Using GnuPG with undefined - http://www.enigmail.net/ iQEcBAEBAgAGBQJQvwGAAAoJEC8yRjM4uE2t7MwH/iYJyM13uHjr7pEYewQJ+cLV 5iYQTyB6RrD5TCVDXyid9KB55AVHHtef4yr24HDkMTxAit0D+ayP3T51EqV7ojuf ncqMR895TJMleK01+NSy9W0jzevj81/K8Tia3QO6QdXJd7EPNHiBY0m2EYJaTPbI AY1H47IKTWadMs2VCEuvV5zMzTf/RP7765FpKB/91/+Tm14A6A4SUY1QRc9mLRJ2 +vCy6UU8MXhEngKmFUKB+ZBte7YB8GdxrNeStM2vIHTYuRqYEgvzZLmU3oIwubw6 kOSOy1evKLuXNqe9DxbIwfMdE2T1DMvEtryLGUrHLG3UR35r5LwkhIW7Fp00VD8= =vGEd -----END PGP SIGNATURE----- |
From: vincent r. <vin...@gm...> - 2012-12-05 19:16:55
|
Hi, The design exposed in my previous email would give : void msg_handler(OCI_Dequeue *deq, OCI_Msg *msg) { /* process msg payload */ } int main(void) { OCI_Connection *con; OCI_Dequeue *deq1; OCI_Dequeue *deq2; OCI_Initialize(NULL, NULL, OCI_ENV_DEFAULT | OCI_ENV_EVENTS | OCI_ENV_THREADED); con = OCI_ConnectionCreate("db", "usr", "pwd", OCI_SESSION_DEFAULT); inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); deq1 = OCI_DequeueCreate(inf, "MY_QUEUE"); deq2 = OCI_DequeueCreate(inf, "MY_QUEUE"); OCI_DequeueEnableAsync(PORT, TIMEOUT); OCI_DequeueSetConsumer(deq1, "CONSUMER1"); OCI_DequeueSetConsumer(deq2, "CONSUMER2"); OCI_DequeueGetStartAsync(deq1, msg_handler); OCI_DequeueGetStartAsync(deq2, msg_handler); sleep(MAX_INT); OCI_DequeueGetStopAsync(deq1); //optional - performed by OCI_DequeueFree OCI_DequeueGetStopAsync(deq2); //optional - performed by OCI_DequeueFree OCI_DequeueFree(deq1); OCI_DequeueFree(deq2); OCI_DequeueDisableAsync(); //optional - performed by OCI_Cleanup OCI_ConnectionFree(con); OCI_Cleanup(); return EXIT_SUCCESS; } On Wed, Dec 5, 2012 at 6:39 PM, vincent rogier <vin...@gm...>wrote: > Hi, > > In fact, I prefer to not use an OCI_Event as all other fields (database, > datetime, etc....) won't be available..... That would be messy ! > > Thus, what i proposed does not make sense. > > As the goal is to implement async dequeuing i prefer proposing another > design. What do you think about the following : > * OCI_DequeueEnableAsync(port, timeout) > * OCI_DequeueDisableAsync(void) > * OCI_DequeueGetStartAsync(oci_dequeue *, async callback) > * OCI_DequeueGetStopAsync(oci_dequeue *) > > Where async callback prototype woulb be like: > void callback(oci_dequeue*, oci_msg*) > > Thus no public subscription (it would be internal). Just call to > enable/disable program wide aq notification. Then you provide a callback > that gets msg object. I find it smooth. > It may need to support more than one central notification... > > Vincent > > -----BEGIN PGP SIGNED MESSAGE----- > Hash: SHA1 > > hi Vincent, > > yes, it makes perfect sense to me. > > I'm looking forward to it! > p. > > On 12/5/12 9:07 AM, vincent rogier wrote: > > Hi, > > > > > > About coding, this is a secondary easy task that required only just few > lines of code. > > I'm more focused on how to integrate to OCILIB. > > Is the following mock-up all right for you ? > > > > > > #include "ocilib.h" > > > > #ifdef _WINDOWS > > #define sleep(x) Sleep(x*1000) > > #endif > > > > #define PORT 5468 > > #define TIMEOUT 0 > > > > void event_handler(OCI_Event *event); > > void error_handler(OCI_Error *err); > > > > int main(void) > > { > > OCI_Connection *con; > > OCI_Subscription *sub; > > OCI_Dequeue *deq1; > > OCI_Dequeue *deq2; > > > > OCI_Initialize(NULL, NULL, OCI_ENV_DEFAULT | OCI_ENV_EVENTS | > OCI_ENV_THREADED); > > > > con = OCI_ConnectionCreate("db", "usr", "pwd", OCI_SESSION_DEFAULT); > > sub = OCI_SubscriptionRegister(con, "SUB-MSG", OCI_CNT_MESSAGE, > event_handler, PORT, TIMEOUT); > > inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); > > deq1 = OCI_DequeueCreate(inf, "MY_QUEUE"); > > deq2 = OCI_DequeueCreate(inf, "MY_QUEUE"); > > > > OCI_DequeueSetConsumer(deq1, "CONSUMER1"); > > OCI_DequeueSetConsumer(deq2, "CONSUMER2"); > > > > OCI_SubscriptionAddDequeue(sub, deq1); > > OCI_SubscriptionAddDequeue(sub, deq2); > > > > sleep(MAX_INT); > > > > OCI_DequeueFree(deq1); > > OCI_DequeueFree(deq2); > > OCI_SubscriptionUnregister(sub); > > OCI_ConnectionFree(con); > > > > OCI_Cleanup(); > > > > return EXIT_SUCCESS; > > } > > > > void error_handler(OCI_Error *err) > > { > > printf("*** error : %s\n", OCI_ErrorGetString(err)); > > } > > > > void event_handler(OCI_Event *event) > > { > > OCI_Dequeue *deq = OCI_EventGetDequeue(event); > > OCI_Msg *msg = OCI_DequeueGet(deq); > > > > /* process payload */ > > } > > > > > > > > On Tue, Dec 4, 2012 at 8:57 PM, Petr Vaněk <pe...@ya... > <mailto:pe...@ya...> <pe...@ya...>> wrote: > > > > OCI_SubscriptionAddDequeue > > > > > > > > > > -- > > Vincent Rogier > > -----BEGIN PGP SIGNATURE----- > Version: GnuPG v1.4.12 (Darwin) > Comment: GPGTools - http://gpgtools.org > Comment: Using GnuPG with undefined - http://www.enigmail.net/ > > iQEcBAEBAgAGBQJQvwGAAAoJEC8yRjM4uE2t7MwH/iYJyM13uHjr7pEYewQJ+cLV > 5iYQTyB6RrD5TCVDXyid9KB55AVHHtef4yr24HDkMTxAit0D+ayP3T51EqV7ojuf > ncqMR895TJMleK01+NSy9W0jzevj81/K8Tia3QO6QdXJd7EPNHiBY0m2EYJaTPbI > AY1H47IKTWadMs2VCEuvV5zMzTf/RP7765FpKB/91/+Tm14A6A4SUY1QRc9mLRJ2 > +vCy6UU8MXhEngKmFUKB+ZBte7YB8GdxrNeStM2vIHTYuRqYEgvzZLmU3oIwubw6 > kOSOy1evKLuXNqe9DxbIwfMdE2T1DMvEtryLGUrHLG3UR35r5LwkhIW7Fp00VD8= > =vGEd > -----END PGP SIGNATURE----- > > -- Vincent Rogier |
From: Petr V. <pe...@ya...> - 2012-12-05 20:04:03
|
-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA1 that would be perfect (IMHO). I think it's a very clean solution. For eg. simple monitoring it will need only few lines of code. also I silently assume thatOCI_DequeueSetConsumer is still optional (or available with consumer=NULL) to allow to catch all handler's events. thanks, petr On 12/5/12 8:16 PM, vincent rogier wrote: > Hi, > > The design exposed in my previous email would give : > > > void msg_handler(OCI_Dequeue *deq, OCI_Msg *msg) > { > /* process msg payload */ > } > > int main(void) > { > OCI_Connection *con; > OCI_Dequeue *deq1; > OCI_Dequeue *deq2; > > OCI_Initialize(NULL, NULL, OCI_ENV_DEFAULT | OCI_ENV_EVENTS | OCI_ENV_THREADED); > > con = OCI_ConnectionCreate("db", "usr", "pwd", OCI_SESSION_DEFAULT); > inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); > deq1 = OCI_DequeueCreate(inf, "MY_QUEUE"); > deq2 = OCI_DequeueCreate(inf, "MY_QUEUE"); > > OCI_DequeueEnableAsync(PORT, TIMEOUT); > > OCI_DequeueSetConsumer(deq1, "CONSUMER1"); > OCI_DequeueSetConsumer(deq2, "CONSUMER2"); > > OCI_DequeueGetStartAsync(deq1, msg_handler); > OCI_DequeueGetStartAsync(deq2, msg_handler); > > sleep(MAX_INT); > > OCI_DequeueGetStopAsync(deq1); //optional - performed by OCI_DequeueFree > OCI_DequeueGetStopAsync(deq2); //optional - performed by OCI_DequeueFree > OCI_DequeueFree(deq1); > OCI_DequeueFree(deq2); > > OCI_DequeueDisableAsync(); //optional - performed by OCI_Cleanup > OCI_ConnectionFree(con); > OCI_Cleanup(); > > return EXIT_SUCCESS; > } > > > > On Wed, Dec 5, 2012 at 6:39 PM, vincent rogier <vin...@gm... <mailto:vin...@gm...>> wrote: > > Hi, > > In fact, I prefer to not use an OCI_Event as all other fields (database, datetime, etc....) won't be available..... That would be messy ! > > Thus, what i proposed does not make sense. > > As the goal is to implement async dequeuing i prefer proposing another design. What do you think about the following : > * OCI_DequeueEnableAsync(port, timeout) > * OCI_DequeueDisableAsync(void) > * OCI_DequeueGetStartAsync(oci_dequeue *, async callback) > * OCI_DequeueGetStopAsync(oci_dequeue *) > > Where async callback prototype woulb be like: > void callback(oci_dequeue*, oci_msg*) > > Thus no public subscription (it would be internal). Just call to enable/disable program wide aq notification. Then you provide a callback that gets msg object. I find it smooth. > It may need to support more than one central notification... > > Vincent > > > hi Vincent, > > yes, it makes perfect sense to me. > > I'm looking forward to it! > p. > > On 12/5/12 9:07 AM, vincent rogier wrote: > > Hi, > > > > About coding, this is a secondary easy task that required only just few lines of code. > > I'm more focused on how to integrate to OCILIB. > > Is the following mock-up all right for you ? > > > > #include "ocilib.h" > > > #ifdef _WINDOWS > > #define sleep(x) Sleep(x*1000) > > #endif > > > #define PORT 5468 > > #define TIMEOUT 0 > > > void event_handler(OCI_Event *event); > > void error_handler(OCI_Error *err); > > > int main(void) > > { > > OCI_Connection *con; > > OCI_Subscription *sub; > > OCI_Dequeue *deq1; > > OCI_Dequeue *deq2; > > > OCI_Initialize(NULL, NULL, OCI_ENV_DEFAULT | OCI_ENV_EVENTS | OCI_ENV_THREADED); > > > con = OCI_ConnectionCreate("db", "usr", "pwd", OCI_SESSION_DEFAULT); > > sub = OCI_SubscriptionRegister(con, "SUB-MSG", OCI_CNT_MESSAGE, event_handler, PORT, TIMEOUT); > > inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); > > deq1 = OCI_DequeueCreate(inf, "MY_QUEUE"); > > deq2 = OCI_DequeueCreate(inf, "MY_QUEUE"); > > > OCI_DequeueSetConsumer(deq1, "CONSUMER1"); > > OCI_DequeueSetConsumer(deq2, "CONSUMER2"); > > > OCI_SubscriptionAddDequeue(sub, deq1); > > OCI_SubscriptionAddDequeue(sub, deq2); > > > sleep(MAX_INT); > > > OCI_DequeueFree(deq1); > > OCI_DequeueFree(deq2); > > OCI_SubscriptionUnregister(sub); > > OCI_ConnectionFree(con); > > > OCI_Cleanup(); > > > return EXIT_SUCCESS; > > } > > > void error_handler(OCI_Error *err) > > { > > printf("*** error : %s\n", OCI_ErrorGetString(err)); > > } > > > void event_handler(OCI_Event *event) > > { > > OCI_Dequeue *deq = OCI_EventGetDequeue(event); > > OCI_Msg *msg = OCI_DequeueGet(deq); > > > /* process payload */ > > } > > > > > On Tue, Dec 4, 2012 at 8:57 PM, Petr Vaněk <pe...@ya... <mailto:pe...@ya...> <mailto:pe...@ya...> <mailto:pe...@ya...>> wrote: > > > OCI_SubscriptionAddDequeue > > > > > > -- > > Vincent Rogier > > > > > > -- > Vincent Rogier -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.12 (Darwin) Comment: GPGTools - http://gpgtools.org Comment: Using GnuPG with undefined - http://www.enigmail.net/ iQEcBAEBAgAGBQJQv6iiAAoJEC8yRjM4uE2tpd8H/RwdU3qGzA/Vx1J9xI8gzhGq S6qypr0K/ErrRCMCVNZhPGYYp4it1Os8pFek7FidETkOjPdO9HcCq1IVPdFWuX9N ZjqG6+tKWOMSUfHunXxwqdN+I28z0K5jgCzpKGpf2gtAdKnWeg/TZn02sB1sfAle a2fDbpFWzjtZ0N3wtA1zNJrsOAlHLwK8ciIGYbQXsPYMFDBrqgzR2n5iWUgpuH4R 8PJR4m48e6I/40+TZhrunFWg9QFXCZPW+CbxZdLz+XzrIRZEcXJ100yar63AeLfu vaX1XGluwj9+Og88EHxT50gnM1LNRXclU4K1Im66Q/u3NN2zm+6uxx33NcEAVpg= =pry4 -----END PGP SIGNATURE----- |
From: vincent r. <vin...@gm...> - 2012-12-05 20:11:54
|
yep, OCI_DequeueSetConsumer is still optional. I'm currently working on implementing the design shown oin my previous email. I'll send to you a temporary ocilib package later this evening :) On Wed, Dec 5, 2012 at 9:03 PM, Petr Vaněk <pe...@ya...> wrote: > ly assume thatOCI_DequeueSetConsumer is sti -- Vincent Rogier |
From: vincent r. <vin...@gm...> - 2012-12-06 00:13:42
|
Hum... I've been testing my implementation that seemed to works nicely. But... We can have two types of queue table : single consumer and multiple consumers. First case, a message is consumed by one consumer Second case, a message can be consumed by many consumer In the first case, it simple. For the second case, the OCI subscription must specify the consumer name.... Thus it is not possible possible to have a library wide subscription when dealing with a queue with multiple consumers. Thus a subscription must be done for each dequeue object... So i removed OCI_DequeueEnableAsync() and OCI_DequeueDisableAsync() We end up with only 2 new functions ! Here is 2 examples: First a queue with multiple consumer set to false (default) : void on_message(OCI_Dequeue *deq, OCI_Msg *msg) { /* process message here */ } int main(int argc, char *argv[]) { OCI_Connection *con; OCI_Dequeue *deq; OCI_TypeInfo *inf; OCI_Initialize(err_handler, NULL, OCI_ENV_DEFAULT | OCI_ENV_THREADED |OCI_ENV_EVENTS); con = OCI_ConnectionCreate("db11g", "usr", "pwd", OCI_SESSION_DEFAULT); inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); deq = OCI_DequeueCreate(inf, "usr.my_queue"); OCI_DequeueGetStartAsync(deq, 9999, 0, on_message); getchar(); OCI_DequeueGetStopAsync(deq); OCI_DequeueFree(deq); OCI_ConnectionFree(con); OCI_Cleanup(); return EXIT_SUCCESS; } And a second example where we have a queue with multiple consumers : void on_message(OCI_Dequeue *deq, OCI_Msg *msg) { /* process message here */ } int main(int argc, char *argv[]) { OCI_Connection *con; OCI_Dequeue *deq1; OCI_Dequeue *deq2; OCI_TypeInfo *inf; OCI_Initialize(err_handler, NULL, OCI_ENV_DEFAULT | OCI_ENV_THREADED |OCI_ENV_EVENTS); con = OCI_ConnectionCreate("db11g", "usr", "pwd", OCI_SESSION_DEFAULT); inf = OCI_TypeInfoGet(con, "MY_MESSAGE", OCI_TIF_TYPE); deq1 = OCI_DequeueCreate(inf, "usr.my_queue"); deq2 = OCI_DequeueCreate(inf, "usr.my_queue"); OCI_DequeueSetConsumer(deq1, "C1"); OCI_DequeueSetNavigation(deq1, OCI_ADN_FIRST_MSG); OCI_DequeueSetConsumer(deq2, "C2"); OCI_DequeueSetNavigation(deq2, OCI_ADN_FIRST_MSG); OCI_DequeueGetStartAsync(deq1, 9998, 0, on_message); OCI_DequeueGetStartAsync(deq2, 9999, 0, on_message); getchar(); OCI_DequeueGetStopAsync(deq1); OCI_DequeueGetStopAsync(deq2); OCI_DequeueFree(deq1); OCI_DequeueFree(deq2); OCI_ConnectionFree(con); OCI_Cleanup(); return EXIT_SUCCESS; } My tests seems to run smoothly... Let me know your opinion :) Vincent On Wed, Dec 5, 2012 at 9:11 PM, vincent rogier <vin...@gm...>wrote: > yep, OCI_DequeueSetConsumer is still optional. > > I'm currently working on implementing the design shown oin my previous > email. > > I'll send to you a temporary ocilib package later this evening :) > > On Wed, Dec 5, 2012 at 9:03 PM, Petr Vaněk <pe...@ya...> wrote: > >> ly assume thatOCI_DequeueSetConsumer is sti > > > > > -- > Vincent Rogier > -- Vincent Rogier |