Column Store
ProjectionData.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <stdio.h>
4 #include <string.h>
5 
6 #include <experimental/filesystem>
7 #include <fstream>
8 #include <map>
9 #include <memory>
10 #include <mutex>
11 #include <nlohmann/json.hpp>
12 #include <string>
13 #include <vector>
14 
16 #include "Parser/Projection.h"
17 #include "Parser/SchemaLoader.h"
18 #include "interfaces/DataRecord.h"
19 
20 #ifdef __cpp_lib_filesystem
21 namespace fs = std::filesystem;
22 #elif __cpp_lib_experimental_filesystem
23 namespace fs = std::experimental::filesystem;
24 #else
25 #error "no filesystem support ='("
26 #endif
27 
28 using nlohmann::json;
29 
30 namespace ColumnStore {
31 
33  std::vector<unsigned char> data;
34  ColumnStoreData(std::vector<unsigned char> &data) : data(data) {}
35  // ColumnStoreData(std::vector<unsigned char> &&data) : data(data) {}
36  // ColumnStoreData(ColumnStoreData &&data) : data(data.data) {}
38 
39  ColumnStoreData(DataRecord &record, std::vector<Parser::Column> &columns)
40  : ColumnStoreData(columns) {
41  set(record, columns);
42  }
43 
44  ColumnStoreData(std::vector<Parser::Column> &columns) {
45  int size = 0;
46  for (auto &c : columns) {
47  size += c;
48  // std::cout << c.type.size << std::endl;
49  }
50  // std::cout << size << std::endl;
51  data.resize(size);
52  }
53 
54  float getFloat(int i) {
55  return *reinterpret_cast<float *>(data.data() + i);
56  }
57 
58  int getInt(int i) { return *reinterpret_cast<int *>(data.data() + i); }
59 
60  std::string getString(int i, int length) {
61  return std::string(data.data() + i, data.data() + i + length);
62  }
63 
64  void setFloat(int i, float value) {
65  *reinterpret_cast<float *>(data.data() + i) = value;
66  }
67 
68  void setInt(int i, int value) {
69  *reinterpret_cast<int *>(data.data() + i) = value;
70  }
71 
72  void setString(int i, std::string &string) {
73  memcpy(data.data() + i, string.c_str(), string.size());
74  }
75 
76  void setString(int i, std::string string) {
77  memcpy(data.data() + i, string.c_str(), string.size());
78  }
79 
80  int size() const { return data.size(); }
81 
82  operator const char *() const { return (const char *)data.data(); }
83  operator char *() { return (char *)data.data(); }
84 
85  friend std::ostream &operator<<(std::ostream &output,
86  const ColumnStoreData &data) {
87  output.write(data, data.size());
88  return output;
89  }
90 
91  friend std::istream &operator>>(std::istream &input,
93  input.read(data, data.size());
94  return input;
95  }
96 
97  void read(std::istream &input) { input.read(*this, data.size()); }
98 
99  void write(std::ostream &output) { output.write(*this, data.size()); }
100 
101  DataRecord get(std::vector<Parser::Column> &columns) {
102  std::vector<DataValue> values;
103  values.reserve(columns.size());
104 
105  int offset = 0;
106  for (int i = 0; i < columns.size(); ++i) {
107  Parser::DataType type = columns[i];
108  if (type == ColumnStore::DataType::INT) {
109  values.push_back(DataValue(getInt(offset)));
110  offset += 4;
111  } else if (type == ColumnStore::DataType::FLOAT) {
112  values.push_back(DataValue(getFloat(offset)));
113  offset += 4;
114  } else if (type == ColumnStore::DataType::STRING) {
115  values.push_back(DataValue(getString(offset, type.size)));
116  offset += type.size;
117  } else
118  throw std::runtime_error("Unknown DataType");
119  }
120  return DataRecord(values);
121  }
122 
123  void set(DataRecord &record, std::vector<Parser::Column> &columns) {
124  int offset = 0;
125  for (int i = 0; i < columns.size(); ++i) {
126  Parser::DataType type = columns[i];
127  if (type == ColumnStore::DataType::INT) {
128  setInt(offset, record[i].as<int>());
129  offset += 4;
130  } else if (type == ColumnStore::DataType::FLOAT) {
131  setFloat(offset, record[i].as<float>());
132  offset += 4;
133  } else if (type == ColumnStore::DataType::STRING) {
134  setString(offset, record[i].as<std::string>());
135  offset += type.size;
136  } else
137  throw std::runtime_error("Unknown DataType");
138  }
139  }
140 };
141 
143  fs::path file;
144  json metadata;
146 
147  static std::mutex lock;
148  static std::map<fs::path, std::shared_ptr<MetadataManager>> metaMap;
149 
150  MetadataManager(fs::path file) : file(file) {
151  std::ifstream inp(file);
152  inp >> metadata;
153  inp.close();
154 
155  Parser::SchemaExtractor schema_extractor(metadata["schema_path"]);
156  schemaMetadata = schema_extractor.get_meta_data();
157  }
158 
159  public:
160  static std::shared_ptr<MetadataManager> getInstance(
161  std::string column_store_path) {
162  fs::path file = getMetaDataPath(column_store_path);
163 
164  if (MetadataManager::metaMap.count(file))
166 
167  std::lock_guard guard(MetadataManager::lock);
168 
169  // Using Double Checked Locking
170  if (!MetadataManager::metaMap.count(file))
172  std::shared_ptr<MetadataManager>(new MetadataManager(file));
173 
175  }
176 
177  void save() {
178  std::ofstream o(file);
179  o << metadata.dump(4) << std::endl;
180  o.close();
181  }
182 
183  operator json &() { return metadata; }
184  operator Parser::SchemaMetaData &() { return schemaMetadata; }
185  json &getFileMetadata() { return metadata; }
187 
188  json &getProjectionFileInfo(std::string projection) {
189  assert(metadata["projections"].count(projection));
190  return metadata["projections"][projection];
191  }
192 
193  Parser::Projection &getProjectionSchemaInfo(std::string projection) {
194  return schemaMetadata.get_projection(projection);
195  }
196 };
197 
198 std::mutex MetadataManager::lock;
199 std::map<fs::path, std::shared_ptr<MetadataManager>> MetadataManager::metaMap;
200 
201 typedef std::shared_ptr<MetadataManager> MetadataManagerSingleton;
202 
205 
206  public:
207  ColStoreLoader(std::string column_store_path)
208  : manager(MetadataManager::getInstance(column_store_path)) {}
209 
210  void update(std::string projection_name) {
211  std::cout << "Updating " << projection_name << std::endl;
212  auto &fileData = manager->getProjectionFileInfo(projection_name);
213  int tuplesMoved = fileData["tuples_move_count"];
214  auto &schema = manager->getProjectionSchemaInfo(projection_name);
215  auto &db_metadata = manager->getFileMetadata();
216  fs::path file = manager->getProjectionFileInfo(projection_name)["file"];
217  Postgres::PostgreSQLMetaData postgresql_metadata(db_metadata["source_db_config"]["db_name"],
218  db_metadata["source_db_config"]["db_user"],
219  db_metadata["source_db_config"]["db_password"],
220  manager->getSchemaMetadata());
221  Postgres::PostgreSQLDataSource postgresql_data_source(postgresql_metadata, projection_name);
222  postgresql_data_source.advance(tuplesMoved);
223  std::vector<Parser::Column> projection_columns = schema.get_metadata_columns();
224  ColumnStoreData projection_data(projection_columns);
225 
226  std::ofstream o(file, std::ofstream::out | std::ofstream::app);
227  int c = 0;
228  while (postgresql_data_source.hasNext()) {
229  auto record = postgresql_data_source.next();
230  projection_data.set(record, projection_columns);
231  projection_data.write(o);
232  c++;
233  if(c % 1000 == 0)
234  std::cout << c << " records processed" << std::endl;
235 
236  }
237  std::cout << c << " records processed" << std::endl;
238  o.close();
239 
240  int newTotal = tuplesMoved + c;
241 
242  fileData["tuples_move_count"] = newTotal;
243  manager->save();
244  }
245 
246  void updateAll() {
247  for (auto &[projection_name, _] :
248  manager->getFileMetadata()["projections"].items()) {
249  update(projection_name);
250  }
251 
252  manager->save();
253  }
254 };
255 
256 }; // namespace ColumnStore
ColumnStore::MetadataManager::save
void save()
Definition: ProjectionData.h:177
ColumnStore::ColStoreLoader::ColStoreLoader
ColStoreLoader(std::string column_store_path)
Definition: ProjectionData.h:207
ColumnStore::ColumnStoreData::operator<<
friend std::ostream & operator<<(std::ostream &output, const ColumnStoreData &data)
Definition: ProjectionData.h:85
ColumnStore::ColumnStoreData::getString
std::string getString(int i, int length)
Definition: ProjectionData.h:60
Postgres::PostgreSQLDataSource::hasNext
bool hasNext()
Definition: PostgreSQLDataGenerator.h:204
ColumnStore::ColumnStoreData::setString
void setString(int i, std::string &string)
Definition: ProjectionData.h:72
Parser::SchemaMetaData
Definition: SchemaMetaData.h:14
Postgres::PostgreSQLDataSource::advance
void advance(int recordCount)
Definition: PostgreSQLDataGenerator.h:194
ColumnStore::ColumnStoreData::read
void read(std::istream &input)
Definition: ProjectionData.h:97
ColumnStore::ColumnStoreData::getFloat
float getFloat(int i)
Definition: ProjectionData.h:54
ColumnStore
Definition: ColStoreDataGenerator.h:22
Postgres::PostgreSQLDataSource::next
DataRecord next()
Definition: PostgreSQLDataGenerator.h:137
ColumnStore::MetadataManager::schemaMetadata
Parser::SchemaMetaData schemaMetadata
Definition: ProjectionData.h:145
ColumnStore::MetadataManager::getProjectionSchemaInfo
Parser::Projection & getProjectionSchemaInfo(std::string projection)
Definition: ProjectionData.h:193
ColumnStore::ColumnStoreData::ColumnStoreData
ColumnStoreData(int size=0)
Definition: ProjectionData.h:37
ColumnStore::ColumnStoreData::size
int size() const
Definition: ProjectionData.h:80
ColumnStore::ColumnStoreData::get
DataRecord get(std::vector< Parser::Column > &columns)
Definition: ProjectionData.h:101
ColumnStore::DataType::STRING
@ STRING
ColumnStore::MetadataManager::MetadataManager
MetadataManager(fs::path file)
Definition: ProjectionData.h:150
ProjectionData.h
Parser::DataType
Definition: Column.h:7
Parser::Projection
Definition: Projection.h:44
ColumnStore::ColumnStoreData::ColumnStoreData
ColumnStoreData(std::vector< unsigned char > &data)
Definition: ProjectionData.h:34
ColumnStore::ColStoreLoader
Definition: ProjectionData.h:203
ColumnStore::ColStoreLoader::manager
MetadataManagerSingleton manager
Definition: ProjectionData.h:204
ColumnStore::MetadataManager::getFileMetadata
json & getFileMetadata()
Definition: ProjectionData.h:185
ColumnStore::DataRecord
Stores a row of data.
Definition: DataRecord.h:64
DataRecord.h
Data storage structures.
StoreInitializer.h
ColumnStore::ColumnStoreData::operator>>
friend std::istream & operator>>(std::istream &input, ColumnStoreData &data)
Definition: ProjectionData.h:91
ColumnStore::MetadataManagerSingleton
std::shared_ptr< MetadataManager > MetadataManagerSingleton
Definition: ProjectionData.h:201
ColumnStore::ColumnStoreData::ColumnStoreData
ColumnStoreData(DataRecord &record, std::vector< Parser::Column > &columns)
Definition: ProjectionData.h:39
Parser::SchemaMetaData::get_projection
Projection & get_projection(string projection_name)
Definition: SchemaMetaData.cpp:13
ColumnStore::ColumnStoreData::setFloat
void setFloat(int i, float value)
Definition: ProjectionData.h:64
ColumnStore::MetadataManager::metaMap
static std::map< fs::path, std::shared_ptr< MetadataManager > > metaMap
Definition: ProjectionData.h:148
Parser::SchemaExtractor::get_meta_data
SchemaMetaData get_meta_data()
Definition: SchemaExtractor.h:24
initConfig
void initConfig(int ac, char *av[])
Definition: ColumnStoreUpdater.cpp:12
ColumnStore::ColumnStoreData::data
std::vector< unsigned char > data
Definition: ProjectionData.h:33
ColumnStore::getMetaDataPath
fs::path getMetaDataPath(std::string column_store_path)
Definition: StoreInitializer.h:28
ColumnStore::MetadataManager::getSchemaMetadata
Parser::SchemaMetaData & getSchemaMetadata()
Definition: ProjectionData.h:186
SchemaLoader.h
ColumnStore::MetadataManager::lock
static std::mutex lock
Definition: ProjectionData.h:147
ColumnStore::ColumnStoreData::getInt
int getInt(int i)
Definition: ProjectionData.h:58
ColumnStore::DataType::INT
@ INT
ColumnStore::ColumnStoreData::write
void write(std::ostream &output)
Definition: ProjectionData.h:99
ColumnStore::ColumnStoreData::set
void set(DataRecord &record, std::vector< Parser::Column > &columns)
Definition: ProjectionData.h:123
ColumnStore::ColStoreLoader::updateAll
void updateAll()
Definition: ProjectionData.h:246
ColumnStore::ColumnStoreData::ColumnStoreData
ColumnStoreData(std::vector< Parser::Column > &columns)
Definition: ProjectionData.h:44
ColumnStore::MetadataManager
Definition: ProjectionData.h:142
ColumnStore::ColumnStoreData
Definition: ProjectionData.h:32
ColumnStore::MetadataManager::getInstance
static std::shared_ptr< MetadataManager > getInstance(std::string column_store_path)
Definition: ProjectionData.h:160
ColumnStore::ColumnStoreData::setString
void setString(int i, std::string string)
Definition: ProjectionData.h:76
ColumnStore::MetadataManager::file
fs::path file
Definition: ProjectionData.h:143
ColumnStore::MetadataManager::metadata
json metadata
Definition: ProjectionData.h:144
PostgreSQLMetaData.h
Parser::SchemaExtractor
Definition: SchemaExtractor.h:14
ColumnStore::DataValue
Implementation of a single data element.
Definition: DataRecord.h:28
Projection.h
ColumnStore::MetadataManager::getProjectionFileInfo
json & getProjectionFileInfo(std::string projection)
Definition: ProjectionData.h:188
Parser::DataType::size
int size
Definition: Column.h:9
ColumnStore::ColStoreLoader::update
void update(std::string projection_name)
Definition: ProjectionData.h:210
ColumnStore::ColumnStoreData::setInt
void setInt(int i, int value)
Definition: ProjectionData.h:68
Postgres::PostgreSQLDataSource
Definition: PostgreSQLDataGenerator.h:24
main
int main(int argc, char *argv[])
Definition: ColumnStoreUpdater.cpp:44
ColumnStore::DataType::FLOAT
@ FLOAT
config
po::variables_map config
Definition: ColumnStoreUpdater.cpp:10
Postgres::PostgreSQLMetaData
Definition: PostgreSQLMetaData.h:13
PostgreSQLDataGenerator.h