A Discrete-Event Network Simulator
API
ipython_view.py
Go to the documentation of this file.
1 """
2 Backend to the console plugin.
3 
4 @author: Eitan Isaacson
5 @organization: IBM Corporation
6 @copyright: Copyright (c) 2007 IBM Corporation
7 @license: BSD
8 
9 All rights reserved. This program and the accompanying materials are made
10 available under the terms of the BSD which accompanies this distribution, and
11 is available at U{http://www.opensource.org/licenses/bsd-license.php}
12 """
13 # this file is a modified version of source code from the Accerciser project
14 # http://live.gnome.org/accerciser
15 
16 import gtk, gobject
17 import re
18 import sys
19 import os
20 from gi.repository import Pango
21 from StringIO import StringIO
22 import IPython
23 
24 from pkg_resources import parse_version
25 
26 try:
27  import IPython
28 except ImportError:
29 
31  IPython = None
32 
33 
35 
74  def __init__(self,argv=None,user_ns=None,user_global_ns=None,
75  cin=None, cout=None,cerr=None, input_func=None):
76  """! Initializer
77 
78  @param self: this object
79  @param argv: Command line options for IPython
80  @param user_ns: User namespace.
81  @param user_global_ns: User global namespace.
82  @param cin: Console standard input.
83  @param cout: Console standard output.
84  @param cerr: Console standard error.
85  @param input_func: Replacement for builtin raw_input()
86  @return none
87  """
88  io = IPython.utils.io
89  if input_func:
90  if parse_version(IPython.release.version) >= parse_version("1.2.1"):
91  IPython.terminal.interactiveshell.raw_input_original = input_func
92  else:
93  IPython.frontend.terminal.interactiveshell.raw_input_original = input_func
94  if cin:
95  io.stdin = io.IOStream(cin)
96  if cout:
97  io.stdout = io.IOStream(cout)
98  if cerr:
99  io.stderr = io.IOStream(cerr)
100 
101  # This is to get rid of the blockage that occurs during
102  # IPython.Shell.InteractiveShell.user_setup()
103 
104  io.raw_input = lambda x: None
105 
106  os.environ['TERM'] = 'dumb'
107  excepthook = sys.excepthook
108 
109  from IPython.config.loader import Config
110  cfg = Config()
111  cfg.InteractiveShell.colors = "Linux"
112 
113  # InteractiveShell's __init__ overwrites io.stdout,io.stderr with
114  # sys.stdout, sys.stderr, this makes sure they are right
115  #
116  old_stdout, old_stderr = sys.stdout, sys.stderr
117  sys.stdout, sys.stderr = io.stdout.stream, io.stderr.stream
118 
119  # InteractiveShell inherits from SingletonConfigurable, so use instance()
120  #
121  if parse_version(IPython.release.version) >= parse_version("1.2.1"):
122  self.IP = IPython.terminal.embed.InteractiveShellEmbed.instance(\
123  config=cfg, user_ns=user_ns)
124  else:
125  self.IP = IPython.frontend.terminal.embed.InteractiveShellEmbed.instance(\
126  config=cfg, user_ns=user_ns)
127 
128  sys.stdout, sys.stderr = old_stdout, old_stderr
129 
130  self.IP.system = lambda cmd: self.shell(self.IP.var_expand(cmd),
131  header='IPython system call: ')
132 # local_ns=user_ns)
133  #global_ns=user_global_ns)
134  #verbose=self.IP.rc.system_verbose)
135 
136  self.IP.raw_input = input_func
137  sys.excepthook = excepthook
138  self.iter_more = 0
139  self.history_level = 0
140  self.complete_sep = re.compile('[\s\{\}\[\]\(\)]')
141  self.updateNamespace({'exit':lambda:None})
142  self.updateNamespace({'quit':lambda:None})
143  self.IP.readline_startup_hook(self.IP.pre_readline)
144  # Workaround for updating namespace with sys.modules
145  #
146  self.__update_namespace()
147 
149  """!
150  Update self.IP namespace for autocompletion with sys.modules
151  """
152  for k, v in list(sys.modules.items()):
153  if not '.' in k:
154  self.IP.user_ns.update({k:v})
155 
156  def execute(self):
157  """!
158  Executes the current line provided by the shell object.
159  """
160  self.history_level = 0
161  orig_stdout = sys.stdout
162  sys.stdout = IPython.utils.io.stdout
163 
164  orig_stdin = sys.stdin
165  sys.stdin = IPython.utils.io.stdin;
166  self.prompt = self.generatePrompt(self.iter_more)
167 
168  self.IP.hooks.pre_prompt_hook()
169  if self.iter_more:
170  try:
171  self.prompt = self.generatePrompt(True)
172  except:
173  self.IP.showtraceback()
174  if self.IP.autoindent:
175  self.IP.rl_do_indent = True
176 
177  try:
178  line = self.IP.raw_input(self.prompt)
179  except KeyboardInterrupt:
180  self.IP.write('\nKeyboardInterrupt\n')
181  self.IP.input_splitter.reset()
182  except:
183  self.IP.showtraceback()
184  else:
185  self.IP.input_splitter.push(line)
186  self.iter_more = self.IP.input_splitter.push_accepts_more()
187  self.prompt = self.generatePrompt(self.iter_more)
188  if (self.IP.SyntaxTB.last_syntax_error and
189  self.IP.autoedit_syntax):
190  self.IP.edit_syntax_error()
191  if not self.iter_more:
192  if parse_version(IPython.release.version) >= parse_version("2.0.0-dev"):
193  source_raw = self.IP.input_splitter.raw_reset()
194  else:
195  source_raw = self.IP.input_splitter.source_raw_reset()[1]
196  self.IP.run_cell(source_raw, store_history=True)
197  self.IP.rl_do_indent = False
198  else:
199  # TODO: Auto-indent
200  #
201  self.IP.rl_do_indent = True
202  pass
203 
204  sys.stdout = orig_stdout
205  sys.stdin = orig_stdin
206 
207  def generatePrompt(self, is_continuation):
208  """!
209  Generate prompt depending on is_continuation value
210 
211  @param is_continuation
212  @return: The prompt string representation
213 
214  """
215 
216  # Backwards compatibility with ipyton-0.11
217  #
218  ver = IPython.__version__
219  if '0.11' in ver:
220  prompt = self.IP.hooks.generate_prompt(is_continuation)
221  else:
222  if is_continuation:
223  prompt = self.IP.prompt_manager.render('in2')
224  else:
225  prompt = self.IP.prompt_manager.render('in')
226 
227  return prompt
228 
229 
230  def historyBack(self):
231  """!
232  Provides one history command back.
233 
234  @param self this object
235  @return: The command string.
236  """
237  self.history_level -= 1
238  if not self._getHistory():
239  self.history_level +=1
240  return self._getHistory()
241 
242  def historyForward(self):
243  """!
244  Provides one history command forward.
245 
246  @param self this object
247  @return: The command string.
248  """
249  if self.history_level < 0:
250  self.history_level += 1
251  return self._getHistory()
252 
253  def _getHistory(self):
254  """!
255  Gets the command string of the current history level.
256 
257  @param self this object
258  @return: Historic command string.
259  """
260  try:
261  rv = self.IP.user_ns['In'][self.history_level].strip('\n')
262  except IndexError:
263  rv = ''
264  return rv
265 
266  def updateNamespace(self, ns_dict):
267  """!
268  Add the current dictionary to the shell namespace.
269 
270  @param ns_dict: A dictionary of symbol-values.
271  @return none
272  """
273  self.IP.user_ns.update(ns_dict)
274 
275  def complete(self, line):
276  """!
277  Returns an auto completed line and/or possibilities for completion.
278 
279  @param line: Given line so far.
280  @return: Line completed as for as possible, and possible further completions.
281  """
282  split_line = self.complete_sep.split(line)
283  if split_line[-1]:
284  possibilities = self.IP.complete(split_line[-1])
285  else:
286  completed = line
287  possibilities = ['', []]
288  if possibilities:
289  def _commonPrefix(str1, str2):
290  """!
291  Reduction function. returns common prefix of two given strings.
292 
293  @param str1: First string.
294  @param str2: Second string
295  @return: Common prefix to both strings.
296  """
297  for i in range(len(str1)):
298  if not str2.startswith(str1[:i+1]):
299  return str1[:i]
300  return str1
301  if possibilities[1]:
302  common_prefix = reduce(_commonPrefix, possibilities[1]) or line[-1]
303  completed = line[:-len(split_line[-1])]+common_prefix
304  else:
305  completed = line
306  else:
307  completed = line
308  return completed, possibilities[1]
309 
310 
311  def shell(self, cmd,verbose=0,debug=0,header=''):
312  """!
313  Replacement method to allow shell commands without them blocking.
314 
315  @param cmd: Shell command to execute.
316  @param verbose: Verbosity
317  @param debug: Debug level
318  @param header: Header to be printed before output
319  @return none
320  """
321  stat = 0
322  if verbose or debug: print header+cmd
323  # flush stdout so we don't mangle python's buffering
324  if not debug:
325  input, output = os.popen4(cmd)
326  print output.read()
327  output.close()
328  input.close()
329 
330 
class ConsoleView(Gtk.TextView):
    """
    Specialized text view for console-like workflow.

    Public methods that modify the buffer (write, showPrompt, changeLine,
    showReturned) only queue the matching underscore-prefixed method with
    GObject.idle_add; the underscore-prefixed methods do the actual work.

    @cvar ANSI_COLORS: Mapping of terminal colors to X11 names.
    @type ANSI_COLORS: dictionary

    @ivar text_buffer: Widget's text buffer.
    @type text_buffer: Gtk.TextBuffer
    @ivar color_pat: Regex of terminal color pattern
    @type color_pat: _sre.SRE_Pattern
    @ivar mark: Scroll mark for automatic scrolling on input.
    @type mark: Gtk.TextMark
    @ivar line_start: Start of command line mark.
    @type line_start: Gtk.TextMark
    """
    ANSI_COLORS = {'0;30': 'Black', '0;31': 'Red',
                   '0;32': 'Green', '0;33': 'Brown',
                   '0;34': 'Blue', '0;35': 'Purple',
                   '0;36': 'Cyan', '0;37': 'LightGray',
                   '1;30': 'DarkGray', '1;31': 'DarkRed',
                   '1;32': 'SeaGreen', '1;33': 'Yellow',
                   '1;34': 'LightBlue', '1;35': 'MediumPurple',
                   '1;36': 'LightCyan', '1;37': 'White'}

    def __init__(self):
        """
        Initialize console view.

        Creates one text tag per ANSI color code, a plain '0' tag, the
        non-editable 'notouch' tag, the scroll and line-start marks, and
        connects the key press handler.
        """
        GObject.GObject.__init__(self)
        self.modify_font(Pango.FontDescription('Mono'))
        self.set_cursor_visible(True)
        self.text_buffer = self.get_buffer()
        self.mark = self.text_buffer.create_mark(
            'scroll_mark', self.text_buffer.get_end_iter(), False)
        for code, color in self.ANSI_COLORS.items():
            self.text_buffer.create_tag(code, foreground=color, weight=700)
        self.text_buffer.create_tag('0')
        self.text_buffer.create_tag('notouch', editable=False)
        # Matches an escape sequence ESC '[' <code> 'm', optionally wrapped
        # in \x01 ... \x02; the code is the only capture group.
        self.color_pat = re.compile('\x01?\x1b\\[(.*?)m\x02?')
        self.line_start = self.text_buffer.create_mark(
            'line_start', self.text_buffer.get_end_iter(), True)
        self.connect('key-press-event', self.onKeyPress)

    def write(self, text, editable=False):
        """!
        Write given text to buffer.

        @param self: this object
        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        GObject.idle_add(self._write, text, editable)

    def _write(self, text, editable=False):
        """!
        Write given text to buffer.

        Color escape sequences are removed from the text; each stretch of
        text that follows a sequence is inserted with the tag named after
        that sequence's code.

        @param self: this object
        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        buf = self.text_buffer
        # With one capture group, split() yields
        # [text, code, text, code, text, ...].
        pieces = self.color_pat.split(text)
        start_mark = buf.create_mark(None, buf.get_end_iter(), True)
        buf.insert(buf.get_end_iter(), pieces[0])

        # Pair every code with the text right after it by position. Looking
        # the code up by value could hit a text segment that happens to be
        # identical to a color code.
        for code, chunk in zip(pieces[1::2], pieces[2::2]):
            buf.insert_with_tags_by_name(buf.get_end_iter(), chunk, str(code))

        if not editable:
            buf.apply_tag_by_name('notouch',
                                  buf.get_iter_at_mark(start_mark),
                                  buf.get_end_iter())
        buf.delete_mark(start_mark)
        self.scroll_mark_onscreen(self.mark)

    def showPrompt(self, prompt):
        """!
        Prints prompt at start of line.

        @param self: this object
        @param prompt: Prompt to print.
        @return none
        """
        GObject.idle_add(self._showPrompt, prompt)

    def _showPrompt(self, prompt):
        """!
        Prints prompt at start of line.

        After the prompt is written, the line_start mark is moved to the end
        of the buffer so the command line begins right after the prompt.

        @param self: this object
        @param prompt: Prompt to print.
        @return none
        """
        self._write(prompt)
        self.text_buffer.move_mark(self.line_start,
                                   self.text_buffer.get_end_iter())

    def changeLine(self, text):
        """!
        Replace currently entered command line with given text.

        @param self: this object
        @param text: Text to use as replacement.
        @return none
        """
        GObject.idle_add(self._changeLine, text)

    def _changeLine(self, text):
        """!
        Replace currently entered command line with given text.

        Deletes everything from the line_start mark to the end of that line
        and writes the replacement as editable text.

        @param self: this object
        @param text: Text to use as replacement.
        @return none
        """
        buf = self.text_buffer
        line_end = buf.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        buf.delete(buf.get_iter_at_mark(self.line_start), line_end)
        self._write(text, True)

    def getCurrentLine(self):
        """!
        Get text in current command line.

        @param self: this object
        @return Text of current command line.
        """
        buf = self.text_buffer
        return buf.get_slice(buf.get_iter_at_mark(self.line_start),
                             buf.get_end_iter(), False)

    def showReturned(self, text):
        """!
        Show returned text from last command and print new prompt.

        @param self: this object
        @param text: Text to show.
        @return none
        """
        GObject.idle_add(self._showReturned, text)

    def _showReturned(self, text):
        """!
        Show returned text from last command and print new prompt.

        Reads self.prompt and self.IP, which are not assigned anywhere in
        this class; the object this runs on must supply them.

        @param self: this object
        @param text: Text to show.
        @return none
        """
        buf = self.text_buffer
        # Freeze the command line that was just submitted.
        line_end = buf.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        buf.apply_tag_by_name('notouch',
                              buf.get_iter_at_mark(self.line_start),
                              line_end)
        self._write('\n' + text)
        if text:
            self._write('\n')
        self._showPrompt(self.prompt)
        buf.move_mark(self.line_start, buf.get_end_iter())
        buf.place_cursor(buf.get_end_iter())

        if self.IP.rl_do_indent:
            indentation = self.IP.input_splitter.indent_spaces * ' '
            buf.insert_at_cursor(indentation)

    def onKeyPress(self, widget, event):
        """!
        Key press callback used for correcting behavior for console-like
        interfaces. For example 'home' should go to prompt, not to beginning of
        line.

        @param self: this object
        @param widget: Widget that key press occurred in.
        @param event: Event object
        @return Return True if event should not trickle.
        """
        buf = self.text_buffer
        insert_mark = buf.get_insert()
        insert_iter = buf.get_iter_at_mark(insert_mark)
        selection_mark = buf.get_selection_bound()
        selection_iter = buf.get_iter_at_mark(selection_mark)
        start_iter = buf.get_iter_at_mark(self.line_start)

        if event.keyval == Gdk.KEY_Home:
            state = event.get_state()
            if (state & Gdk.ModifierType.CONTROL_MASK or
                    state & Gdk.ModifierType.MOD1_MASK):
                # Home combined with CONTROL or MOD1 is not handled here.
                pass
            elif state & Gdk.ModifierType.SHIFT_MASK:
                # Only the insert mark moves; the selection bound stays.
                buf.move_mark(insert_mark, start_iter)
                return True
            else:
                buf.place_cursor(start_iter)
                return True
        elif event.keyval == Gdk.KEY_Left:
            # Swallow the key when the position left of the cursor is not
            # editable.
            insert_iter.backward_cursor_position()
            if not insert_iter.editable(True):
                return True
        elif not event.string:
            pass
        elif (start_iter.compare(insert_iter) <= 0 and
              start_iter.compare(selection_iter) <= 0):
            # Cursor and selection bound are both at or after line_start.
            pass
        elif (start_iter.compare(insert_iter) > 0 and
              start_iter.compare(selection_iter) > 0):
            # Both are before line_start: move the cursor to line_start.
            buf.place_cursor(start_iter)
        elif insert_iter.compare(selection_iter) < 0:
            buf.move_mark(insert_mark, start_iter)
        elif insert_iter.compare(selection_iter) > 0:
            buf.move_mark(selection_mark, start_iter)

        return self.onKeyPressExtend(event)

    def onKeyPressExtend(self, event):
        """!
        For some reason we can't extend onKeyPress directly (bug #500900).

        Hook called at the end of onKeyPress; this implementation does
        nothing.

        @param self: this object
        @param event key press
        @return none
        """
        pass
563 
564 
class IPythonView(ConsoleView, IterableIPShell):
    """
    Sub-class of both modified IPython shell and L{ConsoleView} this makes
    a GTK+ IPython console.
    """

    def __init__(self):
        """
        Initialize. Redirect I/O to console.

        The shell's standard output and standard error both go to self.cout,
        whose contents are shown in the view after each command.
        """
        ConsoleView.__init__(self)
        self.cout = StringIO()
        IterableIPShell.__init__(self, cout=self.cout, cerr=self.cout,
                                 input_func=self.raw_input)
        self.interrupt = False
        self.execute()
        self.prompt = self.generatePrompt(False)
        # Discard anything written so far; rewind as well, as _processLine does.
        self.cout.truncate(0)
        self.cout.seek(0)
        self.showPrompt(self.prompt)

    def raw_input(self, prompt=''):
        """!
        Custom raw_input() replacement. Gets current line from console buffer.

        @param self: this object
        @param prompt: Prompt to print. Here for compatibility as replacement.
        @return The current command line text.
        @exception KeyboardInterrupt raised once if self.interrupt was set
        """
        if self.interrupt:
            self.interrupt = False
            raise KeyboardInterrupt
        return self.getCurrentLine()

    def onKeyPressExtend(self, event):
        """!
        Key press callback with plenty of shell goodness, like history,
        autocompletions, etc.

        @param self: this object
        @param event: Event object.
        @return True if event should not trickle.
        """
        keyval = event.keyval

        if (event.get_state() & Gdk.ModifierType.CONTROL_MASK and
                keyval == Gdk.KEY_c):
            # Control-C: flag the interrupt so raw_input() raises
            # KeyboardInterrupt while the line is processed.
            self.interrupt = True
            self._processLine()
            return True
        if keyval == Gdk.KEY_Return:
            self._processLine()
            return True
        if keyval == Gdk.KEY_Up:
            self.changeLine(self.historyBack())
            return True
        if keyval == Gdk.KEY_Down:
            self.changeLine(self.historyForward())
            return True
        if keyval == Gdk.KEY_Tab:
            current = self.getCurrentLine()
            if not current.strip():
                return False
            completed, possibilities = self.complete(current)
            if len(possibilities) > 1:
                # Several candidates: list them, then print the prompt again.
                self.write('\n')
                for symbol in possibilities:
                    self.write(symbol + '\n')
                self.showPrompt(self.prompt)
            # Keep the typed line if the completer returned nothing usable.
            self.changeLine(completed or current)
            return True
        return False

    def _processLine(self):
        """!
        Process current command line.

        Executes the line, shows the captured output followed by a new
        prompt, and empties the capture buffer.

        @param self: this object
        @return none
        """
        self.history_pos = 0
        self.execute()
        rv = self.cout.getvalue()
        if rv:
            rv = rv.strip('\n')
        self.showReturned(rv)
        self.cout.truncate(0)
        self.cout.seek(0)
