Commit 51ac20f0 by Julien Danjou Committed by pastamaker[bot]

chef: merge refresh_metric and process_new_measures

This simplifies the code base by allowing callers either to process new measures sack
per sack with the existing process_new_measures_for_sack(), or to use the new
refresh_metrics(), which takes a list of metrics. Since the list of metrics must be
known, there's no need to use list_metric_with_measures_to_process() anymore,
so it can be removed.

Since some of the tests (test_rest) now need to process all sacks, reduce the
number of sacks to a small value so that each test does not take 10s.
parent e424ea8e
......@@ -102,32 +102,31 @@ class Chef(object):
LOG.error("Unable to expunge metric %s from storage",
metric, exc_info=True)
def refresh_metric(self, metric, timeout):
    """Process the pending measures of a single metric.

    Locks the sack the metric belongs to, then processes its new
    measures, releasing the lock afterwards in every case.

    :param metric: The `indexer.Metric` to refresh.
    :param timeout: Passed as the lock's `blocking` argument.
    :raises SackAlreadyLocked: If the sack lock cannot be acquired.
    """
    sack = self.incoming.sack_for_metric(metric.id)
    sack_lock = self.get_sack_lock(sack)
    if not sack_lock.acquire(blocking=timeout):
        raise SackAlreadyLocked(sack)
    try:
        self.process_new_measures([str(metric.id)])
    finally:
        sack_lock.release()
