首页 > 技术文章 > Reusing dialogs with a dialog pool--一个sql server service broker例子

davidhou 2016-04-28 13:31 原文

一个sql server service broker例子

-----------------------------------
USE master
GO
--------------------------------------------------
-- Create demo database section
-------------------------------------------------- 

IF EXISTS (SELECT name FROM sys.databases WHERE name = 'SsbDemoDb')
      DROP DATABASE [SsbDemoDb]; 

CREATE DATABASE [SsbDemoDb]
GO 

USE [SsbDemoDb];
GO  

--------------------------------------------------
-- Dialog pool section
-------------------------------------------------- 

--------------------------------------------------
-- The dialog pool table.
-- Obtain a conversation handle using from service, to service, and contract.
-- Also indicates age and usage of dialog for auditing purposes.
--------------------------------------------------

IF EXISTS (SELECT name FROM sys.tables WHERE name = 'DialogPool')
      DROP TABLE [DialogPool]
GO

CREATE TABLE [DialogPool] (
      FromService SYSNAME NOT NULL,
      ToService SYSNAME NOT NULL,
      OnContract SYSNAME NOT NULL,
      Handle UNIQUEIDENTIFIER NOT NULL,
      OwnerSPID INT NOT NULL,
      CreationTime DATETIME NOT NULL,
      SendCount BIGINT NOT NULL,
      UNIQUE (Handle));
GO

--------------------------------------------------
-- Get dialog procedure.
-- Reuse a free dialog in the pool or create a new one in case
-- no free dialogs exist.
-- Input is from service, to service, and contract.
-- Output is dialog handle and count of message previously sent on dialog.
--------------------------------------------------

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_get_dialog')
      DROP PROC usp_get_dialog
GO

CREATE PROCEDURE [usp_get_dialog] (
      @fromService SYSNAME,
      @toService SYSNAME,
      @onContract SYSNAME,
      @dialogHandle UNIQUEIDENTIFIER OUTPUT,
      @sendCount BIGINT OUTPUT)
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @dialog TABLE
      (
          FromService SYSNAME NOT NULL,
          ToService SYSNAME NOT NULL,
          OnContract SYSNAME NOT NULL,
          Handle UNIQUEIDENTIFIER NOT NULL,
          OwnerSPID INT NOT NULL,
          CreationTime DATETIME NOT NULL,
          SendCount BIGINT NOT NULL
      ); 

      -- Try to claim an unused dialog in [DialogPool]
      -- READPAST option avoids blocking on locked dialogs.
      BEGIN TRANSACTION;
      DELETE @dialog;
      UPDATE TOP(1) [DialogPool] WITH(READPAST)
             SET OwnerSPID = @@SPID
             OUTPUT INSERTED.* INTO @dialog
             WHERE FromService = @fromService
                   AND ToService = @toService
                   AND OnContract = @OnContract
                   AND OwnerSPID = -1;
      IF @@ROWCOUNT > 0
      BEGIN
           SET @dialogHandle = (SELECT Handle FROM @dialog);
           SET @sendCount = (SELECT SendCount FROM @dialog);  
      END
      ELSE
      BEGIN
           -- No free dialogs: need to create a new one
           BEGIN DIALOG CONVERSATION @dialogHandle
                 FROM SERVICE @fromService
                 TO SERVICE @toService
                 ON CONTRACT @onContract
                 WITH ENCRYPTION = OFF;
           INSERT INTO [DialogPool]
                  (FromService, ToService, OnContract, Handle, OwnerSPID,
                      CreationTime, SendCount)
                  VALUES
                  (@fromService, @toService, @onContract, @dialogHandle, @@SPID,
                      GETDATE(), 0);
          SET @sendCount = 0;
      END
      COMMIT
END;
GO
 

--------------------------------------------------
-- Free dialog procedure.
-- Return the dialog to the pool.
-- Inputs are dialog handle and updated send count.
--------------------------------------------------

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_free_dialog')
      DROP PROC usp_free_dialog
GO

