BitShares-Core  4.0.0
BitShares blockchain implementation and command-line interface software
buffered_iostream.cpp
Go to the documentation of this file.
3 #include <boost/asio/streambuf.hpp>
4 #include <iostream>
5 
6 #include <fc/log/logger.hpp>
7 
8 namespace fc
9 {
10  namespace detail
11  {
13  {
14  public:
16  _istr(std::move(is))
17 #ifndef NDEBUG
19 #endif
20  {}
21 
23  boost::asio::streambuf _rdbuf;
24  std::shared_ptr<char> _shared_read_buffer;
25 #ifndef NDEBUG
27 #endif
28  };
29  static const size_t minimum_read_size = 1024;
30  }
31 
33  :my( new detail::buffered_istream_impl( std::move(is) ) )
34  {
35  FC_ASSERT( my->_istr != nullptr, " this shouldn't be null" );
36  }
37 
39  :my( std::move(o.my) ){}
40 
42  {
43  my = std::move(i.my);
44  return *this;
45  }
46 
48 
49  size_t buffered_istream::readsome( char* buf, size_t len )
50  {
51  size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(buf, len));
52  if (bytes_from_rdbuf)
53  return bytes_from_rdbuf;
54 
55 
56  if( len > detail::minimum_read_size )
57  return my->_istr->readsome(buf,len);
58 
59  char tmp[detail::minimum_read_size];
60  size_t bytes_read = my->_istr->readsome( tmp, detail::minimum_read_size );
61 
62  size_t bytes_to_deliver_immediately = std::min<size_t>(bytes_read,len);
63 
64  memcpy( buf, tmp, bytes_to_deliver_immediately );
65 
66 
67  if( bytes_read > len )
68  {
69  my->_rdbuf.sputn( tmp + len, bytes_read - len );
70  }
71 
72  return bytes_to_deliver_immediately;
73  }
74 
75  size_t buffered_istream::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
76  {
77  size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(buf.get() + offset, len));
78  if (bytes_from_rdbuf)
79  return bytes_from_rdbuf;
80 
81 
82  if( len > detail::minimum_read_size )
83  return my->_istr->readsome(buf.get() + offset, len);
84 
85 #ifndef NDEBUG
86  // This code was written with the assumption that you'd only be making one call to readsome
87  // at a time so it reuses _shared_read_buffer. If you really need to make concurrent calls to
88  // readsome(), you'll need to prevent reusing _shared_read_buffer here
89  struct check_buffer_in_use {
90  bool& _buffer_in_use;
91  check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
92  ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
93  } buffer_in_use_checker(my->_shared_read_buffer_in_use);
94 #endif
95 
96  if (!my->_shared_read_buffer)
97  my->_shared_read_buffer.reset(new char[detail::minimum_read_size], [](char* p){ delete[] p; });
98  size_t bytes_read = my->_istr->readsome( my->_shared_read_buffer, detail::minimum_read_size, 0 );
99 
100  size_t bytes_to_deliver_immediately = std::min<size_t>(bytes_read,len);
101 
102  memcpy( buf.get() + offset, my->_shared_read_buffer.get(), bytes_to_deliver_immediately );
103 
104  if( bytes_read > len )
105  {
106  my->_rdbuf.sputn( my->_shared_read_buffer.get() + len, bytes_read - len );
107  }
108 
109  return bytes_to_deliver_immediately;
110  }
111 
113  {
114  if( my->_rdbuf.size() )
115  {
116  return my->_rdbuf.sgetc();
117  }
118 
119  char tmp[detail::minimum_read_size];
120  size_t bytes_read = my->_istr->readsome( tmp, detail::minimum_read_size );
121  my->_rdbuf.sputn( tmp, bytes_read );
122 
123  if( my->_rdbuf.size() )
124  {
125  return my->_rdbuf.sgetc();
126  }
127  FC_THROW_EXCEPTION( assert_exception,
128  "at least one byte should be available, or eof should have been thrown" );
129  }
130 
131 
132  namespace detail
133  {
135  {
136  public:
138  _ostr(std::move(os))
139 #ifndef NDEBUG
140  ,_shared_write_buffer_in_use(false)
141 #endif
142  {}
143 
145  boost::asio::streambuf _rdbuf;
146  std::shared_ptr<char> _shared_write_buffer;
147 #ifndef NDEBUG
149 #endif
150  };
151  }
152 
154  :my( new detail::buffered_ostream_impl( std::move(os) ) )
155  {
156  }
157 
159  :my( std::move(o.my) ){}
160 
162  {
163  my = std::move(i.my);
164  return *this;
165  }
166 
168 
169  size_t buffered_ostream::writesome( const char* buf, size_t len )
170  {
171  size_t written = static_cast<size_t>(my->_rdbuf.sputn( buf, len ));
172  if( written < len ) { flush(); }
173  return written + static_cast<size_t>(my->_rdbuf.sputn( buf+written, len-written ));
174  }
175 
176  size_t buffered_ostream::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
177  {
178  return writesome(buf.get() + offset, len);
179  }
180 
182  {
183 #ifndef NDEBUG
184  // This code was written with the assumption that you'd only be making one call to flush
185  // at a time so it reuses _shared_write_buffer. If you really need to make concurrent calls to
186  // flush(), you'll need to prevent reusing _shared_write_buffer here
187  struct check_buffer_in_use {
188  bool& _buffer_in_use;
189  check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
190  ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
191  } buffer_in_use_checker(my->_shared_write_buffer_in_use);
192 #endif
193  const size_t write_buffer_size = 2048;
194  if (!my->_shared_write_buffer)
195  my->_shared_write_buffer.reset(new char[write_buffer_size], [](char* p){ delete[] p; });
196 
197  while( size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(my->_shared_write_buffer.get(), write_buffer_size)) )
198  my->_ostr->write( my->_shared_write_buffer, bytes_from_rdbuf );
199  my->_ostr->flush();
200  }
201 
203  {
204  flush();
205  my->_ostr->close();
206  }
207 
208 
209 }
std::shared_ptr< char > _shared_write_buffer
buffered_istream & operator=(buffered_istream &&i)
virtual char peek() const
buffered_ostream & operator=(buffered_ostream &&m)
buffered_ostream(ostream_ptr o, size_t bufsize=4096)
std::shared_ptr< char > _shared_read_buffer
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
Definition: exception.hpp:378
virtual std::size_t readsome(char *buf, std::size_t len)
Defines exception&#39;s used by fc.
virtual size_t writesome(const char *buf, size_t len)
Definition: api.hpp:15
buffered_istream(istream_ptr is)
Reads data from an unbuffered stream and enables peek functionality.
std::shared_ptr< istream > istream_ptr
Definition: iostream.hpp:35
std::shared_ptr< ostream > ostream_ptr
Definition: iostream.hpp:59