657 
if __name__ == "__main__":
    # Stand-alone demo: a single window holding one IPython console view.

    def _quit_on_delete(widget, event):
        """Leave the GTK main loop when the window is closed."""
        return Gtk.main_quit()

    window = Gtk.Window()
    window.set_default_size(640, 320)
    window.connect('delete-event', _quit_on_delete)
    console = IPythonView()
    window.add(console)
    window.show_all()
    Gtk.main()
665 
def onKeyPressExtend(self, event)
For some reason we can't extend onKeyPress directly (bug #500900).
def onKeyPressExtend(self, event)
Key press callback with plenty of shell goodness, like history, autocompletions, etc.
def __init__(self, argv=None, user_ns=None, user_global_ns=None, cin=None, cout=None, cerr=None, input_func=None)
Initializer.
Definition: ipython_view.py:75
def raw_input(self, prompt='')
Custom raw_input() replacement.
def getCurrentLine(self)
Get text in current command line.
def _processLine(self)
Process current command line.
def execute(self)
Executes the current line provided by the shell object.
def _getHistory(self)
Gets the command string of the current history level.
def changeLine(self, text)
Replace currently entered command line with given text.
#define list
def _showReturned(self, text)
Show returned text from last command and print new prompt.
def _write(self, text, editable=False)
Write given text to buffer.
#define delete
def onKeyPress(self, widget, event)
Key press callback used for correcting behavior for console-like interfaces.
dictionary ANSI_COLORS
color list
def shell(self, cmd, verbose=0, debug=0, header='')
Replacement method to allow shell commands without them blocking.
def __update_namespace(self)
Update self.IP namespace for autocompletion with sys.modules.
def historyBack(self)
Provides one history command back.
def write(self, text, editable=False)
Write given text to buffer.
def generatePrompt(self, is_continuation)
Generate prompt depending on is_continuation value.
def _changeLine(self, text)
Replace currently entered command line with given text.
def historyForward(self)
Provides one history command forward.
def updateNamespace(self, ns_dict)
Add the current dictionary to the shell namespace.
def _showPrompt(self, prompt)
Prints prompt at start of line.
def showPrompt(self, prompt)
Prints prompt at start of line.
def complete(self, line)
Returns an auto completed line and/or possibilities for completion.
def showReturned(self, text)
Show returned text from last command and print new prompt.