CREATE PROCEDURE [usp_free_dialog] (
      @dialogHandle UNIQUEIDENTIFIER,
      @sendCount BIGINT)
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @rowcount INT;
      DECLARE @string VARCHAR(50);
      BEGIN TRANSACTION; 
      -- Release dialog by setting OwnerSPID to -1.
      UPDATE [DialogPool] SET OwnerSPID = -1, SendCount = @sendCount WHERE Handle = @dialogHandle;
      SELECT @rowcount = @@ROWCOUNT;
      IF @rowcount = 0
      BEGIN
           SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));
           RAISERROR('usp_free_dialog: dialog %s not found in dialog pool', 16, 1, @string) WITH LOG;
      END
      ELSE IF @rowcount > 1
      BEGIN
           SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));
           RAISERROR('usp_free_dialog: duplicate dialog %s found in dialog pool', 16, 1, @string) WITH LOG;
      END 
      COMMIT
END;

GO

 

--------------------------------------------------
-- Delete dialog procedure.
-- Delete the dialog from the pool. This does not end the dialog.
-- Input is dialog handle.
--------------------------------------------------

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_delete_dialog')
      DROP PROC usp_delete_dialog
GO

CREATE PROCEDURE [usp_delete_dialog] (
      @dialogHandle UNIQUEIDENTIFIER)
AS
BEGIN
      SET NOCOUNT ON; 
      BEGIN TRANSACTION;
      DELETE [DialogPool] WHERE Handle = @dialogHandle;
      COMMIT
END;
GO

--------------------------------------------------
-- Application setup section.
-------------------------------------------------- 

--------------------------------------------------
-- Send messages from initiator to target.
-- Initiator uses dialogs from the dialog pool.
-- Initiator also retires dialogs based on application criteria,
-- which results in recycling dialogs in the pool.
-------------------------------------------------- 
-- This table stores the messages on the target side

IF EXISTS (SELECT name FROM sys.tables WHERE name = 'MsgTable')
      DROP TABLE MsgTable
GO

CREATE TABLE MsgTable ( message_type SYSNAME, message_body NVARCHAR(4000))
GO 

-- Activated store proc for the initiator to receive messages.

CREATE PROCEDURE initiator_queue_activated_procedure
AS
BEGIN
     DECLARE @handle UNIQUEIDENTIFIER;
     DECLARE @message_type SYSNAME; 
     BEGIN TRANSACTION;
     WAITFOR (
          RECEIVE TOP(1) @handle = [conversation_handle],
            @message_type = [message_type_name]
          FROM [SsbInitiatorQueue]), TIMEOUT 5000;
     IF @@ROWCOUNT = 1
     BEGIN
          -- Expect target response to EndOfStream message.
          IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
          BEGIN
               END CONVERSATION @handle;
          END
     END
     COMMIT
END;

GO 

-- Activated store proc for the target to receive messages.

