BitShares-Core  5.0.0
BitShares blockchain implementation and command-line interface software
gelf_appender.cpp
Go to the documentation of this file.
2 #include <fc/network/ip.hpp>
3 #include <fc/network/resolve.hpp>
6 #include <fc/reflect/variant.hpp>
7 #include <fc/thread/thread.hpp>
8 #include <fc/variant.hpp>
9 #include <fc/io/json.hpp>
10 #include <fc/crypto/city.hpp>
11 #include <fc/compress/zlib.hpp>
12 
13 #include <boost/lexical_cast.hpp>
14 #include <iomanip>
15 #include <iostream>
16 #include <queue>
17 #include <sstream>
18 #include <iostream>
19 
20 namespace fc
21 {
22 
24  {
25  public:
29 
30  impl(const config& c) :
31  cfg(c)
32  {
33  }
34 
36  {
37  }
38  };
39 
41  my( new impl( args.as<config>( FC_MAX_LOG_OBJECT_DEPTH ) ) )
42  {
43  try
44  {
45  try
46  {
47  // if it's a numeric address:port, this will parse it
48  my->gelf_endpoint = ip::endpoint::from_string(my->cfg.endpoint);
49  }
50  catch (...)
51  {
52  }
53  if (!my->gelf_endpoint)
54  {
55  // couldn't parse as a numeric ip address, try resolving as a DNS name.
56  // This can yield, so don't do it in the catch block above
57  string::size_type colon_pos = my->cfg.endpoint.find(':');
58  try
59  {
60  uint16_t port = boost::lexical_cast<uint16_t>(my->cfg.endpoint.substr(colon_pos + 1, my->cfg.endpoint.size()));
61 
62  string hostname = my->cfg.endpoint.substr( 0, colon_pos );
63  std::vector<ip::endpoint> endpoints = resolve(hostname, port);
64  if (endpoints.empty())
65  FC_THROW_EXCEPTION(unknown_host_exception, "The host name can not be resolved: ${hostname}",
66  ("hostname", hostname));
67  my->gelf_endpoint = endpoints.back();
68  }
69  catch (const boost::bad_lexical_cast&)
70  {
71  FC_THROW("Bad port: ${port}", ("port", my->cfg.endpoint.substr(colon_pos + 1, my->cfg.endpoint.size())));
72  }
73  }
74 
75  if (my->gelf_endpoint)
76  my->gelf_socket.open();
77  }
78  catch (...)
79  {
80  std::cerr << "error opening GELF socket to endpoint ${endpoint}" << my->cfg.endpoint << "\n";
81  }
82  }
83 
85  {}
86 
87  void gelf_appender::log(const log_message& message)
88  {
89  if (!my->gelf_endpoint)
90  return;
91 
92  log_context context = message.get_context();
93 
94  mutable_variant_object gelf_message;
95  gelf_message["version"] = "1.1";
96  gelf_message["host"] = my->cfg.host;
97  gelf_message["short_message"] = format_string( message.get_format(), message.get_data(), my->cfg.max_object_depth );
98 
99  gelf_message["timestamp"] = context.get_timestamp().time_since_epoch().count() / 1000000.;
100 
101  switch (context.get_log_level())
102  {
103  case log_level::debug:
104  gelf_message["level"] = 7; // debug
105  break;
106  case log_level::info:
107  gelf_message["level"] = 6; // info
108  break;
109  case log_level::warn:
110  gelf_message["level"] = 4; // warning
111  break;
112  case log_level::error:
113  gelf_message["level"] = 3; // error
114  break;
115  case log_level::all:
116  case log_level::off:
117  // these shouldn't be used in log messages, but do something deterministic just in case
118  gelf_message["level"] = 6; // info
119  break;
120  }
121 
122  if (!context.get_context().empty())
123  gelf_message["context"] = context.get_context();
124  gelf_message["_line"] = context.get_line_number();
125  gelf_message["_file"] = context.get_file();
126  gelf_message["_method_name"] = context.get_method();
127  gelf_message["_thread_name"] = context.get_thread_name();
128  if (!context.get_task_name().empty())
129  gelf_message["_task_name"] = context.get_task_name();
130 
131  string gelf_message_as_string;
132  try
133  {
134  gelf_message_as_string = json::to_string(gelf_message);
135  }
136  catch( const fc::assert_exception& e )
137  {
138  gelf_message_as_string = "{\"level\":3,\"short_message\":\"ERROR while generating log message\"}";
139  }
140  gelf_message_as_string = zlib_compress(gelf_message_as_string);
141 
142  // graylog2 expects the zlib header to be 0x78 0x9c
143  // but miniz.c generates 0x78 0x01 (indicating
144  // low compression instead of default compression)
145  // so change that here
146  assert(gelf_message_as_string[0] == (char)0x78);
147  if (gelf_message_as_string[1] == (char)0x01 ||
148  gelf_message_as_string[1] == (char)0xda)
149  gelf_message_as_string[1] = (char)0x9c;
150  assert(gelf_message_as_string[1] == (char)0x9c);
151 
152  // packets are sent by UDP, and they tend to disappear if they
153  // get too large. It's hard to find any solid numbers on how
154  // large they can be before they get dropped -- datagrams can
155  // be up to 64k, but anything over 512 is not guaranteed.
156  // You can play with this number, intermediate values like
157  // 1400 and 8100 are likely to work on most intranets.
158  const unsigned max_payload_size = 512;
159 
160  if (gelf_message_as_string.size() <= max_payload_size)
161  {
162  // no need to split
163  std::shared_ptr<char> send_buffer(new char[gelf_message_as_string.size()],
164  [](char* p){ delete[] p; });
165  memcpy(send_buffer.get(), gelf_message_as_string.c_str(),
166  gelf_message_as_string.size());
167 
168  my->gelf_socket.send_to(send_buffer, gelf_message_as_string.size(),
169  *my->gelf_endpoint);
170  }
171  else
172  {
173  // split the message
174  // we need to generate an 8-byte ID for this message.
175  // city hash should do
176  uint64_t message_id = city_hash64(gelf_message_as_string.c_str(), gelf_message_as_string.size());
177  const unsigned header_length = 2 /* magic */ + 8 /* msg id */ + 1 /* seq */ + 1 /* count */;
178  const unsigned body_length = max_payload_size - header_length;
179  unsigned total_number_of_packets = (gelf_message_as_string.size() + body_length - 1) / body_length;
180  unsigned bytes_sent = 0;
181  unsigned number_of_packets_sent = 0;
182  while (bytes_sent < gelf_message_as_string.size())
183  {
184  unsigned bytes_to_send = std::min((unsigned)gelf_message_as_string.size() - bytes_sent,
185  body_length);
186 
187  std::shared_ptr<char> send_buffer(new char[max_payload_size],
188  [](char* p){ delete[] p; });
189  char* ptr = send_buffer.get();
190  // magic number for chunked message
191  *(unsigned char*)ptr++ = 0x1e;
192  *(unsigned char*)ptr++ = 0x0f;
193 
194  // message id
195  memcpy(ptr, (char*)&message_id, sizeof(message_id));
196  ptr += sizeof(message_id);
197 
198  *(unsigned char*)(ptr++) = number_of_packets_sent;
199  *(unsigned char*)(ptr++) = total_number_of_packets;
200  memcpy(ptr, gelf_message_as_string.c_str() + bytes_sent,
201  bytes_to_send);
202  my->gelf_socket.send_to(send_buffer, header_length + bytes_to_send,
203  *my->gelf_endpoint);
204  ++number_of_packets_sent;
205  bytes_sent += bytes_to_send;
206  }
207  assert(number_of_packets_sent == total_number_of_packets);
208  }
209  }
210 } // fc
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:638
virtual void log(const log_message &m) override
optional< ip::endpoint > gelf_endpoint
std::string get_file() const
std::string format_string(const std::string &, const variant_object &, uint32_t max_object_depth=200)
std::string get_task_name() const
impl(const config &c)
std::string get_thread_name() const
uint64_t get_line_number() const
std::string zlib_compress(const std::string &in)
Definition: zlib.cpp:7
#define FC_THROW(...)
Definition: exception.hpp:366
#define FC_MAX_LOG_OBJECT_DEPTH
Definition: config.hpp:8
log_level get_log_level() const
std::shared_ptr< appender > ptr
Definition: appender.hpp:30
std::string get_format() const
int64_t count() const
Definition: time.hpp:28
provides information about where and when a log message was generated.
Definition: log_message.hpp:56
gelf_appender(const variant &args)
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object&#39;s.
Definition: variant.hpp:198
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
static endpoint from_string(const string &s)
Definition: ip.cpp:74
log_context get_context() const
Defines exception&#39;s used by fc.
time_point get_timestamp() const
const microseconds & time_since_epoch() const
Definition: time.hpp:54
std::string get_context() const
aggregates a message along with the context and associated meta-information.
Definition: api.hpp:15
std::vector< fc::ip::endpoint > resolve(const std::string &host, uint16_t port)
Definition: resolve.cpp:7
variant_object get_data() const
uint64_t city_hash64(const char *buf, size_t len)
Definition: city.cpp:375
std::string get_method() const
An order-perserving dictionary of variant&#39;s.
cerr_t & cerr
Definition: iostream.cpp:176