# -*- coding: utf-8 -*-
# Copyright 2012-2015 Spotify AB
# Ported to Airflow by Bolke de Bruin
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# This is a port of Luigi's ssh implementation. All credits go there.
import subprocess
from contextlib import contextmanager
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
import logging
[docs]class SSHHook(BaseHook):
Light-weight remote execution library and utilities.
Using this hook (which is just a convenience wrapper for subprocess),
is created to let you stream data from a remotely stored file.
As a bonus, :class:`SSHHook` also provides a really cool feature that let's you
set up ssh tunnels super easily using a python context manager (there is an example
in the integration part of unittests).
:param key_file: Typically the SSHHook uses the keys that are used by the user
airflow is running under. This sets the behavior to use another file instead.
:type key_file: str
:param connect_timeout: sets the connection timeout for this connection.
:type connect_timeout: int
:param no_host_key_check: whether to check to host key. If True host keys will not
be checked, but are also not stored in the current users's known_hosts file.
:type no_host_key_check: bool
:param tty: allocate a tty.
:type tty: bool
:param sshpass: Use to non-interactively perform password authentication by using
:type sshpass: bool
def __init__(self, conn_id='ssh_default'):
conn = self.get_connection(conn_id)
self.key_file = conn.extra_dejson.get('key_file', None)
self.connect_timeout = conn.extra_dejson.get('connect_timeout', None)
self.tcp_keepalive = conn.extra_dejson.get('tcp_keepalive', False)
self.server_alive_interval = conn.extra_dejson.get('server_alive_interval', 60)
self.no_host_key_check = conn.extra_dejson.get('no_host_key_check', False)
self.tty = conn.extra_dejson.get('tty', False)
self.sshpass = conn.extra_dejson.get('sshpass', False)
self.conn = conn
def get_conn(self):
def _host_ref(self):
if self.conn.login:
return "{0}@{1}".format(self.conn.login, self.conn.host)
return self.conn.host
def _prepare_command(self, cmd):
connection_cmd = ["ssh", self._host_ref(), "-o", "ControlMaster=no"]
if self.sshpass:
connection_cmd = ["sshpass", "-e"] + connection_cmd
connection_cmd += ["-o", "BatchMode=yes"] # no password prompts
if self.conn.port:
connection_cmd += ["-p", str(self.conn.port)]
if self.connect_timeout:
connection_cmd += ["-o", "ConnectionTimeout={}".format(self.connect_timeout)]
if self.tcp_keepalive:
connection_cmd += ["-o", "TCPKeepAlive=yes"]
connection_cmd += ["-o", "ServerAliveInterval={}".format(self.server_alive_interval)]
if self.no_host_key_check:
connection_cmd += ["-o", "UserKnownHostsFile=/dev/null",
"-o", "StrictHostKeyChecking=no"]
if self.key_file:
connection_cmd += ["-i", self.key_file]
if self.tty:
connection_cmd += ["-t"]
connection_cmd += cmd
logging.debug("SSH cmd: {} ".format(connection_cmd))
return connection_cmd
[docs] def Popen(self, cmd, **kwargs):
Remote Popen
:param cmd: command to remotely execute
:param kwargs: extra arguments to Popen (see subprocess.Popen)
:return: handle to subprocess
prefixed_cmd = self._prepare_command(cmd)
return subprocess.Popen(prefixed_cmd, **kwargs)
[docs] def check_output(self, cmd):
Executes a remote command and returns the stdout a remote process.
Simplified version of Popen when you only want the output as a string and detect any errors.
:param cmd: command to remotely execute
:return: stdout
p = self.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, stderr = p.communicate()
if p.returncode != 0:
# I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host, output=output)
raise AirflowException("Cannot execute {} on {}. Error code is: {}. Output: {}, Stderr: {}".format(
cmd, self.conn.host, p.returncode, output, stderr))
return output
[docs] @contextmanager
def tunnel(self, local_port, remote_port=None, remote_host="localhost"):
Creates a tunnel between two hosts. Like ssh -L <LOCAL_PORT>:host:<REMOTE_PORT>.
Remember to close() the returned "tunnel" object in order to clean up
after yourself when you are done with the tunnel.
:param local_port:
:type local_port: int
:param remote_port:
:type remote_port: int
:param remote_host:
:type remote_host: str
tunnel_host = "{0}:{1}:{2}".format(local_port, remote_host, remote_port)
proc = self.Popen(["-L", tunnel_host, "echo -n ready && cat"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
ready = proc.stdout.read(5)
assert ready == b"ready", "Did not get 'ready' from remote"
assert proc.returncode == 0, "Tunnel process did unclean exit (returncode {}".format(proc.returncode)