CREATE PROCEDURE target_queue_activated_procedure
AS
BEGIN
    -- Variable table for received messages.
    DECLARE @receive_table TABLE(
            queuing_order BIGINT,
            conversation_handle UNIQUEIDENTIFIER,
            message_type_name SYSNAME,
            message_body VARCHAR(MAX));  

    -- Cursor for received message table.
    DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY
            FOR SELECT
            conversation_handle,
            message_type_name,
            message_body
            FROM @receive_table ORDER BY queuing_order; 

     DECLARE @conversation_handle UNIQUEIDENTIFIER;
     DECLARE @message_type SYSNAME;
     DECLARE @message_body VARCHAR(4000); 

     -- Error variables.
     DECLARE @error_number INT;
     DECLARE @error_message VARCHAR(4000);
     DECLARE @error_severity INT;
     DECLARE @error_state INT;
     DECLARE @error_procedure SYSNAME;
     DECLARE @error_line INT;
     DECLARE @error_dialog VARCHAR(50); 

     BEGIN TRY
       WHILE (1 = 1)
       BEGIN
         BEGIN TRANSACTION;  
         -- Receive all available messages into the table.
         -- Wait 5 seconds for messages.
         WAITFOR (
            RECEIVE
               [queuing_order],
               [conversation_handle],
               [message_type_name],
               CAST([message_body] AS VARCHAR(4000))
            FROM [SsbTargetQueue]
            INTO @receive_table
         ), TIMEOUT 5000;  

         IF @@ROWCOUNT = 0
         BEGIN
              COMMIT;
              BREAK;
         END
         ELSE
         BEGIN
              OPEN message_cursor;
              WHILE (1=1)
              BEGIN
                  FETCH NEXT FROM message_cursor
                            INTO @conversation_handle,
                                 @message_type,
                                 @message_body;    
                  IF (@@FETCH_STATUS != 0) BREAK; 
                  -- Process a message.
                  -- If an exception occurs, catch and attempt to recover.
                  BEGIN TRY 
                      IF @message_type = 'SsbMsgType'
                      BEGIN
                          -- process the msg. Here we will just insert it into a table
                          INSERT INTO MsgTable values(@message_type, @message_body);
                      END
                      ELSE IF @message_type = 'EndOfStream'
                      BEGIN
                          -- initiator is signaling end of message stream: end the dialog
                          END CONVERSATION @conversation_handle;
                      END
                      ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
                      BEGIN
                           -- If the message_type indicates that the message is an error,
                           -- raise the error and end the conversation.
                           WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
                           SELECT
                           @error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
                           @error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
                           SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));
                           RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);
                           END CONVERSATION @conversation_handle;
                      END
                  END TRY
                  BEGIN CATCH
                     SET @error_number = ERROR_NUMBER();
                     SET @error_message = ERROR_MESSAGE();
                     SET @error_severity = ERROR_SEVERITY();
                     SET @error_state = ERROR_STATE();
                     SET @error_procedure = ERROR_PROCEDURE();
                     SET @error_line = ERROR_LINE();            

                     IF XACT_STATE() = -1
                     BEGIN
                          -- The transaction is doomed. Only rollback possible.
                          -- This could disable the queue if done 5 times consecutively!
                          ROLLBACK TRANSACTION;            

                          -- Record the error.
                          BEGIN TRANSACTION;
                          INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
                                 @error_severity, @error_state, @error_procedure, @error_line, 1);
                          COMMIT; 

                          -- For this level of error, it is best to exit the proc
                          -- and give the queue monitor control.
                          -- Breaking to the outer catch will accomplish this.
                          RAISERROR ('Message processing error', 16, 1);
                     END
                     ELSE IF XACT_STATE() = 1
                     BEGIN
                          -- Record error and continue processing messages.
                          -- Failing message could also be put aside for later processing here.
                          -- Otherwise it will be discarded.
                          INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
                                 @error_severity, @error_state, @error_procedure, @error_line, 0);
                     END
                  END CATCH
              END
              CLOSE message_cursor;
              DELETE @receive_table;
         END
         COMMIT;
       END
     END TRY
     BEGIN CATCH
         -- Process the error and exit the proc to give the queue monitor control
         SET @error_number = ERROR_NUMBER();
         SET @error_message = ERROR_MESSAGE();
         SET @error_severity = ERROR_SEVERITY();
         SET @error_state = ERROR_STATE();
         SET @error_procedure = ERROR_PROCEDURE();
         SET @error_line = ERROR_LINE();

         IF XACT_STATE() = -1
         BEGIN
              -- The transaction is doomed. Only rollback possible.
              -- This could disable the queue if done 5 times consecutively!
              ROLLBACK TRANSACTION;
              -- Record the error.
              BEGIN TRANSACTION;
              INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
                     @error_severity, @error_state, @error_procedure, @error_line, 1);
              COMMIT;
         END
         ELSE IF XACT_STATE() = 1
         BEGIN
              -- Record error and commit transaction.
              -- Here you could also save anything else you want before exiting.
              INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,
                     @error_severity, @error_state, @error_procedure, @error_line, 0);
              COMMIT;
         END
     END CATCH
END;
GO
 
-- Table to store processing errors.

IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')
      DROP TABLE target_processing_errors;
GO 

CREATE TABLE target_processing_errors (error_conversation UNIQUEIDENTIFIER, error_number INT,
       error_message VARCHAR(4000), error_severity INT, error_state INT, error_procedure SYSNAME NULL,
       error_line INT, doomed_transaction TINYINT)
GO 

-- Create Initiator and Target side SSB entities
CREATE MESSAGE TYPE SsbMsgType VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE EndOfStream;
CREATE CONTRACT SsbContract
       (
        SsbMsgType SENT BY INITIATOR,
        EndOfStream SENT BY INITIATOR
       );

CREATE QUEUE SsbInitiatorQueue
      WITH ACTIVATION (
            STATUS = ON,
            MAX_QUEUE_READERS = 1,
            PROCEDURE_NAME = [initiator_queue_activated_procedure],
            EXECUTE AS OWNER);
CREATE QUEUE SsbTargetQueue
      WITH ACTIVATION (
            STATUS = ON,
            MAX_QUEUE_READERS = 1,
            PROCEDURE_NAME = [target_queue_activated_procedure],
            EXECUTE AS OWNER); 

CREATE SERVICE SsbInitiatorService ON QUEUE SsbInitiatorQueue;

CREATE SERVICE SsbTargetService ON QUEUE SsbTargetQueue (SsbContract);

GO

 

-- SEND procedure. Uses a dialog from the dialog pool.
--

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_send')
      DROP PROC usp_send
GO
CREATE PROCEDURE [usp_send] (
      @fromService SYSNAME,
      @toService SYSNAME,
      @onContract SYSNAME,
      @messageType SYSNAME,
      @messageBody NVARCHAR(MAX))
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @dialogHandle UNIQUEIDENTIFIER;
      DECLARE @sendCount BIGINT; 
      DECLARE @counter INT;
      DECLARE @error INT;
      SELECT @counter = 1;
      BEGIN TRANSACTION;
      -- Will need a loop to retry in case the dialog is
      -- in a state that does not allow transmission
      --
      WHILE (1=1)
      BEGIN
            -- Claim a dialog from the dialog pool.
            -- A new one will be created if none are available.
            --
            EXEC usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT; 

            -- Attempt to SEND on the dialog
            --
            IF (@messageBody IS NOT NULL)
            BEGIN
                  -- If the @messageBody is not null it must be sent explicitly
                  SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);
            END
            ELSE
            BEGIN
                  -- Messages with no body must *not* specify the body,
                  -- cannot send a NULL value argument
                  SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;
            END               

            SELECT @error = @@ERROR;
            IF @error = 0
            BEGIN
                  -- Successful send, increment count and exit the loop
                  --
                  SET @sendCount = @sendCount + 1;
                  BREAK;
            END           

            SELECT @counter = @counter+1;
            IF @counter > 10
            BEGIN
                  -- We failed 10 times in a  row, something must be broken
                  --
                  RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
            BREAK;
            END 

            -- Delete the associated dialog from the table and try again
            --
            EXEC usp_delete_dialog @dialogHandle;
            SELECT @dialogHandle = NULL;
      END

      -- “Criterion” for dialog pool removal is send count > 1000.
      -- Modify to suit application.
      -- When deleting also inform the target to end the dialog.
      IF @sendCount > 1000
      BEGIN
         EXEC usp_delete_dialog @dialogHandle ;
         SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [EndOfStream];
      END
      ELSE
      BEGIN
         -- Free the dialog.
         EXEC usp_free_dialog @dialogHandle, @sendCount;
      END
      COMMIT
END;
GO

 

--------------------------------------------------------
-- Run application section
--------------------------------------------------------
 
-- Send some messages

exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message1.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message2.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message3.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message4.</xml>'
exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message5.</xml>'
GO

 

-- Show the dialog pool
SELECT * FROM [DialogPool]
GO 

-- Show the dialogs used.
SELECT * FROM sys.conversation_endpoints;
GO
 

-- Check whether the TARGET side has processed the messages
SELECT * FROM MsgTable
SELECT * FROM dialogpool
SELECT * FROM dbo.target_processing_errors

--TRUNCATE TABLE MsgTable
GO

 

推荐阅读