Commit afcf213c by Anusha Ramineni

Add async executor

This commit adds the framework to support async execution
of functions. Will be used in syncing pooled resources
while podmanager creation.

Also, this adds periodic task support which is used to
periodically sync pooled resources after particular interval.

Partially-Implements blueprint add-device-orchestration
Change-Id: I48f2358e4a7662898796c82d6a47aa6d947495e5
parent 0104b783
......@@ -4,9 +4,11 @@
pbr!=2.1.0,>=2.0.0 # Apache-2.0
aniso8601==1.2.0
click==6.6
eventlet>=0.18.2,!=0.18.3,!=0.20.1,<0.21.0 # MIT
Flask!=0.11,<1.0,>=0.10 # BSD
Flask-Cors==3.0.2
Flask-RESTful>=0.3.5 # BSD
futurist>=1.2.0 # Apache-2.0
itsdangerous==0.24
jsonschema<3.0.0,>=2.6.0 # MIT
Jinja2!=2.9.0,!=2.9.1,!=2.9.2,!=2.9.3,!=2.9.4,>=2.8 # BSD License (3 clause)
......
......@@ -19,7 +19,11 @@ import flask_cors
import flask_restful
from six.moves import http_client
# Note: setup app needs to be called before the valence imports
# for config options initialization to take place.
from valence.api import app as flaskapp
app = flaskapp.get_app()
import valence.api.root as api_root
import valence.api.v1.devices as v1_devices
import valence.api.v1.flavors as v1_flavors
......
......@@ -15,13 +15,16 @@
import logging
import eventlet
eventlet.monkey_patch(os=False)
import gunicorn.app.base
from valence.api.route import app as application
from valence.common import async
import valence.conf
from valence.controller import pooled_devices
CONF = valence.conf.CONF
LOG = logging.getLogger(__name__)
......@@ -42,6 +45,24 @@ class StandaloneApplication(gunicorn.app.base.BaseApplication):
return self.application
def start_periodic_tasks(server):
"""Starts asynchronous periodic sync on app startup
If enabled in configuration this function will start periodic sync
of pooled resources in background.
"""
if CONF.podm.enable_periodic_sync:
async.start_periodic_worker([(
pooled_devices.PooledDevices.synchronize_devices, None, None)])
return
def on_server_exit(server):
"""Performs cleanup tasks. """
async.stop_periodic_tasks()
def main():
options = {
'bind': '%s:%s' % (CONF.api.bind_host, CONF.api.bind_port),
......@@ -50,6 +71,9 @@ def main():
'workers': CONF.api.workers,
'loglevel': CONF.api.log_level,
'errorlog': CONF.api.log_file,
'worker_class': 'eventlet',
'when_ready': start_periodic_tasks,
'on_exit': on_server_exit
}
LOG.info(("Valence Server on http://%(host)s:%(port)s"),
{'host': CONF.api.bind_host, 'port': CONF.api.bind_port})
......
# Copyright (c) 2017 NEC, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import futurist
from valence.common import exception
LOG = logging.getLogger(__name__)
_executor = None
_periodics_worker = None
def executor():
global _executor
if not _executor:
_executor = futurist.GreenThreadPoolExecutor(max_workers=10)
return _executor
def start_periodic_worker(callables):
"""Starts periodic execution of function passed in callables
To enable this:
1. Pass callables in following format
[(func, (arg1, arg2), {}),
(func2, (arg1, arg2), {}),]
2. Decorate func as follow:
@periodics.periodic(spacing=2, enabled=True)
def func():
pass
:param callables: pass functions in this to execute periodically
:returns: Future object
"""
global _periodics_worker
_periodics_worker = futurist.periodics.PeriodicWorker(
callables=callables,
executor_factory=futurist.periodics.ExistingExecutor(executor()))
task_worker = executor().submit(_periodics_worker.start)
task_worker.add_done_callback(_handle_exceptions)
def stop_periodic_tasks():
"""Stops all periodic tasks and cleanup resources. """
global _periodics_worker
if _periodics_worker is not None:
try:
_periodics_worker.stop()
_periodics_worker.wait()
except Exception:
LOG.exception("Exception occurred when stopping periodic workers")
_periodics_worker = None
if _executor and _executor.alive:
_executor.shutdown(wait=True)
LOG.debug("periodic tasks stopped successfully")
def _spawn_worker(func, *args, **kwargs):
"""Creates a greenthread to run func(*args, **kwargs).
Spawns a greenthread if there are free slots in pool, otherwise raises
exception. Execution control returns immediately to the caller.
:returns: Future object.
:raises: NoFreeWorker if worker pool is currently full.
"""
try:
return executor().submit(func, *args, **kwargs)
except futurist.RejectedSubmission:
raise exception.ValenceException("No free worker available")
def async(func):
"""Start a job in new background thread.
To start a async job, decorate the function as follows:
Example:
@async.async
def test():
pass
"""
def wrapper(*args, **kwargs):
LOG.info("starting async thread for function %s", func.__name__)
future = _spawn_worker(func, *args, **kwargs)
future.add_done_callback(_handle_exceptions)
return wrapper
def _handle_exceptions(fut):
try:
fut.result()
except Exception:
msg = 'Unexpected exception in background thread'
LOG.exception(msg)
......@@ -42,6 +42,14 @@ podm_opts = [
default='/redfish/v1/',
help=_('The URL extension that specifies the '
'Redfish API version that valence will interact with')),
cfg.BoolOpt('enable_periodic_sync',
default=False,
help=_('To enable periodic task to automatically sync'
'resources of podmanager with DB.')),
cfg.IntOpt('sync_interval',
default=30,
help=_('Time interval(in seconds) after which devices will be'
'synced periodically.')),
]
......
......@@ -14,13 +14,16 @@
import logging
from valence.common import async
from valence.common import exception
from valence.common import utils
import valence.conf
from valence.controller import nodes
from valence.controller import pooled_devices
from valence.db import api as db_api
from valence.podmanagers import manager
CONF = valence.conf.CONF
LOG = logging.getLogger(__name__)
......@@ -63,8 +66,7 @@ def create_podmanager(values):
values['status'] = mng.podm.get_status()
podm = db_api.Connection.create_podmanager(values).as_dict()
# updates all devices corresponding to this podm in DB
# TODO(Akhil): Make this as asynchronous action
pooled_devices.PooledDevices.update_device_info(podm['uuid'])
update_podm_resources_to_db(podm['uuid'])
return podm
......@@ -83,3 +85,17 @@ def delete_podmanager(uuid):
nodes.Node(node['uuid']).delete_composed_node(node['uuid'])
return db_api.Connection.delete_podmanager(uuid)
@async.async
def update_podm_resources_to_db(podm_id):
"""Starts asynchronous one_time sync
As set in configuration this function will sync pooled resources
one time if background periodic sync is disabled.
:param podm_id: to asynchronously sync devices of particular podm
"""
if not CONF.podm.enable_periodic_sync:
pooled_devices.PooledDevices.synchronize_devices(podm_id)
return
......@@ -14,10 +14,14 @@
import logging
from futurist import periodics
from valence.common import exception
import valence.conf
from valence.db import api as db_api
from valence.podmanagers import manager
CONF = valence.conf.CONF
LOG = logging.getLogger(__name__)
......@@ -50,6 +54,8 @@ class PooledDevices(object):
return db_api.Connection.get_device_by_uuid(device_id).as_dict()
@classmethod
@periodics.periodic(spacing=CONF.podm.sync_interval, enabled=True,
run_immediately=True)
def synchronize_devices(cls, podm_id=None):
"""Sync devices connected to podmanager(s)
......
# Copyright (c) 2018 NEC, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import futurist
import mock
from valence.common import async
from valence.common import exception
class AsyncTestCase(unittest.TestCase):
def setUp(self):
super(AsyncTestCase, self).setUp()
self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
self.periodics = mock.Mock(spec=futurist.periodics.PeriodicWorker)
async._executor = self.executor
async._periodics_worker = self.periodics
def test__spawn_worker(self):
async._spawn_worker('fake', 1, foo='bar')
self.executor.submit.assert_called_once_with('fake', 1, foo='bar')
def test__spawn_worker_none_free(self):
self.executor.submit.side_effect = futurist.RejectedSubmission()
self.assertRaises(exception.ValenceException,
async._spawn_worker, 'fake')
def test_start_periodic_tasks(self):
fake_callable = mock.MagicMock()
async.start_periodic_worker([(fake_callable, None, None)])
self.executor.submit.assert_called_once_with(
async._periodics_worker.start)
def test_stop_periodic_tasks(self):
async.stop_periodic_tasks()
self.periodics.stop.assert_called()
self.periodics.wait.assert_called()
self.executor.shutdown.assert_called()
......@@ -17,6 +17,7 @@ import mock
from valence.common.exception import BadRequest
from valence.controller import podmanagers
from valence.podmanagers import podm_base
class TestPodManagers(unittest.TestCase):
......@@ -104,3 +105,29 @@ class TestPodManagers(unittest.TestCase):
podmanagers.update_podmanager('fake-podm-id', values)
mock_db_update.assert_called_once_with('fake-podm-id', result_values)
@mock.patch('valence.redfish.sushy.sushy_instance.RedfishInstance')
@mock.patch('valence.controller.podmanagers.update_podm_resources_to_db')
@mock.patch('valence.db.api.Connection.create_podmanager')
@mock.patch('valence.podmanagers.manager.Manager')
@mock.patch('valence.controller.podmanagers._check_creation')
def test_create_podmanager(self, mock_creation, mock_mng, mock_db_create,
mock_resource_update, mock_sushy):
values = {"name": "podm_name", "url": "https://10.240.212.123",
"driver": "redfishv1", "status": None,
"authentication": [{
"type": "basic",
"auth_items": {"username": "xxxxxxx",
"password": "xxxxxxx"}}]}
mock_creation.return_value = values
mock_mng.podm.return_value = podm_base.PodManagerBase(
'fake', 'fake-pass', 'http://fake-url')
podmanagers.create_podmanager('fake-values')
mock_db_create.assert_called_once_with(values)
mock_resource_update.assert_called()
@mock.patch('valence.common.async._spawn_worker')
def test_update_podm_resources_to_db(self, mock_worker):
mock_worker.return_value = mock.MagicMock()
podmanagers.update_podm_resources_to_db('fake-podm-id')
mock_worker.assert_called()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment