# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from builtins import str
import logging
import requests
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
class HttpHook(BaseHook):
    """
    Interact with HTTP servers.

    :param method: the HTTP method used by ``run`` (e.g. ``'GET'``,
        ``'POST'``). Defaults to ``'POST'``.
    :type method: str
    :param http_conn_id: id of the connection whose host, port, login and
        password are used to build the base URL and session auth.
    :type http_conn_id: str
    """

    def __init__(self, method='POST', http_conn_id='http_default'):
        self.http_conn_id = http_conn_id
        self.method = method

    def get_conn(self, headers=None):
        """
        Returns http session for use with requests.

        As a side effect this sets ``self.base_url`` from the connection's
        host (and port, when one is configured); ``run`` reads that
        attribute, so ``get_conn`` must be called before building URLs.

        :param headers: optional headers to set on the session
        :type headers: dict
        :return: a configured ``requests.Session``
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        host = conn.host or ''
        # Only prepend a scheme when the host does not already carry one.
        # Looking for '://' (instead of a leading 'http') avoids mistaking
        # host names such as 'httpbin.org' for URLs that include a scheme.
        if host and '://' not in host:
            host = 'http://' + host
        self.base_url = host
        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)
        return session

    def run(self, endpoint, data=None, headers=None, extra_options=None):
        """
        Performs the request.

        :param endpoint: path appended to the connection's base URL
        :type endpoint: str
        :param data: sent as query parameters for GET, as the request body
            for every other method
        :param headers: optional headers for the session and the request
        :type headers: dict
        :param extra_options: options forwarded to ``run_and_check``
            (``stream``, ``verify``, ``proxies``, ``cert``, ``timeout``,
            ``allow_redirects``)
        :type extra_options: dict
        :return: the ``requests.Response`` returned by ``run_and_check``
        :raises AirflowException: when the response has an HTTP error status
        """
        extra_options = extra_options or {}
        session = self.get_conn(headers)

        # base_url only ends in '/' when a port is configured, so insert a
        # separator when neither side provides one (avoids 'http://hostapi').
        if (self.base_url and not self.base_url.endswith('/') and
                endpoint and not endpoint.startswith('/')):
            url = self.base_url + '/' + endpoint
        else:
            url = self.base_url + endpoint

        if self.method == 'GET':
            # GET uses params
            req = requests.Request(self.method,
                                   url,
                                   params=data,
                                   headers=headers)
        else:
            # Others use data
            req = requests.Request(self.method,
                                   url,
                                   data=data,
                                   headers=headers)

        prepped_request = session.prepare_request(req)
        logging.info("Sending '%s' to url: %s", self.method, url)
        return self.run_and_check(session, prepped_request, extra_options)

    def run_and_check(self, session, prepped_request, extra_options):
        """
        Grabs extra options like timeout and actually runs the request,
        checking for the result.

        :param session: the session used to send the request
        :type session: requests.Session
        :param prepped_request: the prepared request to send
        :type prepped_request: requests.PreparedRequest
        :param extra_options: optional send options; missing keys fall back
            to the defaults shown in the ``session.send`` call below
        :type extra_options: dict
        :return: the response, when its status is not an HTTP error
        :raises AirflowException: with message ``"<status_code>:<reason>"``
            when ``raise_for_status`` raises ``HTTPError``
        """
        extra_options = extra_options or {}
        response = session.send(
            prepped_request,
            stream=extra_options.get("stream", False),
            # NOTE(review): TLS certificate verification is OFF unless the
            # caller passes verify=True; kept for backward compatibility,
            # but consider defaulting to True.
            verify=extra_options.get("verify", False),
            proxies=extra_options.get("proxies", {}),
            cert=extra_options.get("cert"),
            timeout=extra_options.get("timeout"),
            allow_redirects=extra_options.get("allow_redirects", True))
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            # Tried rewrapping, but not supported. This way, it's possible
            # to get reason and code for failure by checking first 3 chars
            # for the code, or do a split on ':'
            logging.error("HTTP error: %s", response.reason)
            if self.method != 'GET':
                # The sensor uses GET, so this prevents filling up the log
                # with the body every time the GET 'misses'.
                # That's ok to do, because GETs should be repeatable and
                # all data should be visible in the log (no post data)
                logging.error(response.text)
            raise AirflowException(
                "{0}:{1}".format(response.status_code, response.reason))
        return response