blob: 65d43aa35ec5af95e02e9c2558967fba7aafd6dc [file] [log] [blame]
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <csignal>
#include <functional>
#include <ostream>
#include <string>
#include <thread>
#include "absl/flags/flag.h"
#include "absl/flags/reflection.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_format.h"
#include "flatbuffers/string.h"
#include "gtest/gtest.h"
#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/events/ping_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/flatbuffers.h"
#include "aos/ipc_lib/event.h"
#include "aos/ipc_lib/shm_base.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/network/team_number.h"
#include "aos/starter/starter_generated.h"
#include "aos/starter/starter_rpc_generated.h"
#include "aos/starter/starter_rpc_lib.h"
#include "aos/starter/starterd_lib.h"
#include "aos/testing/path.h"
#include "aos/util/file.h"
#include "aos/util/process_info_generated.h"
using aos::testing::ArtifactPath;
namespace aos::starter {
class ThreadedStarterRunner {
public:
ThreadedStarterRunner(Starter *starter)
: my_thread_([this, starter]() {
starter->event_loop()->OnRun([this]() { event_.Set(); });
starter->Run();
}) {
event_.Wait();
}
~ThreadedStarterRunner() { my_thread_.join(); }
private:
aos::Event event_;
std::thread my_thread_;
};
class StarterdTest : public ::testing::Test {
public:
StarterdTest() {
// Nuke the shm dir:
aos::util::UnlinkRecursive(absl::GetFlag(FLAGS_shm_base));
}
protected:
void SetupStarterCleanup(aos::starter::Starter *starter) {
starter->event_loop()
->AddTimer([this, starter]() {
if (test_done_) {
starter->Cleanup();
}
})
->Schedule(starter->event_loop()->monotonic_now(),
std::chrono::milliseconds(100));
}
absl::FlagSaver flag_saver_;
// Used to track when the test completes so that we can clean up the starter
// in its thread.
std::atomic<bool> test_done_{false};
};
struct TestParams {
std::string config;
std::string hostname;
};
class StarterdConfigParamTest
: public StarterdTest,
public ::testing::WithParamInterface<TestParams> {};
TEST_P(StarterdConfigParamTest, MultiNodeStartStopTest) {
absl::FlagSaver flag_saver;
absl::SetFlag(&FLAGS_override_hostname, GetParam().hostname);
const std::string config_file = ArtifactPath(GetParam().config);
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
auto new_config = aos::configuration::MergeWithConfig(
&config.message(),
absl::StrFormat(
R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
"nodes": ["pi1"],
"args": ["--shm_base", "%s", "--config", "%s", "--override_hostname", "%s"]
},
{
"name": "pong",
"executable_name": "%s",
"nodes": ["pi1"],
"args": ["--shm_base", "%s", "--config", "%s", "--override_hostname", "%s"]
}
]})",
ArtifactPath("aos/events/ping"), absl::GetFlag(FLAGS_shm_base),
config_file, GetParam().hostname, ArtifactPath("aos/events/pong"),
absl::GetFlag(FLAGS_shm_base), config_file, GetParam().hostname));
const aos::Configuration *config_msg = &new_config.message();
// Set up starter with config file
aos::starter::Starter starter(config_msg);
// Create an event loop to watch for ping messages, verifying it actually
// started.
aos::ShmEventLoop watcher_loop(config_msg);
watcher_loop.SkipAosLog();
aos::ShmEventLoop client_loop(config_msg);
client_loop.SkipAosLog();
StarterClient client(&client_loop);
client.SetTimeoutHandler(
[]() { FAIL() << ": Command should not have timed out."; });
bool success = false;
client.SetSuccessHandler([&success, &client_loop]() {
client_loop.Exit();
success = true;
});
watcher_loop
.AddTimer([&watcher_loop] {
watcher_loop.Exit();
FAIL();
})
->Schedule(watcher_loop.monotonic_now() + std::chrono::seconds(7));
std::atomic<int> test_stage = 0;
// Watch on the client loop since we need to interact with the StarterClient.
client_loop.MakeWatcher("/test", [&test_stage, &client,
&client_loop](const aos::examples::Ping &) {
switch (test_stage) {
case 1: {
test_stage = 2;
break;
}
case 2: {
{
client.SendCommands({{Command::STOP, "ping", {client_loop.node()}}},
std::chrono::seconds(3));
}
test_stage = 3;
break;
}
}
});
watcher_loop.MakeWatcher(
"/aos", [&test_stage, &watcher_loop](const aos::starter::Status &status) {
const aos::starter::ApplicationStatus *app_status =
FindApplicationStatus(status, "ping");
if (app_status == nullptr) {
return;
}
switch (test_stage) {
case 0: {
if (app_status->has_state() &&
app_status->state() == aos::starter::State::RUNNING) {
test_stage = 1;
}
break;
}
case 3: {
if (app_status->has_state() &&
app_status->state() == aos::starter::State::STOPPED) {
watcher_loop.Exit();
SUCCEED();
}
break;
}
}
});
SetupStarterCleanup(&starter);
ThreadedStarterRunner starterd_thread(&starter);
aos::Event event;
client_loop.OnRun([&event]() { event.Set(); });
std::thread client_thread([&client_loop] { client_loop.Run(); });
event.Wait();
watcher_loop.Run();
test_done_ = true;
client_thread.join();
ASSERT_TRUE(success);
}
INSTANTIATE_TEST_SUITE_P(
StarterdConfigParamTest, StarterdConfigParamTest,
::testing::Values(TestParams{"aos/events/pingpong_config.json", ""},
TestParams{"aos/starter/multinode_pingpong_config.json",
"pi1"}));
TEST_F(StarterdTest, DeathTest) {
const std::string config_file =
ArtifactPath("aos/events/pingpong_config.json");
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
auto new_config = aos::configuration::MergeWithConfig(
&config.message(),
absl::StrFormat(
R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
"args": ["--shm_base", "%s"]
},
{
"name": "pong",
"executable_name": "%s",
"args": ["--shm_base", "%s"]
}
]})",
ArtifactPath("aos/events/ping"), absl::GetFlag(FLAGS_shm_base),
ArtifactPath("aos/events/pong"), absl::GetFlag(FLAGS_shm_base)));
const aos::Configuration *config_msg = &new_config.message();
// Set up starter with config file
aos::starter::Starter starter(config_msg);
// Create an event loop to watch for ping messages, verifying it actually
// started.
aos::ShmEventLoop watcher_loop(config_msg);
watcher_loop.SkipAosLog();
watcher_loop
.AddTimer([&watcher_loop] {
watcher_loop.Exit();
FAIL();
})
->Schedule(watcher_loop.monotonic_now() + std::chrono::seconds(11));
int test_stage = 0;
uint64_t id;
watcher_loop.MakeWatcher("/aos", [&test_stage, &watcher_loop,
&id](const aos::starter::Status &status) {
const aos::starter::ApplicationStatus *app_status =
FindApplicationStatus(status, "ping");
if (app_status == nullptr) {
return;
}
switch (test_stage) {
case 0: {
if (app_status->has_state() &&
app_status->state() == aos::starter::State::RUNNING) {
LOG(INFO) << "Ping is running";
test_stage = 1;
ASSERT_TRUE(app_status->has_pid());
ASSERT_TRUE(kill(app_status->pid(), SIGINT) != -1);
ASSERT_TRUE(app_status->has_id());
id = app_status->id();
}
break;
}
case 1: {
if (app_status->has_state() &&
app_status->state() == aos::starter::State::RUNNING &&
app_status->has_id() && app_status->id() != id) {
LOG(INFO) << "Ping restarted";
watcher_loop.Exit();
SUCCEED();
}
break;
}
}
});
SetupStarterCleanup(&starter);
ThreadedStarterRunner starterd_thread(&starter);
watcher_loop.Run();
test_done_ = true;
}
TEST_F(StarterdTest, Autostart) {
const std::string config_file =
ArtifactPath("aos/events/pingpong_config.json");
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
auto new_config = aos::configuration::MergeWithConfig(
&config.message(),
absl::StrFormat(
R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
"args": ["--shm_base", "%s"],
"autostart": false
},
{
"name": "pong",
"executable_name": "%s",
"args": ["--shm_base", "%s"]
}
]})",
ArtifactPath("aos/events/ping"), absl::GetFlag(FLAGS_shm_base),
ArtifactPath("aos/events/pong"), absl::GetFlag(FLAGS_shm_base)));
const aos::Configuration *config_msg = &new_config.message();
// Set up starter with config file
aos::starter::Starter starter(config_msg);
// Create an event loop to watch for the application starting up.
aos::ShmEventLoop watcher_loop(config_msg);
watcher_loop.SkipAosLog();
watcher_loop
.AddTimer([&watcher_loop] {
watcher_loop.Exit();
FAIL();
})
->Schedule(watcher_loop.monotonic_now() + std::chrono::seconds(7));
int pong_running_count = 0;
watcher_loop.MakeWatcher("/aos", [&watcher_loop, &pong_running_count](
const aos::starter::Status &status) {
const aos::starter::ApplicationStatus *ping_app_status =
FindApplicationStatus(status, "ping");
const aos::starter::ApplicationStatus *pong_app_status =
FindApplicationStatus(status, "pong");
if (ping_app_status == nullptr || pong_app_status == nullptr) {
return;
}
if (ping_app_status->has_state() &&
ping_app_status->state() != aos::starter::State::STOPPED) {
watcher_loop.Exit();
FAIL();
}
if (pong_app_status->has_state() &&
pong_app_status->state() == aos::starter::State::RUNNING) {
++pong_running_count;
// Sometimes if the timing for everything is *just* off, then the
// process_info will say that the process name is "starter_test" because
// it grabbed the name after the fork() but before the execvp(). To
// protect against that, wait an extra cycle. If things aren't fixed by
// the second cycle, then that is a problem.
if (pong_running_count < 3) {
return;
}
ASSERT_TRUE(pong_app_status->has_process_info());
ASSERT_EQ("pong", pong_app_status->process_info()->name()->string_view())
<< aos::FlatbufferToJson(&status);
ASSERT_EQ(pong_app_status->pid(), pong_app_status->process_info()->pid());
ASSERT_TRUE(pong_app_status->process_info()->has_cpu_usage());
ASSERT_LE(0.0, pong_app_status->process_info()->cpu_usage());
ASSERT_TRUE(pong_app_status->has_has_active_timing_report());
ASSERT_TRUE(pong_app_status->has_active_timing_report());
watcher_loop.Exit();
SUCCEED();
}
});
SetupStarterCleanup(&starter);
ThreadedStarterRunner starterd_thread(&starter);
watcher_loop.Run();
test_done_ = true;
}
// Tests that starterd respects autorestart.
TEST_F(StarterdTest, DeathNoRestartTest) {
const std::string config_file =
ArtifactPath("aos/events/pingpong_config.json");
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
auto new_config = aos::configuration::MergeWithConfig(
&config.message(),
absl::StrFormat(
R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
"args": ["--shm_base", "%s"],
"autorestart": false
},
{
"name": "pong",
"executable_name": "%s",
"args": ["--shm_base", "%s"]
}
]})",
ArtifactPath("aos/events/ping"), absl::GetFlag(FLAGS_shm_base),
ArtifactPath("aos/events/pong"), absl::GetFlag(FLAGS_shm_base)));
const aos::Configuration *config_msg = &new_config.message();
// Set up starter with config file
aos::starter::Starter starter(config_msg);
// Create an event loop to watch for the Status message to watch the state
// transitions.
aos::ShmEventLoop watcher_loop(config_msg);
watcher_loop.SkipAosLog();
watcher_loop
.AddTimer([&watcher_loop] {
watcher_loop.Exit();
SUCCEED();
})
->Schedule(watcher_loop.monotonic_now() + std::chrono::seconds(11));
int test_stage = 0;
uint64_t id;
watcher_loop.MakeWatcher("/aos", [&test_stage, &watcher_loop,
&id](const aos::starter::Status &status) {
const aos::starter::ApplicationStatus *app_status =
FindApplicationStatus(status, "ping");
if (app_status == nullptr) {
return;
}
switch (test_stage) {
case 0: {
if (app_status->has_state() &&
app_status->state() == aos::starter::State::RUNNING) {
LOG(INFO) << "Ping is running";
test_stage = 1;
ASSERT_TRUE(app_status->has_pid());
ASSERT_TRUE(kill(app_status->pid(), SIGINT) != -1);
ASSERT_TRUE(app_status->has_id());
id = app_status->id();
}
break;
}
case 1: {
if (app_status->has_state() &&
app_status->state() == aos::starter::State::RUNNING &&
app_status->has_id() && app_status->id() != id) {
LOG(INFO) << "Ping restarted, it shouldn't...";
watcher_loop.Exit();
FAIL();
}
break;
}
}
});
SetupStarterCleanup(&starter);
ThreadedStarterRunner starterd_thread(&starter);
watcher_loop.Run();
test_done_ = true;
}
TEST_F(StarterdTest, StarterChainTest) {
// This test was written in response to a bug that was found
// in StarterClient::Succeed. The bug caused the timeout handler
// to be reset after the success handler was called.
// the bug has been fixed, and this test will ensure it does
// not regress.
const std::string config_file =
ArtifactPath("aos/events/pingpong_config.json");
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
auto new_config = aos::configuration::MergeWithConfig(
&config.message(),
absl::StrFormat(
R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
"args": ["--shm_base", "%s"],
"autorestart": false
},
{
"name": "pong",
"executable_name": "%s",
"args": ["--shm_base", "%s"]
}
]})",
ArtifactPath("aos/events/ping"), absl::GetFlag(FLAGS_shm_base),
ArtifactPath("aos/events/pong"), absl::GetFlag(FLAGS_shm_base)));
const aos::Configuration *config_msg = &new_config.message();
// Set up starter with config file
aos::starter::Starter starter(config_msg);
aos::ShmEventLoop client_loop(config_msg);
client_loop.SkipAosLog();
StarterClient client(&client_loop);
bool success = false;
auto client_node = client_loop.node();
// limit the amount of time we will wait for the test to finish.
client_loop
.AddTimer([&client_loop] {
client_loop.Exit();
FAIL() << "ERROR: The test has failed, the watcher has timed out. "
"The chain of stages defined below did not complete "
"within the time limit.";
})
->Schedule(client_loop.monotonic_now() + std::chrono::seconds(20));
// variables have been defined, here we define the body of the test.
// We want stage1 to succeed, triggering stage2.
// We want stage2 to timeout, triggering stage3.
auto stage3 = [&client_loop, &success]() {
LOG(INFO) << "Begin stage3.";
SUCCEED();
success = true;
client_loop.Exit();
LOG(INFO) << "End stage3.";
};
auto stage2 = [this, &starter, &client, &client_node, &stage3] {
LOG(INFO) << "Begin stage2";
test_done_ = true; // trigger `starter` to exit.
// wait for the starter event loop to close, so we can
// intentionally trigger a timeout.
int attempts = 0;
while (starter.event_loop()->is_running()) {
++attempts;
if (attempts > 5) {
LOG(INFO) << "Timeout while waiting for starter to exit";
return;
}
LOG(INFO) << "Waiting for starter to close.";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
client.SetTimeoutHandler(std::ref(stage3));
client.SetSuccessHandler([]() {
LOG(INFO) << "stage3 success handler called.";
FAIL() << ": Command should not have succeeded here.";
});
// we want this command to timeout
client.SendCommands({{Command::START, "ping", {client_node}}},
std::chrono::seconds(5));
LOG(INFO) << "End stage2";
};
auto stage1 = [&client, &client_node, &stage2] {
LOG(INFO) << "Begin stage1";
client.SetTimeoutHandler(
[]() { FAIL() << ": Command should not have timed out."; });
client.SetSuccessHandler(std::ref(stage2));
client.SendCommands({{Command::STOP, "ping", {client_node}}},
std::chrono::seconds(5));
LOG(INFO) << "End stage1";
};
// start the test body
client_loop.AddTimer(stage1)->Schedule(client_loop.monotonic_now() +
std::chrono::milliseconds(1));
// prepare the cleanup for starter. This will finish when we call
// `test_done_ = true;`.
SetupStarterCleanup(&starter);
// run `starter.Run()` in a thread to simulate it running on
// another process.
ThreadedStarterRunner starterd_thread(&starter);
client_loop.Run();
EXPECT_TRUE(success);
ASSERT_FALSE(starter.event_loop()->is_running());
}
} // namespace aos::starter