Class Jabber::Stream
In: lib/xmpp4r/stream.rb
Parent: Object
XMLStanza Message Presence Iq REXML::Element X IqQuery Error StreamHost IqSiFileRange IqSiFile StreamHostUsed IqSi IqFeature XRosterItem RosterItem XMUCUserItem XMUCUserInvite XDataField XDataReported XDataTitle XDataInstructions Feature Identity Item IqVcard Singleton IdGenerator Connection Client Component Comparable JID RuntimeError AuthenticationFailure ErrorException SOCKS5Error Stream SOCKS5Bytestreams SOCKS5BytestreamsTarget SOCKS5BytestreamsInitiator SOCKS5BytestreamsServerStreamHost TCPSocket SOCKS5Socket IqQuery IqQueryBytestreams IqQueryVersion IqQueryRoster IqQueryDiscoItems IqQueryDiscoInfo IBB IBBTarget IBBInitiator Responder SimpleResponder X XRoster XMUCUser XMUC XDelay XData MUCClient SimpleMUCClient Base DigestMD5 Plain FileSource StreamParser SOCKS5BytestreamsPeer SOCKS5BytestreamsServer IBBQueueItem Helper MUCBrowser Helper Helper lib/xmpp4r/authenticationfailure.rb lib/xmpp4r/idgenerator.rb lib/xmpp4r/connection.rb lib/xmpp4r/iq.rb lib/xmpp4r/jid.rb lib/xmpp4r/xmlstanza.rb lib/xmpp4r/errorexception.rb lib/xmpp4r/stream.rb lib/xmpp4r/client.rb lib/xmpp4r/x.rb lib/xmpp4r/streamparser.rb lib/xmpp4r/error.rb lib/xmpp4r/component.rb lib/xmpp4r/query.rb lib/xmpp4r/message.rb lib/xmpp4r/presence.rb lib/xmpp4r/bytestreams/helper/ibb/initiator.rb lib/xmpp4r/bytestreams/iq/si.rb lib/xmpp4r/bytestreams/iq/bytestreams.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/base.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/target.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/server.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/socks5.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/initiator.rb lib/xmpp4r/bytestreams/helper/ibb/base.rb lib/xmpp4r/bytestreams/helper/ibb/target.rb Bytestreams lib/xmpp4r/version/iq/version.rb lib/xmpp4r/version/helper/responder.rb lib/xmpp4r/version/helper/simpleresponder.rb Version lib/xmpp4r/feature_negotiation/iq/feature.rb FeatureNegotiation lib/xmpp4r/roster/helper/roster.rb lib/xmpp4r/roster/iq/roster.rb lib/xmpp4r/roster/x/roster.rb Roster lib/xmpp4r/muc/x/muc.rb lib/xmpp4r/muc/helper/mucclient.rb lib/xmpp4r/muc/x/mucuseritem.rb lib/xmpp4r/muc/helper/mucbrowser.rb lib/xmpp4r/muc/x/mucuserinvite.rb lib/xmpp4r/muc/helper/simplemucclient.rb MUC lib/xmpp4r/sasl.rb SASL lib/xmpp4r/bytestreams/helper/filetransfer.rb TransferSource FileTransfer lib/xmpp4r/delay/x/delay.rb Delay lib/xmpp4r/dataforms/x/data.rb Dataforms lib/xmpp4r/discovery/iq/discoinfo.rb lib/xmpp4r/discovery/iq/discoitems.rb Discovery lib/xmpp4r/vcard/helper/vcard.rb lib/xmpp4r/vcard/iq/vcard.rb Vcard Jabber Module: Jabber

The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)

Methods

Classes and Modules

Class Jabber::Stream::ThreadBlock

Constants

DISCONNECTED = 1
CONNECTED = 2

Attributes

fd  [R]  file descriptor used
status  [R]  connection status

Public Class methods

Create a new stream (just initializes)

[Source]

    # File lib/xmpp4r/stream.rb, line 34
