Fork 0
mirror of https://github.com/badaix/snapcast synced 2025-02-22 23:24:29 +01:00

583 lines
19 KiB

This file is part of snapcast
Copyright (C) 2014-2025 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
// prototype/interface header file
#include "pcm_stream.hpp"
// local headers
#include "common/aixlog.hpp"
#include "common/base64.h"
#include "common/error_code.hpp"
#include "common/snap_exception.hpp"
#include "common/str_compat.hpp"
#include "common/utils/string_utils.hpp"
#include "control_error.hpp"
#include "encoder/encoder_factory.hpp"
#include "image_cache.hpp"
// 3rd party headers
#include <boost/asio/ip/host_name.hpp>
// standard headers
#include <memory>
using namespace std;
namespace streamreader
static constexpr auto LOG_TAG = "PcmStream";
PcmStream::PcmStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: active_(false), strand_(boost::asio::make_strand(ioc.get_executor())), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle),
server_settings_(server_settings), req_id_(0), property_timer_(strand_)
encoder::EncoderFactory encoderFactory;
if (uri_.query.find(kUriCodec) == uri_.query.end())
throw SnapException("Stream URI must have a codec");
encoder_ = encoderFactory.createEncoder(uri_.query[kUriCodec]);
if (uri_.query.find(kUriName) == uri_.query.end())
throw SnapException("Stream URI must have a name");
name_ = uri_.query[kUriName];
if (uri_.query.find(kUriSampleFormat) == uri_.query.end())
throw SnapException("Stream URI must have a sampleformat");
sampleFormat_ = SampleFormat(uri_.query[kUriSampleFormat]);
chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_);
silent_chunk_ = std::vector<char>(chunk_->payloadSize, 0);
LOG(DEBUG, LOG_TAG) << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize
<< "\n";
LOG(INFO, LOG_TAG) << "PcmStream: " << name_ << ", sampleFormat: " << sampleFormat_.toString() << "\n";
if (uri_.query.find(kControlScript) != uri_.query.end())
std::string params;
if (uri_.query.find(kControlScriptParams) != uri_.query.end())
params = uri_.query[kControlScriptParams];
stream_ctrl_ = std::make_unique<ScriptStreamControl>(strand_, server_settings_.stream.plugin_dir, uri_.query[kControlScript], std::move(params));
if (uri_.query.find(kUriChunkMs) != uri_.query.end())
chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]);
double silence_threshold_percent = 0.;
silence_threshold_percent = cpt::stod(uri_.getQuery("silence_threshold_percent", "0"));
catch (...)
int32_t max_amplitude = std::pow(2, sampleFormat_.bits() - 1) - 1;
silence_threshold_ = max_amplitude * (silence_threshold_percent / 100.);
LOG(DEBUG, LOG_TAG) << "Silence threshold percent: " << silence_threshold_percent << ", silence threshold amplitude: " << silence_threshold_ << "\n";
std::shared_ptr<msg::CodecHeader> PcmStream::getHeader()
return encoder_->getHeader();
const StreamUri& PcmStream::getUri() const
return uri_;
const std::string& PcmStream::getName() const
return name_;
const std::string& PcmStream::getId() const
return getName();
const SampleFormat& PcmStream::getSampleFormat() const
return sampleFormat_;
std::string PcmStream::getCodec() const
return encoder_->name();
void PcmStream::onControlRequest(const jsonrpcpp::Request& request)
LOG(INFO, LOG_TAG) << "Request: " << request.method() << ", id: " << request.id() << ", params: " << request.params().to_json() << "\n";
void PcmStream::pollProperties()
property_timer_.async_wait([this, self = shared_from_this()](const boost::system::error_code& ec)
if (!ec)
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this, self = shared_from_this()](const jsonrpcpp::Response& response)
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0)
void PcmStream::onControlNotification(const jsonrpcpp::Notification& notification)
LOG(DEBUG, LOG_TAG) << "Notification method: " << notification.method() << ", params: " << notification.params().to_json() << "\n";
if (notification.method() == "Plugin.Stream.Player.Properties")
LOG(DEBUG, LOG_TAG) << "Received properties notification\n";
else if (notification.method() == "Plugin.Stream.Ready")
LOG(DEBUG, LOG_TAG) << "Plugin is ready\n";
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this, self = shared_from_this()](const jsonrpcpp::Response& response)
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0)
// TODO: Add capabilities or settings?
// {"jsonrpc": "2.0", "method": "Plugin.Stream.Ready", "params": {"pollProperties": 10, "responseTimeout": 5}}
// pollProperties();
else if (notification.method() == "Plugin.Stream.Log")
std::string severity = notification.params().get("severity");
std::string message = notification.params().get("message");
LOG(INFO, LOG_TAG) << "Plugin log - severity: " << severity << ", message: " << message << "\n";
LOG(WARNING, LOG_TAG) << "Received unknown notification method: '" << notification.method() << "'\n";
catch (const std::exception& e)
LOG(ERROR, LOG_TAG) << "Error while receiving notification: " << e.what() << '\n';
void PcmStream::onControlLog(std::string line)
if (line.back() == '\r')
line.resize(line.size() - 1);
if (line.empty())
auto tmp = utils::string::tolower_copy(line);
AixLog::Severity severity = AixLog::Severity::info;
if (tmp.find(" trace") != string::npos)
severity = AixLog::Severity::trace;
else if (tmp.find(" debug") != string::npos)
severity = AixLog::Severity::debug;
else if (tmp.find(" info") != string::npos)
severity = AixLog::Severity::info;
else if (tmp.find(" warning") != string::npos)
severity = AixLog::Severity::warning;
else if (tmp.find(" error") != string::npos)
severity = AixLog::Severity::error;
else if ((tmp.find(" fatal") != string::npos) || (tmp.find(" critical") != string::npos))
severity = AixLog::Severity::fatal;
LOG(severity, LOG_TAG) << "Stream: " << getId() << ", message: " << line << "\n";
void PcmStream::start()
LOG(DEBUG, LOG_TAG) << "Start: " << name_ << ", type: " << uri_.scheme << ", sampleformat: " << sampleFormat_.toString() << ", codec: " << getCodec()
<< "\n";
encoder_->init([this, self = shared_from_this()](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration)
{ chunkEncoded(encoder, std::move(chunk), duration); }, sampleFormat_);
if (stream_ctrl_)
stream_ctrl_->start(getId(), server_settings_, [this, self = shared_from_this()](const jsonrpcpp::Notification& notification)
{ onControlNotification(notification); }, [this, self = shared_from_this()](const jsonrpcpp::Request& request) { onControlRequest(request); },
[this, self = shared_from_this()](std::string message) { onControlLog(std::move(message)); });
active_ = true;
void PcmStream::stop()
active_ = false;
bool PcmStream::isSilent(const msg::PcmChunk& chunk) const
if (silence_threshold_ == 0)
return (std::memcmp(chunk.payload, silent_chunk_.data(), silent_chunk_.size()) == 0);
if (sampleFormat_.sampleSize() == 1)
auto payload = chunk.getPayload<int8_t>();
for (size_t n = 0; n < payload.second; ++n)
if (abs(payload.first[n]) > silence_threshold_)
return false;
else if (sampleFormat_.sampleSize() == 2)
auto payload = chunk.getPayload<int16_t>();
for (size_t n = 0; n < payload.second; ++n)
if (abs(payload.first[n]) > silence_threshold_)
return false;
else if (sampleFormat_.sampleSize() == 4)
auto payload = chunk.getPayload<int32_t>();
for (size_t n = 0; n < payload.second; ++n)
if (abs(payload.first[n]) > silence_threshold_)
return false;
return true;
ReaderState PcmStream::getState() const
return state_;
void PcmStream::setState(ReaderState newState)
if (newState != state_)
LOG(INFO, LOG_TAG) << "State changed: " << name_ << ", state: " << state_ << " => " << newState << "\n";
state_ = newState;
for (auto* listener : pcmListeners_)
if (listener != nullptr)
listener->onStateChanged(this, newState);
void PcmStream::chunkEncoded(const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration)
std::ignore = encoder;
// LOG(TRACE, LOG_TAG) << "onChunkEncoded: " << getName() << ", duration: " << duration
// << " ms, compression ratio: " << 100 - ceil(100 * (chunk->durationMs() / duration)) << "%\n";
if (duration <= 0)
// absolute start timestamp is the tvEncodedChunk_
auto microsecs = std::chrono::duration_cast<std::chrono::microseconds>(tvEncodedChunk_.time_since_epoch()).count();
chunk->timestamp.sec = microsecs / 1000000;
chunk->timestamp.usec = microsecs % 1000000;
// update tvEncodedChunk_ to the next chunk start by adding the current chunk duration
tvEncodedChunk_ += std::chrono::nanoseconds(static_cast<std::chrono::nanoseconds::rep>(duration * 1000000));
for (auto* listener : pcmListeners_)
if (listener != nullptr)
listener->onChunkEncoded(this, chunk, duration);
void PcmStream::chunkRead(const msg::PcmChunk& chunk)
for (auto* listener : pcmListeners_)
if (listener != nullptr)
listener->onChunkRead(this, chunk);
void PcmStream::resync(const std::chrono::nanoseconds& duration)
for (auto* listener : pcmListeners_)
if (listener != nullptr)
listener->onResync(this, duration.count() / 1000000.);
json PcmStream::toJson() const
std::lock_guard<std::recursive_mutex> lock(mutex_);
json j = {
{"uri", uri_.toJson()},
{"id", getId()},
{"status", to_string(state_)},
j["properties"] = properties_.toJson();
return j;
void PcmStream::addListener(PcmStream::Listener* pcmListener)
const Properties& PcmStream::getProperties() const
// std::lock_guard<std::recursive_mutex> lock(mutex_);
return properties_;
void PcmStream::setShuffle(bool shuffle, ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "setShuffle: " << shuffle << "\n";
if (!properties_.can_control)
return handler({ControlErrc::can_control_is_false});
sendRequest("Plugin.Stream.Player.SetProperty", {"shuffle", shuffle}, std::move(handler));
void PcmStream::setLoopStatus(LoopStatus status, ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "setLoopStatus: " << status << "\n";
if (!properties_.can_control)
return handler({ControlErrc::can_control_is_false});
sendRequest("Plugin.Stream.Player.SetProperty", {"loopStatus", to_string(status)}, std::move(handler));
void PcmStream::setVolume(uint16_t volume, ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "setVolume: " << volume << "\n";
if (!properties_.can_control)
return handler({ControlErrc::can_control_is_false});
sendRequest("Plugin.Stream.Player.SetProperty", {"volume", volume}, std::move(handler));
void PcmStream::setMute(bool mute, ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "setMute: " << mute << "\n";
if (!properties_.can_control)
return handler({ControlErrc::can_control_is_false});
sendRequest("Plugin.Stream.Player.SetProperty", {"mute", mute}, std::move(handler));
void PcmStream::setRate(float rate, ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "setRate: " << rate << "\n";
if (!properties_.can_control)
return handler({ControlErrc::can_control_is_false});
sendRequest("Plugin.Stream.Player.SetProperty", {"rate", rate}, std::move(handler));
void PcmStream::setPosition(std::chrono::milliseconds position, ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "setPosition\n";
if (!properties_.can_seek)
return handler({ControlErrc::can_seek_is_false});
json params;
params["command"] = "setPosition";
json j;
j["position"] = position.count() / 1000.f;
params["params"] = j;
sendRequest("Plugin.Stream.Player.Control", params, std::move(handler));
void PcmStream::seek(std::chrono::milliseconds offset, ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "seek\n";
if (!properties_.can_seek)
return handler({ControlErrc::can_seek_is_false});
json params;
params["command"] = "seek";
json j;
j["offset"] = offset.count() / 1000.f;
params["params"] = j;
sendRequest("Plugin.Stream.Player.Control", params, std::move(handler));
void PcmStream::next(ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "next\n";
if (!properties_.can_go_next)
return handler({ControlErrc::can_go_next_is_false});
sendRequest("Plugin.Stream.Player.Control", {"command", "next"}, std::move(handler));
void PcmStream::previous(ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "previous\n";
if (!properties_.can_go_previous)
return handler({ControlErrc::can_go_previous_is_false});
sendRequest("Plugin.Stream.Player.Control", {"command", "previous"}, std::move(handler));
void PcmStream::pause(ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "pause\n";
if (!properties_.can_pause)
return handler({ControlErrc::can_pause_is_false});
sendRequest("Plugin.Stream.Player.Control", {"command", "pause"}, std::move(handler));
void PcmStream::playPause(ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "playPause\n";
if (!properties_.can_pause)
return handler({ControlErrc::can_play_is_false});
sendRequest("Plugin.Stream.Player.Control", {"command", "playPause"}, std::move(handler));
void PcmStream::stop(ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "stop\n";
if (!properties_.can_control)
return handler({ControlErrc::can_control_is_false});
sendRequest("Plugin.Stream.Player.Control", {"command", "stop"}, std::move(handler));
void PcmStream::play(ResultHandler handler)
LOG(DEBUG, LOG_TAG) << "play\n";
if (!properties_.can_play)
return handler({ControlErrc::can_play_is_false});
sendRequest("Plugin.Stream.Player.Control", {"command", "play"}, std::move(handler));
void PcmStream::sendRequest(const std::string& method, const jsonrpcpp::Parameter& params, ResultHandler handler)
if (!stream_ctrl_)
return handler({ControlErrc::can_not_control});
jsonrpcpp::Request req(++req_id_, method, params);
stream_ctrl_->command(req, [handler](const jsonrpcpp::Response& response)
if (response.error().code() != 0)
handler({static_cast<ControlErrc>(response.error().code()), response.error().data()});
void PcmStream::setProperties(const Properties& properties)
std::lock_guard<std::recursive_mutex> lock(mutex_);
Properties props = properties;
// Missing metadata means the data didn't change, so
// enrich the new properites with old metadata
if (!props.metadata.has_value() && properties_.metadata.has_value())
props.metadata = properties_.metadata;
// If the cover image is availbale as raw data, cache it on the HTTP Server to make it also available via HTTP
if (props.metadata.has_value() && props.metadata->art_data.has_value() && !props.metadata->art_url.has_value())
auto data = base64_decode(props.metadata->art_data->data);
auto md5 = ImageCache::instance().setImage(getName(), std::move(data), props.metadata->art_data->extension);
std::stringstream url;
if (server_settings_.http.url_prefix.empty())
std::string proto{"http"};
size_t port{server_settings_.http.port};
if (server_settings_.http.ssl_enabled)
proto = "https";
port = server_settings_.http.ssl_port;
url << proto << "://" << server_settings_.http.host << ":" << port;
url << server_settings_.http.url_prefix;
url << "/__image_cache?name=" << md5;
props.metadata->art_url = url.str();
else if (!props.metadata.has_value() || !props.metadata->art_data.has_value())
if (props == properties_)
LOG(DEBUG, LOG_TAG) << "setProperties: Properties did not change\n";
properties_ = std::move(props);
LOG(DEBUG, LOG_TAG) << "setProperties, stream: " << getId() << ", properties: " << properties_.toJson() << "\n";
// Trigger a stream update
for (auto* listener : pcmListeners_)
if (listener != nullptr)
listener->onPropertiesChanged(this, properties_);
} // namespace streamreader