# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import subprocess
import time
from builtins import range
from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import State
from airflow.utils.logging import LoggingMixin
PARALLELISM = configuration.get('core', 'PARALLELISM')
class LocalWorker(multiprocessing.Process, LoggingMixin):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.daemon = True
def run(self):
while True:
key, command = self.task_queue.get()
if key is None:
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
self.logger.info("{} running {}".format(
self.__class__.__name__, command))
command = "exec bash -c '{0}'".format(command)
try:
subprocess.check_call(command, shell=True)
state = State.SUCCESS
except subprocess.CalledProcessError as e:
state = State.FAILED
self.logger.error("failed to execute task {}:".format(str(e)))
# raise e
self.result_queue.put((key, state))
self.task_queue.task_done()
time.sleep(1)
[docs]class LocalExecutor(BaseExecutor):
"""
LocalExecutor executes tasks locally in parallel. It uses the
multiprocessing Python library and queues to parallelize the execution
of tasks.
"""
def start(self):
self.queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.Queue()
self.workers = [
LocalWorker(self.queue, self.result_queue)
for i in range(self.parallelism)
]
for w in self.workers:
w.start()
def execute_async(self, key, command, queue=None):
self.queue.put((key, command))
def sync(self):
while not self.result_queue.empty():
results = self.result_queue.get()
self.change_state(*results)
def end(self):
# Sending poison pill to all worker
for _ in self.workers:
self.queue.put((None, None))
# Wait for commands to finish
self.queue.join()
self.sync()