34:     def initialize(threaded = true)
35:       @fd = nil
36:       @status = DISCONNECTED
37:       @xmlcbs = CallbackList::new
38:       @stanzacbs = CallbackList::new
39:       @messagecbs = CallbackList::new
40:       @iqcbs = CallbackList::new
41:       @presencecbs = CallbackList::new
42:       unless threaded
43:         $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
44:         threaded = true
45:       end
46:       @threaded = threaded
47:       @stanzaqueue = []
48:       @stanzaqueue_lock = Mutex::new
49:       @exception_block = nil
50:       @threadblocks = []
51: #      @pollCounter = 10
52:       @waiting_thread = nil
53:       @wakeup_thread = nil
54:       @streamid = nil
55:       @features_lock = Mutex.new
56:     end

Public Instance methods

Adds a callback block to process received Iqs

priority:[Integer] The callback’s priority, the higher, the sooner
ref:[String] The callback’s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 518
518:     def add_iq_callback(priority = 0, ref = nil, &block)
519:       @iqcbs.add(priority, ref, block)
520:     end

Adds a callback block to process received Messages

priority:[Integer] The callback’s priority, the higher, the sooner
ref:[String] The callback’s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 464
464:     def add_message_callback(priority = 0, ref = nil, &block)
465:       @messagecbs.add(priority, ref, block)
466:     end

Adds a callback block to process received Presences

priority:[Integer] The callback’s priority, the higher, the sooner
ref:[String] The callback’s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 500
500:     def add_presence_callback(priority = 0, ref = nil, &block)
501:       @presencecbs.add(priority, ref, block)
502:     end

Adds a callback block to process received Stanzas

priority:[Integer] The callback’s priority, the higher, the sooner
ref:[String] The callback’s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 482
482:     def add_stanza_callback(priority = 0, ref = nil, &block)
483:       @stanzacbs.add(priority, ref, block)
484:     end

Adds a callback block to process received XML messages

priority:[Integer] The callback’s priority, the higher, the sooner
ref:[String] The callback’s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 446
446:     def add_xml_callback(priority = 0, ref = nil, &block)
447:       @xmlcbs.add(priority, ref, block)
448:     end

Closes the connection to the Jabber service

[Source]

     # File lib/xmpp4r/stream.rb, line 532
532:     def close
533:       close!
534:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 536
536:     def close!
537:       @parserThread.kill if @parserThread
538: #      @pollThread.kill
539:       @fd.close if @fd and !@fd.closed?
540:       @status = DISCONNECTED
541:     end

Delete an Iq callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 527
527:     def delete_iq_callback(ref)
528:       @iqcbs.delete(ref)
529:     end

Delete an Message callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 472
472:     def delete_message_callback(ref)
473:       @messagecbs.delete(ref)
474:     end

Delete a Presence callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 508
508:     def delete_presence_callback(ref)
509:       @presencecbs.delete(ref)
510:     end

Delete a Stanza callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 490
490:     def delete_stanza_callback(ref)
491:       @stanzacbs.delete(ref)
492:     end

Delete an XML-messages callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 454
454:     def delete_xml_callback(ref)
455:       @xmlcbs.delete(ref)
456:     end

Returns if this connection is connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 150
150:     def is_connected?
151:       return @status == CONNECTED
152:     end

Returns if this connection is NOT connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 158
158:     def is_disconnected?
159:       return @status == DISCONNECTED
160:     end

Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.

The block has to take three arguments:

  • the Exception
  • the Jabber::Stream object (self)
  • a symbol where it happened, namely :start, :parser, :sending and :end

[Source]

     # File lib/xmpp4r/stream.rb, line 108
108:     def on_exception(&block)
109:       @exception_block = block
110:     end

This method is called by the parser when a failure occurs

[Source]

     # File lib/xmpp4r/stream.rb, line 114
114:     def parse_failure(e)
115:       Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
116: 
117:       # A new thread has to be created because close will cause the thread
118:       # to commit suicide(???)
119:       if @exception_block
120:         # New thread, because close will kill the current thread
121:         Thread.new {
122:           close
123:           @exception_block.call(e, self, :parser)
124:         }
125:       else
126:         puts "Stream#parse_failure was called by XML parser. Dumping " +
127:         "backtrace...\n" + e.exception + "\n"
128:         puts e.backtrace
129:         close
130:         raise
131:       end
132:     end

This method is called by the parser upon receiving </stream:stream>

[Source]

     # File lib/xmpp4r/stream.rb, line 136
136:     def parser_end
137:       if @exception_block
138:         Thread.new {
139:           close
140:           @exception_block.call(nil, self, :close)
141:         }
142:       else
143:         close
144:       end
145:     end

Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.

Currently not working!

[Source]

     # File lib/xmpp4r/stream.rb, line 424
424:     def poll
425:       sleep 10
426:       while true
427:         sleep 2
428: #        @pollCounter = @pollCounter - 1
429: #        if @pollCounter < 0
430: #          begin
431: #            send("  \t  ")
432: #          rescue
433: #            Thread.new {@exception_block.call if @exception_block}
434: #            break
435: #          end
436: #        end
437:       end
438:     end

Process |max| XML stanzas and call listeners for all of them.

max:[Integer] the number of stanzas to process (nil means process

all available)

[Source]

     # File lib/xmpp4r/stream.rb, line 281
281:     def process(max = nil)
282:       n = 0
283:       @stanzaqueue_lock.lock
284:       while @stanzaqueue.size > 0 and (max == nil or n < max)
285:         e = @stanzaqueue.shift
286:         @stanzaqueue_lock.unlock
287:         process_one(e)
288:         n += 1
289:         @stanzaqueue_lock.lock
290:       end
291:       @stanzaqueue_lock.unlock
292:       n
293:     end

Processes a received REXML::Element and executes registered thread blocks and filters against it.

If in threaded mode, a new thread will be spawned for the call to receive_nonthreaded.

element:[REXML::Element] The received element

[Source]

     # File lib/xmpp4r/stream.rb, line 169
169:     def receive(element)
170:       if @threaded
171:         # Don't spawn a new thread here. An implicit feature
172:         # of XMPP is constant order of stanzas.
173:         receive_nonthreaded(element)
174:       else
175:         receive_nonthreaded(element)
176:       end
177:     end

Sends XML data to the socket and (optionally) waits to process received data.

xml:[String] The xml data to send
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 357
357:     def send(xml, &block)
358:       Jabber::debuglog("SENDING:\n#{xml}")
359:       @threadblocks.unshift(ThreadBlock.new(block)) if block
360:       Thread.critical = true # we don't want to be interupted before we stop!
361:       begin
362:         @fd << xml.to_s
363:         @fd.flush
364:       rescue Exception => e
365:         Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
366: 
367:         if @exception_block 
368:           Thread.new { close!; @exception_block.call(e, self, :sending) }
369:         else
370:           puts "Exception caught while sending!"
371:           close!
372:           raise
373:         end
374:       end
375:       Thread.critical = false
376:       # The parser thread might be running this (think of a callback running send())
377:       # If this is the case, we mustn't stop (or we would cause a deadlock)
378:       Thread.stop if block and Thread.current != @parserThread
379:       @pollCounter = 10
380:     end

Send an XMMP stanza with an Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. It must return true to complete this!

Be aware that if a stanza with type=’error’ is received the function does not yield but raises an ErrorException with the corresponding error element.

Please read the note about nesting at Stream#send

xml:[XMLStanza]

[Source]

     # File lib/xmpp4r/stream.rb, line 395
395:     def send_with_id(xml, &block)
396:       if xml.id.nil?
397:         xml.id = Jabber::IdGenerator.instance.generate_id
398:       end
399: 
400:       error = nil
401:       send(xml) do |received|
402:         if received.kind_of? XMLStanza and received.id == xml.id
403:           if received.type == :error
404:             error = (received.error ? received.error : Error.new)
405:             true
406:           else
407:             yield(received)
408:           end
409:         else
410:           false
411:         end
412:       end
413: 
414:       unless error.nil?
415:         raise ErrorException.new(error)
416:       end
417:     end

Start the XML parser on the fd

[Source]

    # File lib/xmpp4r/stream.rb, line 60
60:     def start(fd)
61:       @stream_mechanisms = []
62:       @stream_features = {}
63: 
64:       @fd = fd
65:       @parser = StreamParser.new(@fd, self)
66:       @parserThread = Thread.new do
67:         begin
68:           @parser.parse
69:         rescue Exception => e
70:           Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
71: 
72:           if @exception_block
73:             Thread.new { close; @exception_block.call(e, self, :start) }
74:           else
75:             puts "Exception caught in Parser thread!"
76:             close
77:             raise
78:           end
79:         end
80:       end
81: #      @pollThread = Thread.new do
82: #        begin
83: #        poll
84: #        rescue
85: #          puts "Exception caught in Poll thread, dumping backtrace and" +
86: #            " exiting...\n" + $!.exception + "\n"
87: #          puts $!.backtrace
88: #          exit
89: #        end
90: #      end
91:       @status = CONNECTED
92:     end

[Source]

    # File lib/xmpp4r/stream.rb, line 94
94:     def stop
95:       @parserThread.kill
96:       @parser = nil
97:     end

Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.

time:[Integer] time to wait in seconds. If nil, wait infinitely.

all available)

[Source]

     # File lib/xmpp4r/stream.rb, line 301
301:     def wait_and_process(time = nil)
302:       if time == 0 
303:         return process(1)
304:       end
305:       @stanzaqueue_lock.lock
306:       if @stanzaqueue.size > 0
307:         e = @stanzaqueue.shift
308:         @stanzaqueue_lock.unlock
309:         process_one(e)
310:         return 1
311:       end
312: 
313:       @waiting_thread = Thread.current
314:       @wakeup_thread = Thread.new { sleep time ; @waiting_thread.wakeup if @waiting_thread }
315:       @waiting_thread.stop
316:       @wakeup_thread.kill if @wakeup_thread
317:       @wakeup_thread = nil
318:       @waiting_thread = nil
319: 
320:       @stanzaqueue_lock.lock
321:       if @stanzaqueue.size > 0
322:         e = @stanzaqueue.shift
323:         @stanzaqueue_lock.unlock
324:         process_one(e)
325:         return 1
326:       end
327:       return 0
328:     end

Private Instance methods

Process |element| until it is consumed. Returns element.consumed? element The element to process

[Source]

     # File lib/xmpp4r/stream.rb, line 261
261:     def process_one(stanza)
262:       Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
263:       return true if @xmlcbs.process(stanza)
264:       return true if @stanzacbs.process(stanza)
265:       case stanza
266:       when Message
267:         return true if @messagecbs.process(stanza)
268:       when Iq
269:         return true if @iqcbs.process(stanza)
270:       when Presence
271:         return true if @presencecbs.process(stanza)
272:       end
273:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 179
179:     def receive_nonthreaded(element)
180:       Jabber::debuglog("RECEIVED:\n#{element.to_s}")
181:       case element.prefix
182:       when 'stream'
183:         case element.name
184:           when 'stream'
185:             stanza = element
186:             @streamid = element.attributes['id']
187:             unless element.attributes['version']  # isn't XMPP compliant, so
188:               Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
189:               @features_lock.unlock               # don't wait for <stream:features/>
190:             end
191:           when 'features'
192:             stanza = element
193:             element.each { |e|
194:               if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
195:                 e.each_element('mechanism') { |mech|
196:                   @stream_mechanisms.push(mech.text)
197:                 }
198:               else
199:                 @stream_features[e.name] = e.namespace
200:               end
201:             }
202:             Jabber::debuglog("FEATURES: received")
203:             @features_lock.unlock
204:           else
205:             stanza = element
206:         end
207:       else
208:         case element.name
209:           when 'message'
210:             stanza = Message::import(element)
211:           when 'iq'
212:             stanza = Iq::import(element)
213:           when 'presence'
214:             stanza = Presence::import(element)
215:           else
216:             stanza = element
217:         end
218:       end
219: 
220:       # Iterate through blocked threads (= waiting for an answer)
221:       #
222:       # We're dup'ping the @threadblocks here, so that we won't end up in an
223:       # endless loop if Stream#send is being nested. That means, the nested
224:       # threadblock won't receive the stanza currently processed, but the next
225:       # one.
226:       threadblocks = @threadblocks.dup
227:       threadblocks.each { |threadblock|
228:         exception = nil
229:         r = false
230:         begin
231:           r = threadblock.call(stanza)
232:         rescue Exception => e
233:           exception = e
234:         end
235: 
236:         if r == true
237:           @threadblocks.delete(threadblock)
238:           threadblock.wakeup
239:           return
240:         elsif exception
241:           @threadblocks.delete(threadblock)
242:           threadblock.raise(exception)
243:         end
244:       }
245: 
246:       if @threaded
247:         process_one(stanza)
248:       else
249:         # stanzaqueue will be read when the user call process
250:         @stanzaqueue_lock.lock
251:         @stanzaqueue.push(stanza)
252:         @stanzaqueue_lock.unlock
253:         @waiting_thread.wakeup if @waiting_thread
254:       end
255:     end

[Validate]