# -*- Mode: Python; tab-width: 4 -*- | |
# Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp | |
# Author: Sam Rushing <rushing@nightmare.com> | |
# ====================================================================== | |
# Copyright 1996 by Sam Rushing | |
# | |
# All Rights Reserved | |
# | |
# Permission to use, copy, modify, and distribute this software and | |
# its documentation for any purpose and without fee is hereby | |
# granted, provided that the above copyright notice appear in all | |
# copies and that both that copyright notice and this permission | |
# notice appear in supporting documentation, and that the name of Sam | |
# Rushing not be used in advertising or publicity pertaining to | |
# distribution of the software without specific, written prior | |
# permission. | |
# | |
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, | |
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN | |
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR | |
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS | |
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, | |
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN | |
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
# ====================================================================== | |
r"""A class supporting chat-style (command/response) protocols. | |
This class adds support for 'chat' style protocols - where one side | |
sends a 'command', and the other sends a response (examples would be | |
the common internet protocols - smtp, nntp, ftp, etc..). | |
The handle_read() method looks at the input stream for the current | |
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' | |
for multi-line output), calling self.found_terminator() on its | |
receipt. | |
for example: | |
Say you build an async nntp client using this class. At the start | |
of the connection, you'll have self.terminator set to '\r\n', in | |
order to process the single-line greeting. Just before issuing a | |
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST | |
command will be accumulated (using your own 'collect_incoming_data' | |
method) up to the terminator, and then control will be returned to | |
you - by calling your self.found_terminator() method. | |
""" | |
import socket | |
import asyncore | |
from collections import deque | |
from sys import py3kwarning | |
from warnings import filterwarnings, catch_warnings | |
class async_chat (asyncore.dispatcher): | |
"""This is an abstract class. You must derive from this class, and add | |
the two methods collect_incoming_data() and found_terminator()""" | |
# these are overridable defaults | |
ac_in_buffer_size = 4096 | |
ac_out_buffer_size = 4096 | |
def __init__ (self, sock=None, map=None): | |
# for string terminator matching | |
self.ac_in_buffer = '' | |
# we use a list here rather than cStringIO for a few reasons... | |
# del lst[:] is faster than sio.truncate(0) | |
# lst = [] is faster than sio.truncate(0) | |
# cStringIO will be gaining unicode support in py3k, which | |
# will negatively affect the performance of bytes compared to | |
# a ''.join() equivalent | |
self.incoming = [] | |
# we toss the use of the "simple producer" and replace it with | |
# a pure deque, which the original fifo was a wrapping of | |
self.producer_fifo = deque() | |
asyncore.dispatcher.__init__ (self, sock, map) | |
def collect_incoming_data(self, data): | |
raise NotImplementedError("must be implemented in subclass") | |
def _collect_incoming_data(self, data): | |
self.incoming.append(data) | |
def _get_data(self): | |
d = ''.join(self.incoming) | |
del self.incoming[:] | |
return d | |
def found_terminator(self): | |
raise NotImplementedError("must be implemented in subclass") | |
def set_terminator (self, term): | |
"Set the input delimiter. Can be a fixed string of any length, an integer, or None" | |
self.terminator = term | |
def get_terminator (self): | |
return self.terminator | |
# grab some more data from the socket, | |
# throw it to the collector method, | |
# check for the terminator, | |
# if found, transition to the next state. | |
def handle_read (self): | |
try: | |
data = self.recv (self.ac_in_buffer_size) | |
except socket.error, why: | |
self.handle_error() | |
return | |
self.ac_in_buffer = self.ac_in_buffer + data | |
# Continue to search for self.terminator in self.ac_in_buffer, | |
# while calling self.collect_incoming_data. The while loop | |
# is necessary because we might read several data+terminator | |
# combos with a single recv(4096). | |
while self.ac_in_buffer: | |
lb = len(self.ac_in_buffer) | |
terminator = self.get_terminator() | |
if not terminator: | |
# no terminator, collect it all | |
self.collect_incoming_data (self.ac_in_buffer) | |
self.ac_in_buffer = '' | |
elif isinstance(terminator, int) or isinstance(terminator, long): | |
# numeric terminator | |
n = terminator | |
if lb < n: | |
self.collect_incoming_data (self.ac_in_buffer) | |
self.ac_in_buffer = '' | |
self.terminator = self.terminator - lb | |
else: | |
self.collect_incoming_data (self.ac_in_buffer[:n]) | |
self.ac_in_buffer = self.ac_in_buffer[n:] | |
self.terminator = 0 | |
self.found_terminator() | |
else: | |
# 3 cases: | |
# 1) end of buffer matches terminator exactly: | |
# collect data, transition | |
# 2) end of buffer matches some prefix: | |
# collect data to the prefix | |
# 3) end of buffer does not match any prefix: | |
# collect data | |
terminator_len = len(terminator) | |
index = self.ac_in_buffer.find(terminator) | |
if index != -1: | |
# we found the terminator | |
if index > 0: | |
# don't bother reporting the empty string (source of subtle bugs) | |
self.collect_incoming_data (self.ac_in_buffer[:index]) | |
self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] | |
# This does the Right Thing if the terminator is changed here. | |
self.found_terminator() | |
else: | |
# check for a prefix of the terminator | |
index = find_prefix_at_end (self.ac_in_buffer, terminator) | |
if index: | |
if index != lb: | |
# we found a prefix, collect up to the prefix | |
self.collect_incoming_data (self.ac_in_buffer[:-index]) | |
self.ac_in_buffer = self.ac_in_buffer[-index:] | |
break | |
else: | |
# no prefix, collect it all | |
self.collect_incoming_data (self.ac_in_buffer) | |
self.ac_in_buffer = '' | |
def handle_write (self): | |
self.initiate_send() | |
def handle_close (self): | |
self.close() | |
def push (self, data): | |
sabs = self.ac_out_buffer_size | |
if len(data) > sabs: | |
for i in xrange(0, len(data), sabs): | |
self.producer_fifo.append(data[i:i+sabs]) | |
else: | |
self.producer_fifo.append(data) | |
self.initiate_send() | |
def push_with_producer (self, producer): | |
self.producer_fifo.append(producer) | |
self.initiate_send() | |
def readable (self): | |
"predicate for inclusion in the readable for select()" | |
# cannot use the old predicate, it violates the claim of the | |
# set_terminator method. | |
# return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) | |
return 1 | |
def writable (self): | |
"predicate for inclusion in the writable for select()" | |
return self.producer_fifo or (not self.connected) | |
def close_when_done (self): | |
"automatically close this channel once the outgoing queue is empty" | |
self.producer_fifo.append(None) | |
def initiate_send(self): | |
while self.producer_fifo and self.connected: | |
first = self.producer_fifo[0] | |
# handle empty string/buffer or None entry | |
if not first: | |
del self.producer_fifo[0] | |
if first is None: | |
self.handle_close() | |
return | |
# handle classic producer behavior | |
obs = self.ac_out_buffer_size | |
try: | |
with catch_warnings(): | |
if py3kwarning: | |
filterwarnings("ignore", ".*buffer", DeprecationWarning) | |
data = buffer(first, 0, obs) | |
except TypeError: | |
data = first.more() | |
if data: | |
self.producer_fifo.appendleft(data) | |
else: | |
del self.producer_fifo[0] | |
continue | |
# send the data | |
try: | |
num_sent = self.send(data) | |
except socket.error: | |
self.handle_error() | |
return | |
if num_sent: | |
if num_sent < len(data) or obs < len(first): | |
self.producer_fifo[0] = first[num_sent:] | |
else: | |
del self.producer_fifo[0] | |
# we tried to send some actual data | |
return | |
def discard_buffers (self): | |
# Emergencies only! | |
self.ac_in_buffer = '' | |
del self.incoming[:] | |
self.producer_fifo.clear() | |
class simple_producer: | |
def __init__ (self, data, buffer_size=512): | |
self.data = data | |
self.buffer_size = buffer_size | |
def more (self): | |
if len (self.data) > self.buffer_size: | |
result = self.data[:self.buffer_size] | |
self.data = self.data[self.buffer_size:] | |
return result | |
else: | |
result = self.data | |
self.data = '' | |
return result | |
class fifo: | |
def __init__ (self, list=None): | |
if not list: | |
self.list = deque() | |
else: | |
self.list = deque(list) | |
def __len__ (self): | |
return len(self.list) | |
def is_empty (self): | |
return not self.list | |
def first (self): | |
return self.list[0] | |
def push (self, data): | |
self.list.append(data) | |
def pop (self): | |
if self.list: | |
return (1, self.list.popleft()) | |
else: | |
return (0, None) | |
# Given 'haystack', see if any prefix of 'needle' is at its end. This | |
# assumes an exact match has already been checked. Return the number of | |
# characters matched. | |
# for example: | |
# f_p_a_e ("qwerty\r", "\r\n") => 1 | |
# f_p_a_e ("qwertydkjf", "\r\n") => 0 | |
# f_p_a_e ("qwerty\r\n", "\r\n") => <undefined> | |
# this could maybe be made faster with a computed regex? | |
# [answer: no; circa Python-2.0, Jan 2001] | |
# new python: 28961/s | |
# old python: 18307/s | |
# re: 12820/s | |
# regex: 14035/s | |
def find_prefix_at_end (haystack, needle): | |
l = len(needle) - 1 | |
while l and not haystack.endswith(needle[:l]): | |
l -= 1 | |
return l |