def process_new_measures(self, metrics_to_process, sync=False):
"""Process added measures in background.
def refresh_metrics(self, metrics, timeout=None, sync=False):
"""Process added measures in background for some metrics only.
Some drivers might need to have a background task running that process
the measures sent to metrics. This is used for that.
:param metrics: The list of `indexer.Metric` to refresh.
:param timeout: Time to wait for the process to happen.
:param sync: If an error occurs, raise, otherwise just log it.
"""
# process only active metrics. deleted metrics with unprocessed
# measures will be skipped until cleaned by janitor.
metrics = self.index.list_metrics(
attribute_filter={"in": {"id": metrics_to_process}})
metrics_by_id = {m.id: m for m in metrics}
# NOTE(gordc): must lock at sack level
try:
LOG.debug("Processing measures for %s", metrics)
with self.incoming.process_measure_for_metrics(
[m.id for m in metrics]) as metrics_and_measures:
metrics_to_refresh = sorted(
((metric, self.incoming.sack_for_metric(metric.id))
for metric in metrics),
key=ITEMGETTER_1)
for sack, metric_and_sack in itertools.groupby(
metrics_to_refresh, ITEMGETTER_1):
lock = self.get_sack_lock(sack)
# FIXME(jd) timeout should be global for all sack locking
if not lock.acquire(blocking=timeout):
raise SackAlreadyLocked(sack)
metrics = [m[0].id for m in metric_and_sack]
try:
LOG.debug("Processing measures for %d metrics", len(metrics))
with self.incoming.process_measure_for_metrics(
metrics) as metrics_and_measures:
self.storage.add_measures_to_metrics({
metrics_by_id[metric]: measures
for metric, measures
......@@ -135,24 +134,28 @@ class Chef(object):
})
LOG.debug("Measures for %d metrics processed",
len(metrics))
except Exception:
if sync:
raise
LOG.error("Error processing new measures", exc_info=True)
except Exception:
if sync:
raise
LOG.error("Error processing new measures", exc_info=True)
finally:
lock.release()
def process_new_measures_for_sack(self, sack, sync=False):
def process_new_measures_for_sack(self, sack, blocking=False, sync=False):
"""Process added measures in background.
Lock a sack and try to process measures from it. If the sack cannot be
locked, the method will raise `SackAlreadyLocked`.
:param sack: The sack to process new measures for.
:param blocking: Block to be sure the sack is processed or raise
`SackAlreadyLocked` otherwise.
:param sync: If True, raise any issue immediately otherwise just log it
:return: The number of metrics processed.
"""
lock = self.get_sack_lock(sack)
if not lock.acquire(blocking=False):
if not lock.acquire(blocking=blocking):
raise SackAlreadyLocked(sack)
LOG.debug("Processing measures for sack %s", sack)
try:
......
......@@ -200,10 +200,6 @@ class IncomingDriver(object):
raise exceptions.NotImplementedError
@staticmethod
def list_metric_with_measures_to_process(sack):
# Abstract hook: concrete incoming drivers must return the set of
# metric ids that still have unprocessed measures stored in *sack*.
raise exceptions.NotImplementedError
@staticmethod
def delete_unprocessed_measures_for_metric(metric_id):
# Abstract hook: concrete incoming drivers must drop any measures
# queued for *metric_id* that have not been processed yet.
raise exceptions.NotImplementedError
......
......@@ -157,19 +157,6 @@ class CephStorage(incoming.IncomingDriver):
return dict(omaps)
def list_metric_with_measures_to_process(self, sack):
    """Return the ids of metrics with unprocessed measures in *sack*.

    Pages through the sack's pending measure keys, Q_LIMIT at a time,
    and collects the metric id embedded in each key name (the second
    underscore-separated field).
    """
    found = set()
    marker = ""
    while True:
        keys = list(self._list_keys_to_process(
            sack, marker=marker, limit=self.Q_LIMIT).keys())
        for key in keys:
            found.add(key.split("_")[1])
        if len(keys) >= self.Q_LIMIT:
            # A full page: there may be more keys after the last one.
            marker = keys[-1]
        else:
            break
    return found
def delete_unprocessed_measures_for_metric(self, metric_id):
sack = self.sack_for_metric(metric_id)
key_prefix = self.MEASURE_PREFIX + "_" + str(metric_id)
......
......@@ -110,7 +110,7 @@ class FileStorage(incoming.IncomingDriver):
self._list_measures_container_for_metric_str(sack, metric))
for sack in self.iter_sacks():
for metric in self.list_metric_with_measures_to_process(sack):
for metric in set(self._list_target(self._sack_path(sack))):
build_metric_report(metric, sack)
return (report_vars['metrics'] or
len(report_vars['metric_details'].keys()),
......@@ -118,9 +118,6 @@ class FileStorage(incoming.IncomingDriver):
sum(report_vars['metric_details'].values()),
report_vars['metric_details'] if details else None)
def list_metric_with_measures_to_process(self, sack):
    """Return the ids of metrics with unprocessed measures in *sack*."""
    sack_path = self._sack_path(sack)
    return set(self._list_target(sack_path))
def _list_measures_container_for_metric_str(self, sack, metric_id):
return self._list_target(self._measure_path(sack, metric_id))
......
......@@ -123,11 +123,6 @@ return results
return (metrics, report_vars['measures'],
report_vars['metric_details'] if details else None)
def list_metric_with_measures_to_process(self, sack):
    """Return the ids of metrics with unprocessed measures in *sack*.

    Scans the Redis keyspace for keys prefixed with the sack name;
    the metric id is the second SEP-separated field of each key.
    """
    pattern = redis.SEP.join([str(sack).encode(), b"*"])
    matching_keys = self._client.scan_iter(match=pattern, count=1000)
    return set(key.split(redis.SEP)[1].decode("utf8")
               for key in matching_keys)
def delete_unprocessed_measures_for_metric(self, metric_id):
    """Drop every unprocessed measure queued for *metric_id*."""
    measure_path = self._build_measure_path(metric_id)
    self._client.delete(measure_path)
......
......@@ -123,13 +123,6 @@ class S3Storage(incoming.IncomingDriver):
**kwargs)
yield response
def list_metric_with_measures_to_process(self, sack):
    """Return the ids of metrics with unprocessed measures in *sack*.

    Uses a delimiter-based S3 listing so each metric shows up once as
    a common prefix ("<sack>/<metric_id>/").
    """
    found = set()
    for response in self._list_files((str(sack),), Delimiter="/"):
        for common_prefix in response.get('CommonPrefixes', ()):
            found.add(common_prefix['Prefix'].split('/', 2)[1])
    return found
def _list_measure_files(self, path_items):
files = set()
for response in self._list_files(path_items):
......
......@@ -78,11 +78,6 @@ class SwiftStorage(incoming.IncomingDriver):
return (nb_metrics or len(metric_details), measures,
metric_details if details else None)
def list_metric_with_measures_to_process(self, sack):
    """Return the ids of metrics with unprocessed measures in *sack*.

    Each metric appears in the delimiter-based Swift listing as a
    'subdir' entry with a trailing slash, which is stripped here.
    """
    _headers, listing = self.swift.get_container(
        str(sack), delimiter='/', full_listing=True)
    return set(entry['subdir'][:-1]
               for entry in listing if 'subdir' in entry)
def _list_measure_files_for_metric(self, sack, metric_id):
headers, files = self.swift.get_container(
str(sack), path=six.text_type(metric_id),
......
......@@ -529,8 +529,8 @@ class MetricController(rest.RestController):
if (strtobool("refresh", refresh) and
pecan.request.incoming.has_unprocessed(self.metric.id)):
try:
pecan.request.chef.refresh_metric(
self.metric,
pecan.request.chef.refresh_metrics(
[self.metric],
pecan.request.conf.api.operation_timeout)
except chef.SackAlreadyLocked:
abort(503, 'Unable to refresh metric: %s. Metric is locked. '
......@@ -1902,8 +1902,8 @@ class AggregationController(rest.RestController):
if pecan.request.incoming.has_unprocessed(m.id)]
for m in metrics_to_update:
try:
pecan.request.chef.refresh_metric(
m, pecan.request.conf.api.operation_timeout)
pecan.request.chef.refresh_metrics(
[m], pecan.request.conf.api.operation_timeout)
except chef.SackAlreadyLocked:
abort(503, 'Unable to refresh metric: %s. '
'Metric is locked. '
......
......@@ -366,7 +366,7 @@ class TestCase(BaseTestCase):
)
self.storage.upgrade()
self.incoming.upgrade(128)
self.incoming.upgrade(3)
self.chef = chef.Chef(
self.coord, self.incoming, self.index, self.storage)
......@@ -390,6 +390,7 @@ class TestCase(BaseTestCase):
def trigger_processing(self, metrics=None):
if metrics is None:
self.chef.process_new_measures_for_sack(
self.incoming.sack_for_metric(self.metric.id), sync=True)
self.incoming.sack_for_metric(self.metric.id),
blocking=True, sync=True)
else:
self.chef.process_new_measures(metrics, sync=True)
self.chef.refresh_metrics(metrics, timeout=True, sync=True)
......@@ -257,11 +257,8 @@ class MetricdThread(threading.Thread):
def run(self):
while self.flag:
metrics = utils.list_all_incoming_metrics(self.chef.incoming)
metrics = self.chef.index.list_metrics(
attribute_filter={"in": {"id": metrics}})
for metric in metrics:
self.chef.refresh_metric(metric, timeout=None)
for sack in self.chef.incoming.iter_sacks():
self.chef.process_new_measures_for_sack(sack, blocking=True)
time.sleep(0.1)
def stop(self):
......
......@@ -1031,7 +1031,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 12, 10, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 12, 13, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1190,7 +1190,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 12, 9, 31), 6),
incoming.Measure(datetime64(2014, 1, 1, 12, 13, 10), 2),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1228,7 +1228,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1265,7 +1265,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1302,7 +1302,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1341,7 +1341,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 12, 10, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 12, 15, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1385,7 +1385,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1414,7 +1414,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 45), 44),
])
self.trigger_processing([str(self.metric.id)])
self.trigger_processing()
values = processor.get_measures(
self.storage, [processor.MetricReference(self.metric, "mean")],
......@@ -1441,7 +1441,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 45), 44),
])
self.trigger_processing([str(self.metric.id)])
self.trigger_processing()
values = processor.get_measures(
self.storage, [processor.MetricReference(self.metric, "mean")],
......@@ -1473,7 +1473,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1511,7 +1511,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 10), 4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......@@ -1557,7 +1557,7 @@ class CrossMetricAggregated(base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 14, 2, 31), 4),
incoming.Measure(datetime64(2014, 1, 1, 15, 3, 10), -4),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
values = processor.get_measures(
self.storage,
......
......@@ -37,7 +37,6 @@ from gnocchi import archive_policy
from gnocchi.rest import api
from gnocchi.rest import app
from gnocchi.tests import base as tests_base
from gnocchi.tests import utils as tests_utils
from gnocchi import utils
......@@ -127,8 +126,9 @@ class TestingApp(webtest.TestApp):
elif self.auth_mode == "remoteuser":
req.remote_user = self.user
response = super(TestingApp, self).do_request(req, *args, **kwargs)
metrics = tests_utils.list_all_incoming_metrics(self.chef.incoming)
self.chef.process_new_measures(metrics, sync=True)
for sack in self.chef.incoming.iter_sacks():
self.chef.process_new_measures_for_sack(
sack, blocking=True, sync=True)
return response
......
......@@ -73,7 +73,7 @@ class TestStatsd(tests_base.TestCase):
metric = r.get_metric(metric_key)
self.chef.process_new_measures([str(metric.id)], sync=True)
self.chef.refresh_metrics([metric], sync=True)
measures = self.storage.get_measures(metric, self.aggregations)
self.assertEqual({"mean": [
......@@ -92,7 +92,7 @@ class TestStatsd(tests_base.TestCase):
("127.0.0.1", 12345))
self.stats.flush()
self.chef.process_new_measures([str(metric.id)], sync=True)
self.chef.refresh_metrics([metric], sync=True)
measures = self.storage.get_measures(metric, self.aggregations)
self.assertEqual({"mean": [
......@@ -124,7 +124,7 @@ class TestStatsd(tests_base.TestCase):
metric = r.get_metric(metric_key)
self.assertIsNotNone(metric)
self.chef.process_new_measures([str(metric.id)], sync=True)
self.chef.refresh_metrics([metric], sync=True)
measures = self.storage.get_measures(metric, self.aggregations)
self.assertEqual({"mean": [
......@@ -142,7 +142,7 @@ class TestStatsd(tests_base.TestCase):
("127.0.0.1", 12345))
self.stats.flush()
self.chef.process_new_measures([str(metric.id)], sync=True)
self.chef.refresh_metrics([metric], sync=True)
measures = self.storage.get_measures(metric, self.aggregations)
self.assertEqual({"mean": [
......
......@@ -31,7 +31,6 @@ from gnocchi.storage import redis
from gnocchi.storage import s3
from gnocchi.storage import swift
from gnocchi.tests import base as tests_base
from gnocchi.tests import utils as tests_utils
def datetime64(*args):
......@@ -167,23 +166,6 @@ class TestStorageDriver(tests_base.TestCase):
self.assertIn((datetime64(2014, 1, 1, 12),
numpy.timedelta64(5, 'm'), 5.0), m)
def test_list_metric_with_measures_to_process(self):
# Nothing queued yet: no metric should be reported as pending.
metrics = tests_utils.list_all_incoming_metrics(self.incoming)
self.assertEqual(set(), metrics)
# Queue one measure for the fixture metric...
self.incoming.add_measures(self.metric.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 1), 69),
])
# ...and one for a second, freshly created metric.
m2, __ = self._create_metric('medium')
self.incoming.add_measures(m2.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 1), 69),
])
# Both metric ids must now be reported as having pending measures.
metrics = tests_utils.list_all_incoming_metrics(self.incoming)
m_list = [str(self.metric.id), str(m2.id)]
self.assertEqual(set(m_list), metrics)
# After processing, the backlog must be empty again.
self.trigger_processing(m_list)
metrics = tests_utils.list_all_incoming_metrics(self.incoming)
self.assertEqual(set([]), metrics)
def test_delete_nonempty_metric(self):
self.incoming.add_measures(self.metric.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 1), 69),
......@@ -242,7 +224,7 @@ class TestStorageDriver(tests_base.TestCase):
self.incoming.add_measures(self.metric.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, i, j), 100)
for i in six.moves.range(0, 60) for j in six.moves.range(0, 60)])
self.trigger_processing([str(self.metric.id)])
self.trigger_processing([self.metric])
aggregations = self.metric.archive_policy.aggregations
......@@ -258,7 +240,7 @@ class TestStorageDriver(tests_base.TestCase):
self.incoming.add_measures(m.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, i, j), 100)
for i in six.moves.range(0, 60) for j in six.moves.range(0, 60)])
self.trigger_processing([str(m.id)])
self.trigger_processing([m])
aggregations = (
m.archive_policy.get_aggregations_for_method("mean")
......@@ -274,7 +256,7 @@ class TestStorageDriver(tests_base.TestCase):
incoming.Measure(datetime64(2014, 1, 6, i, j, 0), 100)
for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)]
self.incoming.add_measures(m.id, measures)
self.trigger_processing([str(m.id)])
self.trigger_processing([m])
# add measure to end, in same aggregate time as last point.
self.incoming.add_measures(m.id, [
......@@ -282,13 +264,13 @@ class TestStorageDriver(tests_base.TestCase):
with mock.patch.object(self.storage, '_store_metric_splits') as c:
# should only resample last aggregate
self.trigger_processing([str(m.id)])
self.trigger_processing([m])
count = 0
for call in c.mock_calls:
# policy is 60 points and split is 48. should only update 2nd half
args = call[1]
for metric, key_agg_data_offset in six.iteritems(args[0]):
if metric == m_sql:
if metric.id == m_sql.id:
for key, aggregation, data, offset in key_agg_data_offset:
if (key.sampling == numpy.timedelta64(1, 'm')
and aggregation.method == "mean"):
......@@ -301,14 +283,14 @@ class TestStorageDriver(tests_base.TestCase):
incoming.Measure(datetime64(2014, 1, 6, i, j, 0), 100)
for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)]
self.incoming.add_measures(m.id, measures)
self.trigger_processing([str(m.id)])
self.trigger_processing([m])
# add measure to end, in same aggregate time as last point.
new_point = datetime64(2014, 1, 6, 1, 58, 1)
self.incoming.add_measures(m.id, [incoming.Measure(new_point, 100)])
with mock.patch.object(self.incoming, 'add_measures') as c:
self.trigger_processing([str(m.id)])
self.trigger_processing([m])
for __, args, __ in c.mock_calls:
self.assertEqual(
list(args[3])[0][0], carbonara.round_timestamp(
......@@ -1062,7 +1044,7 @@ class TestStorageDriver(tests_base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 12, 9, 31), 6),
incoming.Measure(datetime64(2014, 1, 1, 12, 13, 10), 2),
])
self.trigger_processing([str(self.metric.id), str(metric2.id)])
self.trigger_processing([self.metric, metric2])
self.assertEqual(
[
......@@ -1117,7 +1099,7 @@ class TestStorageDriver(tests_base.TestCase):
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 5), 1),
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 10), 1),
])
self.trigger_processing([str(m.id)])
self.trigger_processing([m])
aggregation = m.archive_policy.get_aggregation(
"mean", numpy.timedelta64(5, 's'))
......@@ -1134,7 +1116,7 @@ class TestStorageDriver(tests_base.TestCase):
self.incoming.add_measures(m.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 15), 1),
])
self.trigger_processing([str(m.id)])
self.trigger_processing([m])
self.assertEqual({"mean": [
(datetime64(2014, 1, 1, 12, 0, 5), numpy.timedelta64(5, 's'), 1),
(datetime64(2014, 1, 1, 12, 0, 10), numpy.timedelta64(5, 's'), 1),
......
......@@ -17,11 +17,6 @@ from oslo_policy import opts as policy_opts
from gnocchi import opts
def list_all_incoming_metrics(incoming):
    """Return the ids of all metrics with unprocessed measures.

    Aggregates list_metric_with_measures_to_process() over every sack
    of the *incoming* driver.

    :param incoming: The incoming driver to inspect.
    :return: A set of metric id strings; empty if there are no sacks.
    """
    # set().union(...) rather than set.union(*[...]): the unbound form
    # raises TypeError when iter_sacks() yields no sack at all.
    return set().union(
        *(incoming.list_metric_with_measures_to_process(sack)
          for sack in incoming.iter_sacks()))
def prepare_conf():
conf = cfg.ConfigOpts()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment